# pwdlv3.1 / tui.py
# AkshitShubham's picture
# Initial upload of PWDLV3 content
# df3ad52 verified
import os
import time
from datetime import datetime
from threading import Lock
from typing import Dict, List, Optional, Callable
from pathlib import Path
from rich.console import Console
from rich.layout import Layout
from rich.panel import Panel
from rich.progress import Progress, TaskID
from rich.live import Live
from rich.table import Table
from rich.text import Text
import concurrent.futures
from mainLogic.big4 import Gryffindor_downloadv3
from mainLogic.big4.Gryffindor_downloadv3 import DownloadResult
class DownloaderTUI:
    """Advanced Terminal User Interface for the Downloader using rich library.

    The screen is a ``rich`` layout with a header (title and elapsed time),
    a progress-bar panel, a statistics table, a rolling log panel and a
    footer.  The public mutators (:meth:`log`, :meth:`setup_audio_progress`,
    :meth:`setup_video_progress`, :meth:`update_progress`) take ``self.lock``
    while they change state and then push a freshly generated layout to the
    live display, if one has been started.
    """

    # Colour for each known log level; any other level is rendered green.
    _LEVEL_COLORS = {"ERROR": "red", "WARNING": "yellow", "DEBUG": "blue"}
    _DEFAULT_LEVEL_COLOR = "green"

    def __init__(self, verbose: bool = False):
        """Create the console, the layout skeleton and empty statistics.

        Args:
            verbose: Stored as ``self.verbose`` for callers to consult; this
                class does not filter messages on it.
        """
        self.verbose = verbose
        self.console = Console()
        self.lock = Lock()
        self.log_messages: List[Dict] = []
        self.max_log_lines = 20

        # Progress trackers
        self.audio_progress = None
        self.audio_task_id = None
        self.video_progress = None
        self.video_task_id = None

        # Stats: "failed" holds the segment numbers that failed.
        self.audio_stats = {"total": 0, "successful": 0, "failed": []}
        self.video_stats = {"total": 0, "successful": 0, "failed": []}
        self.download_start_time = None

        # Layout elements
        self.layout = self._make_layout()
        self.progress_container = Progress()
        self.live = None

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _utc_now(fmt: str) -> str:
        """Return the current UTC time formatted with ``fmt``.

        ``time.gmtime()`` is used instead of ``datetime.utcnow()``, which is
        deprecated since Python 3.12; the formatted output is the same.
        """
        return time.strftime(fmt, time.gmtime())

    @staticmethod
    def _finished(stats: Dict) -> int:
        """Return the number of attempted segments (successful + failed)."""
        return stats["successful"] + len(stats["failed"])

    @staticmethod
    def _reset_stats(stats: Dict, total_segments: int) -> None:
        """Reset a stats dict in place for a new run of ``total_segments``."""
        stats["total"] = total_segments
        stats["successful"] = 0
        stats["failed"] = []

    def _media_rows(self):
        """Return ``(label, bar description, stats)`` for audio then video."""
        return (
            ("Audio", "[cyan]Audio Download", self.audio_stats),
            ("Video", "[magenta]Video Download", self.video_stats),
        )

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------
    def _make_layout(self) -> "Layout":
        """Create the layout structure for the TUI"""
        layout = Layout(name="root")

        # Split the screen into top and bottom sections
        layout.split(
            Layout(name="header", size=3),
            Layout(name="main", ratio=1),
            Layout(name="footer", size=3),
        )

        # Split the main section into status and logs
        layout["main"].split_row(
            Layout(name="status", ratio=1),
            Layout(name="logs", ratio=2),
        )

        # Split the status section into progress and stats
        layout["status"].split(
            Layout(name="progress", ratio=1),
            Layout(name="stats", ratio=1),
        )
        return layout

    def start(self):
        """Initialize and start the live display.

        Records the start time used for the header's elapsed counter.  If a
        live display from an earlier ``start()`` is still referenced it is
        stopped first, so a repeated call does not leave an orphaned display
        running.
        """
        self.download_start_time = time.time()

        # Initialize progress tracking
        self.progress_container = Progress()

        if self.live is not None:
            self.live.stop()

        # Create the live display
        self.live = Live(
            self._generate_layout(),
            refresh_per_second=4,
            console=self.console,
        )
        self.live.start()

    def stop(self):
        """Stop the live display"""
        if self.live:
            self.live.stop()

    def log(self, message: str, level: str = "INFO"):
        """Add a log message and redraw.

        Args:
            message: Text to show in the log panel.
            level: ``"ERROR"``, ``"WARNING"`` and ``"DEBUG"`` get their own
                colours; every other value is shown in green.

        Only the newest ``self.max_log_lines`` entries are kept.  Timestamps
        are UTC, formatted ``HH:MM:SS``.
        """
        with self.lock:
            self.log_messages.append({
                "timestamp": self._utc_now("%H:%M:%S"),
                "level": level,
                "message": message,
                "color": self._LEVEL_COLORS.get(level, self._DEFAULT_LEVEL_COLOR),
            })

            # Keep logs to the maximum number: drop the oldest entries in one
            # slice deletion rather than popping from the front repeatedly.
            overflow = len(self.log_messages) - self.max_log_lines
            if overflow > 0:
                del self.log_messages[:overflow]

            self._update_display()

    def _update_display(self):
        """Update the live display with current state (no-op before start)."""
        if self.live:
            self.live.update(self._generate_layout())

    def _generate_layout(self):
        """Generate the complete layout with current state.

        The elapsed time in the header is computed here, so it only advances
        when a new layout is generated.

        Returns:
            ``self.layout`` with every region updated in place.
        """
        # Header
        header_text = Text("Media Downloader", style="bold white on blue")
        elapsed = ""
        if self.download_start_time:
            elapsed_secs = int(time.time() - self.download_start_time)
            elapsed = f" • Elapsed: {elapsed_secs // 60}m {elapsed_secs % 60}s"
        header_text.append(elapsed, style="white on blue")
        self.layout["header"].update(Panel(header_text, border_style="blue"))

        # Progress, stats and logs sections
        self.layout["progress"].update(self._generate_progress_panel())
        self.layout["stats"].update(self._generate_stats_panel())
        self.layout["logs"].update(self._generate_logs_panel())

        # Footer
        footer_text = Text("Press Ctrl+C to cancel", style="italic")
        self.layout["footer"].update(Panel(footer_text))
        return self.layout

    def _generate_progress_panel(self):
        """Generate the progress panel with progress bars.

        A new ``Progress`` is built on every call; a bar is added only for a
        media type whose total is greater than zero.
        """
        progress = Progress()
        for _label, description, stats in self._media_rows():
            if stats["total"] > 0:
                progress.add_task(
                    description,
                    total=stats["total"],
                    completed=self._finished(stats),
                )
        return Panel(progress, title="Download Progress", border_style="green")

    def _generate_stats_panel(self):
        """Generate the stats panel with download statistics.

        "Completion" counts failed segments as finished, matching the
        progress bars.
        """
        table = Table(show_header=True, header_style="bold")
        for heading in ("Media", "Total", "Successful", "Failed", "Completion"):
            table.add_column(heading)

        for label, _description, stats in self._media_rows():
            if stats["total"] > 0:
                completion = 100 * self._finished(stats) / stats["total"]
                table.add_row(
                    label,
                    str(stats["total"]),
                    str(stats["successful"]),
                    str(len(stats["failed"])),
                    f"{completion:.1f}%",
                )
        return Panel(table, title="Download Statistics", border_style="green")

    def _generate_logs_panel(self):
        """Generate the logs panel with colorized log messages"""
        log_text = Text()
        for entry in self.log_messages:
            level = entry["level"]
            color = entry["color"]
            log_text.append(f"[{entry['timestamp']}] ", style="dim")
            log_text.append(f"[{level}] ", style=f"bold {color}")
            # INFO message bodies are left unstyled; other levels are tinted.
            log_text.append(f"{entry['message']}\n", style=color if level != "INFO" else "")
        return Panel(log_text, title="Logs", border_style="blue")

    # ------------------------------------------------------------------
    # Progress bookkeeping
    # ------------------------------------------------------------------
    def setup_audio_progress(self, total_segments: int):
        """Setup the audio progress tracking (resets the audio counters)."""
        with self.lock:
            self._reset_stats(self.audio_stats, total_segments)
            self._update_display()

    def setup_video_progress(self, total_segments: int):
        """Setup the video progress tracking (resets the video counters)."""
        with self.lock:
            self._reset_stats(self.video_stats, total_segments)
            self._update_display()

    def update_progress(self, media_type: str, segment_num: int, success: bool):
        """Update progress for a specific media type.

        Args:
            media_type: ``"audio"`` selects the audio counters; any other
                value selects the video counters.
            segment_num: Segment identifier; recorded in the failed list when
                ``success`` is false.
            success: Whether the segment was downloaded successfully.

        Returns:
            A dict with the keys ``type``, ``total``, ``current``,
            ``percentage``, ``segment_num``, ``success``, ``failed_segments``
            (a copy) and ``timestamp`` (UTC, ``YYYY-MM-DD HH:MM:SS``).
            ``percentage`` is ``0`` when the total is not positive.
        """
        with self.lock:
            stats = self.audio_stats if media_type == "audio" else self.video_stats

            if success:
                stats["successful"] += 1
            else:
                stats["failed"].append(segment_num)

            self._update_display()

            total = stats["total"]
            finished = self._finished(stats)

            # Return progress info in the same format as before
            return {
                "type": media_type,
                "total": total,
                "current": finished,
                "percentage": (finished / total) * 100 if total > 0 else 0,
                "segment_num": segment_num,
                "success": success,
                "failed_segments": stats["failed"].copy(),
                "timestamp": self._utc_now("%Y-%m-%d %H:%M:%S"),
            }
