Spaces:
Runtime error
Runtime error
| import os | |
| import time | |
| from datetime import datetime | |
| from threading import Lock | |
| from typing import Dict, List, Optional, Callable | |
| from pathlib import Path | |
| from rich.console import Console | |
| from rich.layout import Layout | |
| from rich.panel import Panel | |
| from rich.progress import Progress, TaskID | |
| from rich.live import Live | |
| from rich.table import Table | |
| from rich.text import Text | |
| import concurrent.futures | |
| from mainLogic.big4 import Gryffindor_downloadv3 | |
| from mainLogic.big4.Gryffindor_downloadv3 import DownloadResult | |
| class DownloaderTUI: | |
| """Advanced Terminal User Interface for the Downloader using rich library""" | |
| def __init__(self, verbose: bool = False): | |
| self.verbose = verbose | |
| self.console = Console() | |
| self.lock = Lock() | |
| self.log_messages: List[Dict] = [] | |
| self.max_log_lines = 20 | |
| # Progress trackers | |
| self.audio_progress = None | |
| self.audio_task_id = None | |
| self.video_progress = None | |
| self.video_task_id = None | |
| # Stats | |
| self.audio_stats = {"total": 0, "successful": 0, "failed": []} | |
| self.video_stats = {"total": 0, "successful": 0, "failed": []} | |
| self.download_start_time = None | |
| # Layout elements | |
| self.layout = self._make_layout() | |
| self.progress_container = Progress() | |
| self.live = None | |
| def _make_layout(self) -> Layout: | |
| """Create the layout structure for the TUI""" | |
| layout = Layout(name="root") | |
| # Split the screen into top and bottom sections | |
| layout.split( | |
| Layout(name="header", size=3), | |
| Layout(name="main", ratio=1), | |
| Layout(name="footer", size=3) | |
| ) | |
| # Split the main section into status and logs | |
| layout["main"].split_row( | |
| Layout(name="status", ratio=1), | |
| Layout(name="logs", ratio=2) | |
| ) | |
| # Split the status section into progress and stats | |
| layout["status"].split( | |
| Layout(name="progress", ratio=1), | |
| Layout(name="stats", ratio=1) | |
| ) | |
| return layout | |
| def start(self): | |
| """Initialize and start the live display""" | |
| self.download_start_time = time.time() | |
| # Initialize progress tracking | |
| self.progress_container = Progress() | |
| # Create the live display | |
| self.live = Live( | |
| self._generate_layout(), | |
| refresh_per_second=4, | |
| console=self.console | |
| ) | |
| self.live.start() | |
| def stop(self): | |
| """Stop the live display""" | |
| if self.live: | |
| self.live.stop() | |
| def log(self, message: str, level: str = "INFO"): | |
| """Add a log message""" | |
| with self.lock: | |
| timestamp = datetime.utcnow().strftime("%H:%M:%S") | |
| # Set color based on level | |
| if level == "ERROR": | |
| color = "red" | |
| elif level == "WARNING": | |
| color = "yellow" | |
| elif level == "DEBUG": | |
| color = "blue" | |
| else: | |
| color = "green" | |
| self.log_messages.append({ | |
| "timestamp": timestamp, | |
| "level": level, | |
| "message": message, | |
| "color": color | |
| }) | |
| # Keep logs to the maximum number | |
| while len(self.log_messages) > self.max_log_lines: | |
| self.log_messages.pop(0) | |
| self._update_display() | |
| def _update_display(self): | |
| """Update the live display with current state""" | |
| if self.live: | |
| self.live.update(self._generate_layout()) | |
| def _generate_layout(self): | |
| """Generate the complete layout with current state""" | |
| # Header | |
| header_text = Text("Media Downloader", style="bold white on blue") | |
| elapsed = "" | |
| if self.download_start_time: | |
| elapsed_secs = int(time.time() - self.download_start_time) | |
| elapsed = f" • Elapsed: {elapsed_secs // 60}m {elapsed_secs % 60}s" | |
| header_text.append(elapsed, style="white on blue") | |
| self.layout["header"].update(Panel(header_text, border_style="blue")) | |
| # Progress section | |
| progress_panel = self._generate_progress_panel() | |
| self.layout["progress"].update(progress_panel) | |
| # Stats section | |
| stats_panel = self._generate_stats_panel() | |
| self.layout["stats"].update(stats_panel) | |
| # Logs section | |
| logs_panel = self._generate_logs_panel() | |
| self.layout["logs"].update(logs_panel) | |
| # Footer | |
| footer_text = Text("Press Ctrl+C to cancel", style="italic") | |
| self.layout["footer"].update(Panel(footer_text)) | |
| return self.layout | |
| def _generate_progress_panel(self): | |
| """Generate the progress panel with progress bars""" | |
| progress = Progress() | |
| # Re-create audio progress bar if needed | |
| if self.audio_stats["total"] > 0: | |
| audio_task = progress.add_task( | |
| "[cyan]Audio Download", | |
| total=self.audio_stats["total"], | |
| completed=self.audio_stats["successful"] + len(self.audio_stats["failed"]) | |
| ) | |
| # Re-create video progress bar if needed | |
| if self.video_stats["total"] > 0: | |
| video_task = progress.add_task( | |
| "[magenta]Video Download", | |
| total=self.video_stats["total"], | |
| completed=self.video_stats["successful"] + len(self.video_stats["failed"]) | |
| ) | |
| return Panel(progress, title="Download Progress", border_style="green") | |
| def _generate_stats_panel(self): | |
| """Generate the stats panel with download statistics""" | |
| table = Table(show_header=True, header_style="bold") | |
| table.add_column("Media") | |
| table.add_column("Total") | |
| table.add_column("Successful") | |
| table.add_column("Failed") | |
| table.add_column("Completion") | |
| # Add audio stats | |
| if self.audio_stats["total"] > 0: | |
| completion = 100 * (self.audio_stats["successful"] + len(self.audio_stats["failed"])) / self.audio_stats["total"] | |
| table.add_row( | |
| "Audio", | |
| str(self.audio_stats["total"]), | |
| str(self.audio_stats["successful"]), | |
| str(len(self.audio_stats["failed"])), | |
| f"{completion:.1f}%" | |
| ) | |
| # Add video stats | |
| if self.video_stats["total"] > 0: | |
| completion = 100 * (self.video_stats["successful"] + len(self.video_stats["failed"])) / self.video_stats["total"] | |
| table.add_row( | |
| "Video", | |
| str(self.video_stats["total"]), | |
| str(self.video_stats["successful"]), | |
| str(len(self.video_stats["failed"])), | |
| f"{completion:.1f}%" | |
| ) | |
| return Panel(table, title="Download Statistics", border_style="green") | |
| def _generate_logs_panel(self): | |
| """Generate the logs panel with colorized log messages""" | |
| log_text = Text() | |
| for log in self.log_messages: | |
| timestamp = log["timestamp"] | |
| level = log["level"] | |
| message = log["message"] | |
| color = log["color"] | |
| log_text.append(f"[{timestamp}] ", style="dim") | |
| log_text.append(f"[{level}] ", style=f"bold {color}") | |
| log_text.append(f"{message}\n", style=color if level != "INFO" else "") | |
| return Panel(log_text, title="Logs", border_style="blue") | |
| def setup_audio_progress(self, total_segments: int): | |
| """Setup the audio progress tracking""" | |
| with self.lock: | |
| self.audio_stats["total"] = total_segments | |
| self.audio_stats["successful"] = 0 | |
| self.audio_stats["failed"] = [] | |
| self._update_display() | |
| def setup_video_progress(self, total_segments: int): | |
| """Setup the video progress tracking""" | |
| with self.lock: | |
| self.video_stats["total"] = total_segments | |
| self.video_stats["successful"] = 0 | |
| self.video_stats["failed"] = [] | |
| self._update_display() | |
| def update_progress(self, media_type: str, segment_num: int, success: bool): | |
| """Update progress for a specific media type""" | |
| with self.lock: | |
| if media_type == "audio": | |
| stats = self.audio_stats | |
| else: # video | |
| stats = self.video_stats | |
| if success: | |
| stats["successful"] += 1 | |
| else: | |
| stats["failed"].append(segment_num) | |
| self._update_display() | |
| # Return progress info in the same format as before | |
| return { | |
| "type": media_type, | |
| "total": stats["total"], | |
| "current": stats["successful"] + len(stats["failed"]), | |
| "percentage": ((stats["successful"] + len(stats["failed"])) / stats["total"]) * 100 if stats["total"] > 0 else 0, | |
| "segment_num": segment_num, | |
| "success": success, | |
| "failed_segments": stats["failed"].copy(), | |
| "timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") | |
| } | |
| # Modified ProgressTracker to work with the new TUI | |
| class ProgressTracker: | |
| def __init__(self, total_segments: int, media_type: str, tui: DownloaderTUI, show_tqdm: bool = False): | |
| self.total = total_segments | |
| self.current = 0 | |
| self.media_type = media_type | |
| self.lock = Lock() | |
| self.failed_segments = [] | |
| self.tui = tui | |
| # Setup progress in TUI | |
| if media_type == "audio": | |
| self.tui.setup_audio_progress(total_segments) | |
| else: | |
| self.tui.setup_video_progress(total_segments) | |
| self.tui.log(f"Starting {media_type} download: {total_segments} segments", "INFO") | |
| def update(self, segment_num: int, success: bool = True) -> Dict: | |
| with self.lock: | |
| self.current += 1 | |
| if not success: | |
| self.failed_segments.append(segment_num) | |
| # Log the update | |
| msg = f"{self.media_type} segment {segment_num} {'✓' if success else '✗'}" | |
| if not success: | |
| self.tui.log(msg, "ERROR") | |
| elif self.tui.verbose: | |
| self.tui.log(msg, "DEBUG") | |
| # Update TUI progress | |
| return self.tui.update_progress(self.media_type, segment_num, success) | |
| def close(self): | |
| self.tui.log(f"{self.media_type.capitalize()} download complete: {self.current - len(self.failed_segments)}/{self.total} segments successful", "INFO") | |
| # Example of how to modify the DownloaderV3 class to use the new TUI | |
| def update_downloader_v3_with_tui(downloader:Gryffindor_downloadv3.DownloaderV3): | |
| # Replace the TerminalOutput with our new TUI | |
| downloader.terminal = DownloaderTUI(downloader.verbose) | |
| downloader.terminal.start() | |
| # Override the original _download_media method to use our new ProgressTracker | |
| # original_download_media = downloader._download_media | |
| def _download_media_with_tui(media_data, media_type, output_dir): | |
| # Same implementation but using our TUI-enabled ProgressTracker | |
| if not media_data or "segments" not in media_data: | |
| downloader.terminal.log(f"No {media_type} data provided", "WARNING") | |
| downloader.debugger.warning(f"No {media_type} data provided") | |
| return DownloadResult(None, output_dir, 0, 0, []) | |
| total_segments = len(media_data["segments"]) | |
| init_file_path = None | |
| progress_tracker = ProgressTracker( | |
| total_segments, | |
| media_type, | |
| downloader.terminal, | |
| False # No need for tqdm progress bars | |
| ) | |
| # Rest of the original _download_media implementation... | |
| # Download init segment first | |
| if "init" in media_data: | |
| init_filename = downloader._get_file_name_from_url(media_data["init"], 0, media_type) | |
| init_file_path = output_dir / init_filename | |
| if not downloader._download_segment(media_data["init"], init_file_path): | |
| downloader.terminal.log(f"Failed to download {media_type} init segment", "ERROR") | |
| downloader.debugger.error(f"Failed to download {media_type} init segment") | |
| return DownloadResult(None, output_dir, total_segments, 0, list(range(1, total_segments + 1))) | |
| downloader.terminal.log(f"Downloaded {media_type} init segment: {init_filename}", "INFO") | |
| if downloader.verbose: | |
| downloader.debugger.info(f"Downloaded {media_type} init segment: {init_filename}") | |
| # Prepare segment download tasks | |
| download_tasks = [] | |
| for segment_num, segment_url in media_data["segments"].items(): | |
| segment_filename = downloader._get_file_name_from_url( | |
| segment_url, int(segment_num), media_type | |
| ) | |
| segment_path = output_dir / segment_filename | |
| download_tasks.append(( | |
| segment_url, | |
| segment_path, | |
| int(segment_num), | |
| progress_tracker | |
| )) | |
| successful_segments = 0 | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=downloader.max_workers) as executor: | |
| futures = [executor.submit(downloader._process_segment, task) for task in download_tasks] | |
| for future in concurrent.futures.as_completed(futures): | |
| if future.result(): | |
| successful_segments += 1 | |
| # Final status update | |
| downloader.terminal.log( | |
| f"{media_type.capitalize()} download complete: {successful_segments}/{total_segments} segments successful", | |
| "INFO" | |
| ) | |
| progress_tracker.close() | |
| return DownloadResult( | |
| init_file_path, | |
| output_dir, | |
| total_segments, | |
| successful_segments, | |
| progress_tracker.failed_segments | |
| ) | |
| # Replace the original method with our enhanced version | |
| downloader._download_media = _download_media_with_tui | |
| # Make sure to stop the TUI when done | |
| original_download_all = downloader.download_all | |
| def download_all_with_tui(urls): | |
| try: | |
| return original_download_all(urls) | |
| finally: | |
| downloader.terminal.stop() | |
| downloader.download_all = download_all_with_tui | |
| return downloader | |
| # Usage example: | |
| """ | |
| # Import the original downloader | |
| from mainLogic.downloader import DownloaderV3 | |
| # Create the downloader | |
| downloader = DownloaderV3( | |
| tmp_dir="tmp", | |
| out_dir="output", | |
| verbose=True, | |
| max_workers=8 | |
| ) | |
| # Apply our TUI enhancements | |
| downloader = update_downloader_v3_with_tui(downloader) | |
| # Use the enhanced downloader | |
| results = downloader.download_all(urls_dict) | |
| """ |