Spaces:
Running
Running
| import os | |
| import shutil | |
| import zipfile | |
| import tempfile | |
| import gradio as gr | |
| from pathlib import Path | |
| from typing import List, Dict, Optional, Tuple | |
| from pytubefix import YouTube | |
| import logging | |
| from utils import is_image_file, is_video_file, add_prefix_to_caption | |
| from image_preprocessing import normalize_image | |
| from config import NORMALIZE_IMAGES_TO, TRAINING_VIDEOS_PATH, VIDEOS_TO_SPLIT_PATH, TRAINING_PATH, DEFAULT_PROMPT_PREFIX | |
| logger = logging.getLogger(__name__) | |
| class ImportService: | |
def process_uploaded_files(self, file_paths: List[str]) -> str:
    """Import every uploaded file (ZIP archive, video, or image).

    Each path is dispatched on its lower-cased extension: ``.zip`` goes to
    ``process_zip_file``, ``.mp4``/``.webm`` to ``process_mp4_file``, and
    anything ``is_image_file`` accepts to ``process_image_file``.

    Args:
        file_paths: Paths of the uploaded files from Gradio. A single path
            (``str`` or ``Path``) is accepted as well.

    Returns:
        The status messages of all processed files, one per line.

    Raises:
        gr.Error: If a file has an unsupported type or cannot be processed.
            Files earlier in the list have already been imported by then.
    """
    # A bare path would otherwise be iterated character by character.
    if isinstance(file_paths, (str, Path)):
        file_paths = [file_paths]
    # Nothing to do for an empty or missing list.
    if not file_paths:
        return "No files to process"

    statuses = []
    for raw_path in file_paths:
        file_path = Path(raw_path)
        original_name = file_path.name
        logger.debug("original_name = %s", original_name)

        # Determine file type from name
        file_ext = file_path.suffix.lower()
        try:
            if file_ext == '.zip':
                status = self.process_zip_file(file_path)
            elif file_ext in ('.mp4', '.webm'):
                status = self.process_mp4_file(file_path, original_name)
            elif is_image_file(file_path):
                status = self.process_image_file(file_path, original_name)
            else:
                raise gr.Error(f"Unsupported file type: {file_ext}")
        except gr.Error:
            # Already a user-facing error; re-wrapping would only duplicate
            # the "Error processing ..." prefix.
            raise
        except Exception as e:
            raise gr.Error(f"Error processing file: {str(e)}") from e

        # Keep going: returning here would drop every file after the first.
        statuses.append(status)

    return "\n".join(statuses)
def process_image_file(self, file_path: Path, original_name: str) -> str:
    """Normalize a single uploaded image into the staging directory.

    The image is written to ``STAGING_PATH`` as
    ``<stem>.<NORMALIZE_IMAGES_TO>``; if that name is taken, ``___<n>`` is
    appended to the stem. A ``.txt`` file sitting next to the upload is
    treated as its caption and stored next to the result after
    ``DEFAULT_PROMPT_PREFIX`` has been applied to it.

    Args:
        file_path: Path to the image
        original_name: Original filename

    Returns:
        Status message string

    Raises:
        gr.Error: If the image cannot be normalized or stored.
    """
    try:
        # STAGING_PATH is not among the names imported from config at file
        # level, so it is brought into scope here.
        from config import STAGING_PATH

        # Create a unique filename with configured extension
        stem = Path(original_name).stem
        target_path = STAGING_PATH / f"{stem}.{NORMALIZE_IMAGES_TO}"

        # If file already exists, add number suffix
        counter = 1
        while target_path.exists():
            target_path = STAGING_PATH / f"{stem}___{counter}.{NORMALIZE_IMAGES_TO}"
            counter += 1

        # Convert to normalized format (per the original comment this also
        # removes black bars); a falsy result signals failure.
        if not normalize_image(file_path, target_path):
            raise gr.Error(f"Failed to process image: {original_name}")

        # Handle caption
        src_caption_path = file_path.with_suffix('.txt')
        if src_caption_path.exists():
            # Explicit encoding so captions behave the same on every platform.
            caption = src_caption_path.read_text(encoding="utf-8")
            caption = add_prefix_to_caption(caption, DEFAULT_PROMPT_PREFIX)
            target_path.with_suffix('.txt').write_text(caption, encoding="utf-8")

        status = f"Successfully stored image: {target_path.name}"
        logger.info(status)
        gr.Info(status)
        return status
    except gr.Error:
        # Already user-facing; do not prepend a second error prefix.
        raise
    except Exception as e:
        raise gr.Error(f"Error processing image file: {str(e)}") from e