# Modified ProgressTracker to work with the new TUI
class ProgressTracker:
    """Count processed segments for one media type and mirror them to a TUI.

    The tracker keeps its own ``current`` counter and ``failed_segments``
    list, and forwards every update to the ``DownloaderTUI`` it was given.
    """

    def __init__(self, total_segments: int, media_type: str, tui: DownloaderTUI, show_tqdm: bool = False):
        """Register the run with the TUI and log the start message.

        Args:
            total_segments: Number of segments expected.
            media_type: ``"audio"`` uses the TUI's audio counters; any other
                value uses the video counters.
            tui: Interface that receives progress and log updates.
            show_tqdm: Accepted but not read by this class.
        """
        self.total = total_segments
        self.current = 0
        self.media_type = media_type
        self.lock = Lock()
        self.failed_segments = []
        self.tui = tui

        # Setup progress in TUI
        register = (
            self.tui.setup_audio_progress
            if media_type == "audio"
            else self.tui.setup_video_progress
        )
        register(total_segments)
        self.tui.log(f"Starting {media_type} download: {total_segments} segments", "INFO")

    def update(self, segment_num: int, success: bool = True) -> Dict:
        """Record one processed segment.

        Failures are always logged at ERROR level; successes are logged at
        DEBUG level only when the TUI is verbose.

        Returns:
            Whatever the TUI's ``update_progress`` returns for this segment.
        """
        with self.lock:
            self.current += 1
            mark = '✓' if success else '✗'
            line = f"{self.media_type} segment {segment_num} {mark}"

            if success:
                if self.tui.verbose:
                    self.tui.log(line, "DEBUG")
            else:
                self.failed_segments.append(segment_num)
                self.tui.log(line, "ERROR")

            # Update TUI progress
            return self.tui.update_progress(self.media_type, segment_num, success)

    def close(self):
        """Log the final ``successful/total`` summary for this media type."""
        succeeded = self.current - len(self.failed_segments)
        self.tui.log(f"{self.media_type.capitalize()} download complete: {succeeded}/{self.total} segments successful", "INFO")
