Spaces:
Paused
Paused
import os | |
import time | |
from datetime import datetime | |
from threading import Lock | |
from typing import Dict, List, Optional, Callable | |
from pathlib import Path | |
from rich.console import Console | |
from rich.layout import Layout | |
from rich.panel import Panel | |
from rich.progress import Progress, TaskID | |
from rich.live import Live | |
from rich.table import Table | |
from rich.text import Text | |
import concurrent.futures | |
from mainLogic.big4 import Gryffindor_downloadv3 | |
from mainLogic.big4.Gryffindor_downloadv3 import DownloadResult | |
class DownloaderTUI: | |
"""Advanced Terminal User Interface for the Downloader using rich library""" | |
def __init__(self, verbose: bool = False): | |
self.verbose = verbose | |
self.console = Console() | |
self.lock = Lock() | |
self.log_messages: List[Dict] = [] | |
self.max_log_lines = 20 | |
# Progress trackers | |
self.audio_progress = None | |
self.audio_task_id = None | |
self.video_progress = None | |
self.video_task_id = None | |
# Stats | |
self.audio_stats = {"total": 0, "successful": 0, "failed": []} | |
self.video_stats = {"total": 0, "successful": 0, "failed": []} | |
self.download_start_time = None | |
# Layout elements | |
self.layout = self._make_layout() | |
self.progress_container = Progress() | |
self.live = None | |
def _make_layout(self) -> Layout: | |
"""Create the layout structure for the TUI""" | |
layout = Layout(name="root") | |
# Split the screen into top and bottom sections | |
layout.split( | |
Layout(name="header", size=3), | |
Layout(name="main", ratio=1), | |
Layout(name="footer", size=3) | |
) | |
# Split the main section into status and logs | |
layout["main"].split_row( | |
Layout(name="status", ratio=1), | |
Layout(name="logs", ratio=2) | |
) | |
# Split the status section into progress and stats | |
layout["status"].split( | |
Layout(name="progress", ratio=1), | |
Layout(name="stats", ratio=1) | |
) | |
return layout | |
def start(self): | |
"""Initialize and start the live display""" | |
self.download_start_time = time.time() | |
# Initialize progress tracking | |
self.progress_container = Progress() | |
# Create the live display | |
self.live = Live( | |
self._generate_layout(), | |
refresh_per_second=4, | |
console=self.console | |
) | |
self.live.start() | |
def stop(self): | |
"""Stop the live display""" | |
if self.live: | |
self.live.stop() | |
def log(self, message: str, level: str = "INFO"): | |
"""Add a log message""" | |
with self.lock: | |
timestamp = datetime.utcnow().strftime("%H:%M:%S") | |
# Set color based on level | |
if level == "ERROR": | |
color = "red" | |
elif level == "WARNING": | |
color = "yellow" | |
elif level == "DEBUG": | |
color = "blue" | |
else: | |
color = "green" | |
self.log_messages.append({ | |
"timestamp": timestamp, | |
"level": level, | |
"message": message, | |
"color": color | |
}) | |
# Keep logs to the maximum number | |
while len(self.log_messages) > self.max_log_lines: | |
self.log_messages.pop(0) | |
self._update_display() | |
def _update_display(self): | |
"""Update the live display with current state""" | |
if self.live: | |
self.live.update(self._generate_layout()) | |
def _generate_layout(self): | |
"""Generate the complete layout with current state""" | |
# Header | |
header_text = Text("Media Downloader", style="bold white on blue") | |
elapsed = "" | |
if self.download_start_time: | |
elapsed_secs = int(time.time() - self.download_start_time) | |
elapsed = f" • Elapsed: {elapsed_secs // 60}m {elapsed_secs % 60}s" | |
header_text.append(elapsed, style="white on blue") | |
self.layout["header"].update(Panel(header_text, border_style="blue")) | |
# Progress section | |
progress_panel = self._generate_progress_panel() | |
self.layout["progress"].update(progress_panel) | |
# Stats section | |
stats_panel = self._generate_stats_panel() | |
self.layout["stats"].update(stats_panel) | |
# Logs section | |
logs_panel = self._generate_logs_panel() | |
self.layout["logs"].update(logs_panel) | |
# Footer | |
footer_text = Text("Press Ctrl+C to cancel", style="italic") | |
self.layout["footer"].update(Panel(footer_text)) | |
return self.layout | |
def _generate_progress_panel(self): | |
"""Generate the progress panel with progress bars""" | |
progress = Progress() | |
# Re-create audio progress bar if needed | |
if self.audio_stats["total"] > 0: | |
audio_task = progress.add_task( | |
"[cyan]Audio Download", | |
total=self.audio_stats["total"], | |
completed=self.audio_stats["successful"] + len(self.audio_stats["failed"]) | |
) | |
# Re-create video progress bar if needed | |
if self.video_stats["total"] > 0: | |
video_task = progress.add_task( | |
"[magenta]Video Download", | |
total=self.video_stats["total"], | |
completed=self.video_stats["successful"] + len(self.video_stats["failed"]) | |
) | |
return Panel(progress, title="Download Progress", border_style="green") | |
def _generate_stats_panel(self): | |
"""Generate the stats panel with download statistics""" | |
table = Table(show_header=True, header_style="bold") | |
table.add_column("Media") | |
table.add_column("Total") | |
table.add_column("Successful") | |
table.add_column("Failed") | |
table.add_column("Completion") | |
# Add audio stats | |
if self.audio_stats["total"] > 0: | |
completion = 100 * (self.audio_stats["successful"] + len(self.audio_stats["failed"])) / self.audio_stats["total"] | |
table.add_row( | |
"Audio", | |
str(self.audio_stats["total"]), | |
str(self.audio_stats["successful"]), | |
str(len(self.audio_stats["failed"])), | |
f"{completion:.1f}%" | |
) | |
# Add video stats | |
if self.video_stats["total"] > 0: | |
completion = 100 * (self.video_stats["successful"] + len(self.video_stats["failed"])) / self.video_stats["total"] | |
table.add_row( | |
"Video", | |
str(self.video_stats["total"]), | |
str(self.video_stats["successful"]), | |
str(len(self.video_stats["failed"])), | |
f"{completion:.1f}%" | |
) | |
return Panel(table, title="Download Statistics", border_style="green") | |
def _generate_logs_panel(self): | |
"""Generate the logs panel with colorized log messages""" | |
log_text = Text() | |
for log in self.log_messages: | |
timestamp = log["timestamp"] | |
level = log["level"] | |
message = log["message"] | |
color = log["color"] | |
log_text.append(f"[{timestamp}] ", style="dim") | |
log_text.append(f"[{level}] ", style=f"bold {color}") | |
log_text.append(f"{message}\n", style=color if level != "INFO" else "") | |
return Panel(log_text, title="Logs", border_style="blue") | |
def setup_audio_progress(self, total_segments: int): | |
"""Setup the audio progress tracking""" | |
with self.lock: | |
self.audio_stats["total"] = total_segments | |
self.audio_stats["successful"] = 0 | |
self.audio_stats["failed"] = [] | |
self._update_display() | |
def setup_video_progress(self, total_segments: int): | |
"""Setup the video progress tracking""" | |
with self.lock: | |
self.video_stats["total"] = total_segments | |
self.video_stats["successful"] = 0 | |
self.video_stats["failed"] = [] | |
self._update_display() | |
def update_progress(self, media_type: str, segment_num: int, success: bool): | |
"""Update progress for a specific media type""" | |
with self.lock: | |
if media_type == "audio": | |
stats = self.audio_stats | |
else: # video | |
stats = self.video_stats | |
if success: | |
stats["successful"] += 1 | |
else: | |
stats["failed"].append(segment_num) | |
self._update_display() | |
# Return progress info in the same format as before | |
return { | |
"type": media_type, | |
"total": stats["total"], | |
"current": stats["successful"] + len(stats["failed"]), | |
"percentage": ((stats["successful"] + len(stats["failed"])) / stats["total"]) * 100 if stats["total"] > 0 else 0, | |
"segment_num": segment_num, | |
"success": success, | |
"failed_segments": stats["failed"].copy(), | |
"timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") | |
} | |
# Modified ProgressTracker to work with the new TUI | |
class ProgressTracker: | |
def __init__(self, total_segments: int, media_type: str, tui: DownloaderTUI, show_tqdm: bool = False): | |
self.total = total_segments | |
self.current = 0 | |
self.media_type = media_type | |
self.lock = Lock() | |
self.failed_segments = [] | |
self.tui = tui | |
# Setup progress in TUI | |
if media_type == "audio": | |
self.tui.setup_audio_progress(total_segments) | |
else: | |
self.tui.setup_video_progress(total_segments) | |
self.tui.log(f"Starting {media_type} download: {total_segments} segments", "INFO") | |
def update(self, segment_num: int, success: bool = True) -> Dict: | |
with self.lock: | |
self.current += 1 | |
if not success: | |
self.failed_segments.append(segment_num) | |
# Log the update | |
msg = f"{self.media_type} segment {segment_num} {'✓' if success else '✗'}" | |
if not success: | |
self.tui.log(msg, "ERROR") | |
elif self.tui.verbose: | |
self.tui.log(msg, "DEBUG") | |
# Update TUI progress | |
return self.tui.update_progress(self.media_type, segment_num, success) | |
def close(self): | |
self.tui.log(f"{self.media_type.capitalize()} download complete: {self.current - len(self.failed_segments)}/{self.total} segments successful", "INFO") | |
# Example of how to modify the DownloaderV3 class to use the new TUI | |
def update_downloader_v3_with_tui(downloader:Gryffindor_downloadv3.DownloaderV3): | |
# Replace the TerminalOutput with our new TUI | |
downloader.terminal = DownloaderTUI(downloader.verbose) | |
downloader.terminal.start() | |
# Override the original _download_media method to use our new ProgressTracker | |
# original_download_media = downloader._download_media | |
def _download_media_with_tui(media_data, media_type, output_dir): | |
# Same implementation but using our TUI-enabled ProgressTracker | |
if not media_data or "segments" not in media_data: | |
downloader.terminal.log(f"No {media_type} data provided", "WARNING") | |
downloader.debugger.warning(f"No {media_type} data provided") | |
return DownloadResult(None, output_dir, 0, 0, []) | |
total_segments = len(media_data["segments"]) | |
init_file_path = None | |
progress_tracker = ProgressTracker( | |
total_segments, | |
media_type, | |
downloader.terminal, | |
False # No need for tqdm progress bars | |
) | |
# Rest of the original _download_media implementation... | |
# Download init segment first | |
if "init" in media_data: | |
init_filename = downloader._get_file_name_from_url(media_data["init"], 0, media_type) | |
init_file_path = output_dir / init_filename | |
if not downloader._download_segment(media_data["init"], init_file_path): | |
downloader.terminal.log(f"Failed to download {media_type} init segment", "ERROR") | |
downloader.debugger.error(f"Failed to download {media_type} init segment") | |
return DownloadResult(None, output_dir, total_segments, 0, list(range(1, total_segments + 1))) | |
downloader.terminal.log(f"Downloaded {media_type} init segment: {init_filename}", "INFO") | |
if downloader.verbose: | |
downloader.debugger.info(f"Downloaded {media_type} init segment: {init_filename}") | |
# Prepare segment download tasks | |
download_tasks = [] | |
for segment_num, segment_url in media_data["segments"].items(): | |
segment_filename = downloader._get_file_name_from_url( | |
segment_url, int(segment_num), media_type | |
) | |
segment_path = output_dir / segment_filename | |
download_tasks.append(( | |
segment_url, | |
segment_path, | |
int(segment_num), | |
progress_tracker | |
)) | |
successful_segments = 0 | |
with concurrent.futures.ThreadPoolExecutor(max_workers=downloader.max_workers) as executor: | |
futures = [executor.submit(downloader._process_segment, task) for task in download_tasks] | |
for future in concurrent.futures.as_completed(futures): | |
if future.result(): | |
successful_segments += 1 | |
# Final status update | |
downloader.terminal.log( | |
f"{media_type.capitalize()} download complete: {successful_segments}/{total_segments} segments successful", | |
"INFO" | |
) | |
progress_tracker.close() | |
return DownloadResult( | |
init_file_path, | |
output_dir, | |
total_segments, | |
successful_segments, | |
progress_tracker.failed_segments | |
) | |
# Replace the original method with our enhanced version | |
downloader._download_media = _download_media_with_tui | |
# Make sure to stop the TUI when done | |
original_download_all = downloader.download_all | |
def download_all_with_tui(urls): | |
try: | |
return original_download_all(urls) | |
finally: | |
downloader.terminal.stop() | |
downloader.download_all = download_all_with_tui | |
return downloader | |
# Usage example: | |
""" | |
# Import the original downloader | |
from mainLogic.downloader import DownloaderV3 | |
# Create the downloader | |
downloader = DownloaderV3( | |
tmp_dir="tmp", | |
out_dir="output", | |
verbose=True, | |
max_workers=8 | |
) | |
# Apply our TUI enhancements | |
downloader = update_downloader_v3_with_tui(downloader) | |
# Use the enhanced downloader | |
results = downloader.download_all(urls_dict) | |
""" |