def process_zip_file(self, file_path: Path) -> str:
    """Import all media files contained in an uploaded ZIP archive.

    The archive is extracted into a temporary directory and walked
    recursively. Videos are copied to ``VIDEOS_TO_SPLIT_PATH``; images are
    normalized into ``STAGING_PATH``. A ``.txt`` file with the same stem as
    a stored media file is copied alongside it as its caption. Files whose
    name starts with ``._`` (Mac metadata) are ignored. A failure on one
    archive member is logged and does not abort the others.

    Args:
        file_path: Path to the uploaded ZIP file

    Returns:
        Status message string

    Raises:
        gr.Error: If the archive cannot be opened or extracted.
    """

    def _unique_target(directory: Path, stem: str, suffix: str) -> Path:
        """Return a not-yet-existing path in *directory*, adding ___<n> on collision."""
        candidate = directory / f"{stem}{suffix}"
        counter = 1
        while candidate.exists():
            candidate = directory / f"{stem}___{counter}{suffix}"
            counter += 1
        return candidate

    try:
        # STAGING_PATH is not among the names imported from config at file
        # level, so it is brought into scope here.
        from config import STAGING_PATH

        video_count = 0
        image_count = 0

        # Create temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract ZIP
            extract_dir = Path(temp_dir) / "extracted"
            extract_dir.mkdir()
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            # Process each file
            for root, _, files in os.walk(extract_dir):
                for file_name in files:
                    if file_name.startswith('._'):  # Skip Mac metadata
                        continue

                    member_path = Path(root) / file_name
                    try:
                        if is_video_file(member_path):
                            # Copy video to videos_to_split
                            target_path = _unique_target(
                                VIDEOS_TO_SPLIT_PATH, member_path.stem, member_path.suffix
                            )
                            shutil.copy2(member_path, target_path)
                            video_count += 1
                        elif is_image_file(member_path):
                            # Convert image and save to staging
                            target_path = _unique_target(
                                STAGING_PATH, member_path.stem, f".{NORMALIZE_IMAGES_TO}"
                            )
                            if not normalize_image(member_path, target_path):
                                # No image was stored, so its caption must
                                # not be copied either.
                                logger.warning(f"Failed to normalize image: {member_path.name}")
                                continue
                            image_count += 1
                        else:
                            # Not a media file (this includes the caption
                            # .txt files themselves).
                            continue

                        # Copy associated caption file if it exists
                        # NOTE(review): process_image_file applies
                        # DEFAULT_PROMPT_PREFIX to captions, here they are
                        # copied verbatim - confirm this is intended.
                        txt_path = member_path.with_suffix('.txt')
                        if txt_path.exists():
                            shutil.copy2(txt_path, target_path.with_suffix('.txt'))
                    except Exception as e:
                        logger.error(f"Error processing {member_path.name}: {str(e)}")
                        continue

        # Generate status message
        parts = []
        if video_count > 0:
            parts.append(f"{video_count} videos")
        if image_count > 0:
            parts.append(f"{image_count} images")

        if not parts:
            return "No supported media files found in ZIP"

        status = f"Successfully stored {' and '.join(parts)}"
        gr.Info(status)
        return status
    except Exception as e:
        raise gr.Error(f"Error processing ZIP: {str(e)}") from e
def process_mp4_file(self, file_path: Path, original_name: str) -> str:
    """Copy a single uploaded video into ``VIDEOS_TO_SPLIT_PATH``.

    The file keeps its original name; if that name is already taken,
    ``___<n>`` is appended to the stem while the original extension is
    preserved (``.webm`` uploads are routed here too, so the extension
    must not be forced to ``.mp4``).

    Args:
        file_path: Path to the file
        original_name: Original filename

    Returns:
        Status message string

    Raises:
        gr.Error: If the file cannot be copied.
    """
    try:
        name = Path(original_name)
        stem = name.stem
        # Fall back to .mp4 only when the upload carries no extension.
        suffix = name.suffix or ".mp4"

        # Create a unique filename
        target_path = VIDEOS_TO_SPLIT_PATH / original_name

        # If file already exists, add number suffix
        counter = 1
        while target_path.exists():
            target_path = VIDEOS_TO_SPLIT_PATH / f"{stem}___{counter}{suffix}"
            counter += 1

        # Copy the file to the target location
        shutil.copy2(file_path, target_path)

        status = f"Successfully stored video: {target_path.name}"
        gr.Info(status)
        return status
    except Exception as e:
        raise gr.Error(f"Error processing video file: {str(e)}") from e
def download_youtube_video(self, url: str, progress=None) -> str:
    """Download a video from YouTube into ``VIDEOS_TO_SPLIT_PATH``.

    The highest-resolution progressive MP4 stream is selected and saved as
    ``<video_id>.mp4``. Failures are reported through the returned message
    (and a Gradio warning) rather than raised.

    Args:
        url: YouTube video URL
        progress: Optional Gradio progress indicator

    Returns:
        Status message string: a success message with the video title, or
        a description of the error.
    """

    def _on_progress(stream, chunk, bytes_remaining):
        """Forward pytubefix download progress to the Gradio indicator."""
        if not progress:
            return
        total = stream.filesize
        if not total:
            # Unknown/zero size: no meaningful fraction to report.
            return
        progress((1 - bytes_remaining / total), desc="Downloading...")

    try:
        # Extract video ID and create YouTube object
        yt = YouTube(url, on_progress_callback=_on_progress)
        video_id = yt.video_id
        filename = f"{video_id}.mp4"

        # Download highest quality progressive MP4
        logger.info("Getting video streams...")
        if progress:
            progress(0, desc="Getting video streams...")
        video = (
            yt.streams
            .filter(progressive=True, file_extension='mp4')
            .order_by('resolution')
            .desc()
            .first()
        )

        if not video:
            message = "Could not find a compatible video format"
            logger.error(message)
            # gr.Error only shows up when raised; gr.Warning displays an
            # alert while still letting us return the message.
            gr.Warning(message)
            return message

        # Download the video
        logger.info("Starting YouTube video download...")
        if progress:
            progress(0, desc="Starting download...")

        video.download(output_path=str(VIDEOS_TO_SPLIT_PATH), filename=filename)

        # Update UI
        logger.info("YouTube video download complete!")
        if progress:
            gr.Info("YouTube video download complete!")
            progress(1, desc="Download complete!")
        return f"Successfully downloaded video: {yt.title}"
    except Exception as e:
        message = f"Error downloading video: {str(e)}"
        logger.exception(message)
        # See above: an un-raised gr.Error would be a silent no-op.
        gr.Warning(message)
        return message