# Example of how to modify the DownloaderV3 class to use the new TUI
def update_downloader_v3_with_tui(downloader: Gryffindor_downloadv3.DownloaderV3):
    """Patch ``downloader`` in place so it reports through a ``DownloaderTUI``.

    Three attributes of the instance are replaced:

    * ``terminal`` becomes a ``DownloaderTUI`` that is started immediately,
    * ``_download_media`` becomes a TUI-aware implementation,
    * ``download_all`` is wrapped so the TUI is stopped when it returns or
      raises.

    Args:
        downloader: Instance to patch.  This function relies on its
            ``verbose``, ``max_workers``, ``debugger``,
            ``_get_file_name_from_url``, ``_download_segment``,
            ``_process_segment`` and ``download_all`` members.

    Returns:
        The same ``downloader`` object.
    """
    # Replace the TerminalOutput with our new TUI
    downloader.terminal = DownloaderTUI(downloader.verbose)
    downloader.terminal.start()

    def _download_media_with_tui(media_data, media_type, output_dir):
        """Download the init segment and all segments of one media stream.

        Args:
            media_data: Mapping with a ``"segments"`` dict (segment number ->
                URL) and optionally an ``"init"`` URL.
            media_type: Label used in messages and to pick the TUI counters.
            output_dir: Directory object joined with file names via ``/``.

        Returns:
            A ``DownloadResult`` built from the init file path, output
            directory, total count, successful count and failed segment
            numbers.
        """
        if not media_data or "segments" not in media_data:
            downloader.terminal.log(f"No {media_type} data provided", "WARNING")
            downloader.debugger.warning(f"No {media_type} data provided")
            return DownloadResult(None, output_dir, 0, 0, [])

        segments = media_data["segments"]
        total_segments = len(segments)
        init_file_path = None

        progress_tracker = ProgressTracker(
            total_segments,
            media_type,
            downloader.terminal,
            False,  # No need for tqdm progress bars
        )

        # Download init segment first
        if "init" in media_data:
            init_filename = downloader._get_file_name_from_url(media_data["init"], 0, media_type)
            init_file_path = output_dir / init_filename

            if not downloader._download_segment(media_data["init"], init_file_path):
                downloader.terminal.log(f"Failed to download {media_type} init segment", "ERROR")
                downloader.debugger.error(f"Failed to download {media_type} init segment")
                # Nothing was attempted, so every real segment number counts
                # as failed (not an assumed 1..N range).
                not_attempted = [int(segment_num) for segment_num in segments]
                return DownloadResult(None, output_dir, total_segments, 0, not_attempted)

            downloader.terminal.log(f"Downloaded {media_type} init segment: {init_filename}", "INFO")
            if downloader.verbose:
                downloader.debugger.info(f"Downloaded {media_type} init segment: {init_filename}")

        # Prepare segment download tasks
        download_tasks = []
        for segment_num, segment_url in segments.items():
            segment_filename = downloader._get_file_name_from_url(
                segment_url, int(segment_num), media_type
            )
            download_tasks.append((
                segment_url,
                output_dir / segment_filename,
                int(segment_num),
                progress_tracker,
            ))

        with concurrent.futures.ThreadPoolExecutor(max_workers=downloader.max_workers) as executor:
            futures = [executor.submit(downloader._process_segment, task) for task in download_tasks]
            # A truthy result from _process_segment counts as one success.
            successful_segments = sum(
                1 for future in concurrent.futures.as_completed(futures) if future.result()
            )

        # Final status update: close() logs the "<Media> download complete"
        # summary, so it is not logged a second time here.
        progress_tracker.close()

        return DownloadResult(
            init_file_path,
            output_dir,
            total_segments,
            successful_segments,
            progress_tracker.failed_segments,
        )

    # Replace the original method with our enhanced version
    downloader._download_media = _download_media_with_tui

    # Make sure to stop the TUI when done
    original_download_all = downloader.download_all

    def download_all_with_tui(urls):
        """Run the original ``download_all`` and always stop the TUI after."""
        try:
            return original_download_all(urls)
        finally:
            downloader.terminal.stop()

    downloader.download_all = download_all_with_tui
    return downloader
# Usage example:
"""
# Import the original downloader
from mainLogic.big4.Gryffindor_downloadv3 import DownloaderV3
# Create the downloader
downloader = DownloaderV3(
tmp_dir="tmp",
out_dir="output",
verbose=True,
max_workers=8
)
# Apply our TUI enhancements
downloader = update_downloader_v3_with_tui(downloader)
# Use the enhanced downloader
results = downloader.download_all(urls_dict)
"""