|
|
import logging
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import gradio as gr
from pytubefix import YouTube

from config import (
    DEFAULT_PROMPT_PREFIX,
    NORMALIZE_IMAGES_TO,
    STAGING_PATH,
    TRAINING_PATH,
    TRAINING_VIDEOS_PATH,
    VIDEOS_TO_SPLIT_PATH,
)
from image_preprocessing import normalize_image
from utils import add_prefix_to_caption, is_image_file, is_video_file
|
|
|
|
|
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
class ImportService:
    """Import user-supplied media into the project's working folders.

    Videos are copied into ``VIDEOS_TO_SPLIT_PATH``; images are converted with
    ``normalize_image`` to the ``NORMALIZE_IMAGES_TO`` format and written to
    ``STAGING_PATH``. Uploaded files never overwrite an existing file: a
    ``___<n>`` suffix is appended to the stem until the name is free.
    """

    @staticmethod
    def _unique_path(directory: Path, stem: str, suffix: str) -> Path:
        """Return a path in *directory* that does not exist yet.

        Args:
            directory: Destination folder
            stem: Desired file name without extension
            suffix: Extension including the leading dot (may be empty)

        Returns:
            ``directory/stem+suffix`` if free, otherwise the first free
            ``directory/stem___<n>+suffix`` with n counting up from 1
        """
        candidate = directory / f"{stem}{suffix}"
        counter = 1
        while candidate.exists():
            candidate = directory / f"{stem}___{counter}{suffix}"
            counter += 1
        return candidate

    def process_uploaded_files(self, file_paths: List[str]) -> str:
        """Process uploaded files (ZIP, MP4/WebM, or image)

        Every file in the list is processed, in order.

        Args:
            file_paths: File paths to the uploaded files from Gradio

        Returns:
            Status message string (one line per processed file)

        Raises:
            gr.Error: If a file has an unsupported type or fails to process.
                Processing stops at the first failing file.
        """
        if not file_paths:
            return "No files provided"

        messages = []
        for raw_path in file_paths:
            file_path = Path(raw_path)
            original_name = file_path.name
            file_ext = file_path.suffix.lower()
            logger.info("Processing uploaded file: %s", original_name)

            try:
                if file_ext == '.zip':
                    messages.append(self.process_zip_file(file_path))
                elif file_ext in ('.mp4', '.webm'):
                    messages.append(self.process_mp4_file(file_path, original_name))
                elif is_image_file(file_path):
                    messages.append(self.process_image_file(file_path, original_name))
                else:
                    raise gr.Error(f"Unsupported file type: {file_ext}")
            except gr.Error:
                # Already a user-facing error; re-wrapping would only nest prefixes.
                raise
            except Exception as e:
                raise gr.Error(f"Error processing file: {str(e)}") from e

        return "\n".join(messages)

    def process_image_file(self, file_path: Path, original_name: str) -> str:
        """Process a single image file

        The image is normalized into ``STAGING_PATH``. If a ``.txt`` file with
        the same stem sits next to the source image, it is treated as the
        caption: the default prompt prefix is applied and the result is written
        next to the stored image.

        Args:
            file_path: Path to the image
            original_name: Original filename

        Returns:
            Status message string

        Raises:
            gr.Error: If the image cannot be normalized or stored
        """
        try:
            stem = Path(original_name).stem
            target_path = self._unique_path(STAGING_PATH, stem, f".{NORMALIZE_IMAGES_TO}")

            if not normalize_image(file_path, target_path):
                raise gr.Error(f"Failed to process image: {original_name}")

            src_caption_path = file_path.with_suffix('.txt')
            if src_caption_path.exists():
                caption = src_caption_path.read_text()
                caption = add_prefix_to_caption(caption, DEFAULT_PROMPT_PREFIX)
                target_path.with_suffix('.txt').write_text(caption)

            status = f"Successfully stored image: {target_path.name}"
            logger.info(status)
            gr.Info(status)
            return status

        except gr.Error:
            raise
        except Exception as e:
            raise gr.Error(f"Error processing image file: {str(e)}") from e

    def _store_zip_member(self, member_path: Path) -> Optional[Tuple[str, Path]]:
        """Store one extracted media file in its destination folder.

        Args:
            member_path: Path of the extracted file

        Returns:
            ``("video", target)`` or ``("image", target)`` when the file was
            stored, ``None`` when it is not a supported media file or the
            image could not be normalized
        """
        if is_video_file(member_path):
            target_path = self._unique_path(
                VIDEOS_TO_SPLIT_PATH, member_path.stem, member_path.suffix
            )
            shutil.copy2(member_path, target_path)
            return "video", target_path

        if is_image_file(member_path):
            target_path = self._unique_path(
                STAGING_PATH, member_path.stem, f".{NORMALIZE_IMAGES_TO}"
            )
            if normalize_image(member_path, target_path):
                return "image", target_path
            logger.warning("Failed to normalize image from ZIP: %s", member_path.name)

        return None

    def process_zip_file(self, file_path: Path) -> str:
        """Process uploaded ZIP file containing media files

        The archive is extracted to a temporary directory and walked
        recursively. Videos and images are stored; a ``.txt`` file with the
        same stem as a stored media file is copied next to it as its caption.
        A failure on one member is logged and does not abort the import.

        Args:
            file_path: Path to the uploaded ZIP file

        Returns:
            Status message string

        Raises:
            gr.Error: If the archive cannot be opened or extracted
        """
        try:
            counts = {"video": 0, "image": 0}

            with tempfile.TemporaryDirectory() as temp_dir:
                extract_dir = Path(temp_dir) / "extracted"
                extract_dir.mkdir()
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_dir)

                for root, _, files in os.walk(extract_dir):
                    for file_name in files:
                        # Skip files whose name starts with "._".
                        if file_name.startswith('._'):
                            continue

                        member_path = Path(root) / file_name

                        try:
                            stored = self._store_zip_member(member_path)
                            if stored is None:
                                continue
                            kind, target_path = stored
                            counts[kind] += 1

                            # Copy the caption only for media that was actually stored.
                            txt_path = member_path.with_suffix('.txt')
                            if txt_path.exists():
                                shutil.copy2(txt_path, target_path.with_suffix('.txt'))

                        except Exception as e:
                            logger.error(f"Error processing {member_path.name}: {str(e)}")
                            continue

            parts = []
            if counts["video"] > 0:
                parts.append(f"{counts['video']} videos")
            if counts["image"] > 0:
                parts.append(f"{counts['image']} images")

            if not parts:
                return "No supported media files found in ZIP"

            status = f"Successfully stored {' and '.join(parts)}"
            gr.Info(status)
            return status

        except Exception as e:
            raise gr.Error(f"Error processing ZIP: {str(e)}") from e

    def process_mp4_file(self, file_path: Path, original_name: str) -> str:
        """Process a single video file

        The file is copied into ``VIDEOS_TO_SPLIT_PATH`` under its original
        name. On a name collision the ``___<n>`` suffix is added and the
        original extension is kept.

        Args:
            file_path: Path to the file
            original_name: Original filename

        Returns:
            Status message string

        Raises:
            gr.Error: If the file cannot be copied
        """
        try:
            name = Path(original_name)
            target_path = self._unique_path(VIDEOS_TO_SPLIT_PATH, name.stem, name.suffix)

            shutil.copy2(file_path, target_path)

            status = f"Successfully stored video: {target_path.name}"
            gr.Info(status)
            return status

        except Exception as e:
            raise gr.Error(f"Error processing video file: {str(e)}") from e

    def download_youtube_video(self, url: str, progress=None) -> str:
        """Download a video from YouTube

        The highest-resolution progressive MP4 stream is saved to
        ``VIDEOS_TO_SPLIT_PATH`` as ``<video_id>.mp4``.

        Args:
            url: YouTube video URL
            progress: Optional Gradio progress indicator

        Returns:
            Status message string. Failures are reported through the returned
            message (and a UI warning) rather than by raising.
        """
        def report_progress(stream, chunk, bytes_remaining):
            # Guard against a missing indicator and a zero/unknown stream size.
            if progress and stream.filesize:
                progress((1 - bytes_remaining / stream.filesize), desc="Downloading...")

        try:
            yt = YouTube(url, on_progress_callback=report_progress)
            output_path = VIDEOS_TO_SPLIT_PATH / f"{yt.video_id}.mp4"

            if progress:
                logger.info("Getting video streams...")
                progress(0, desc="Getting video streams...")
            video = (
                yt.streams
                .filter(progressive=True, file_extension='mp4')
                .order_by('resolution')
                .desc()
                .first()
            )

            if not video:
                message = "Could not find a compatible video format"
                logger.error(message)
                # Constructing gr.Error without raising it has no effect, so
                # show a warning instead and keep returning the message.
                gr.Warning(message)
                return message

            if progress:
                logger.info("Starting YouTube video download...")
                progress(0, desc="Starting download...")

            video.download(output_path=str(output_path.parent), filename=output_path.name)

            if progress:
                logger.info("YouTube video download complete!")
                gr.Info("YouTube video download complete!")
                progress(1, desc="Download complete!")
            return f"Successfully downloaded video: {yt.title}"

        except Exception as e:
            message = f"Error downloading video: {str(e)}"
            logger.exception(message)
            gr.Warning(message)
            return message