BosonAI_Hackathon / tools /step000_video_downloader.py
github-actions[bot]
Deploy snapshot for HF Space (LFS pointers, heavy tests removed)
09eaf7c
# -*- coding: utf-8 -*-
import json
import os
import re
import subprocess
from datetime import datetime, timezone
from typing import Iterable, Dict, Any, List, Tuple, Generator, Optional

import yt_dlp
from loguru import logger
# -------------------------
# Helpers
# -------------------------
def sanitize_title(title: str) -> str:
    """
    Reduce *title* to a name that is safe to use as a folder component.

    Non-string input is converted with ``str()``; ``None`` becomes "Unknown".
    Every character that is not a word character, a character from the listed
    CJK / kana / Hangul ranges, a space, or a hyphen is dropped. Runs of
    whitespace are collapsed to one space and both ends are trimmed.
    An empty result is replaced by "Untitled".
    """
    if title is None:
        text = "Unknown"
    elif isinstance(title, str):
        text = title
    else:
        text = str(title)

    # Negated class: anything outside the allowed set is removed outright.
    disallowed = re.compile(
        r"[^\w\u4e00-\u9fff\u3040-\u30ff\u31f0-\u31ff\uac00-\ud7af \-]"
    )
    kept = disallowed.sub("", text)
    collapsed = re.sub(r"\s+", " ", kept).strip()
    return collapsed if collapsed else "Untitled"
def _ensure_dir(path: str) -> None:
    """Create directory *path* (and missing parents); an existing directory is not an error."""
    os.makedirs(name=path, exist_ok=True)
def _derive_upload_date(info: Dict[str, Any]) -> str:
    """
    Return the upload date of *info* as a ``YYYYMMDD`` string.

    The 'upload_date' field is used when it consists of exactly eight digits
    (a non-string value is converted with ``str()`` first). Otherwise the
    first positive numeric value among 'timestamp', 'release_timestamp' and
    'epoch' is interpreted as a Unix timestamp in UTC. When nothing usable is
    found, '00000000' is returned so the pipeline keeps moving.
    """
    raw_date = info.get("upload_date")
    upload_date = "" if raw_date is None else str(raw_date).strip()
    if re.fullmatch(r"\d{8}", upload_date):
        return upload_date

    for key in ("timestamp", "release_timestamp", "epoch"):
        ts = info.get(key)
        if isinstance(ts, (int, float)) and ts > 0:
            try:
                # Timezone-aware conversion; datetime.utcfromtimestamp() is
                # deprecated since Python 3.12.
                moment = datetime.fromtimestamp(int(ts), tz=timezone.utc)
            except (OverflowError, OSError, ValueError):
                # Non-finite or out-of-range timestamp: try the next key.
                continue
            return moment.strftime("%Y%m%d")

    # Last resort: placeholder date.
    return "00000000"
def get_target_folder(info: Dict[str, Any], folder_path: str) -> str:
    """
    Compute the per-video output directory:
    <folder_path>/<uploader>/<upload_date> <title>.

    Uploader and title are passed through sanitize_title(); the date comes
    from _derive_upload_date(), so a string is always returned.
    """
    uploader_dir = sanitize_title(info.get("uploader", "Unknown"))
    title_part = sanitize_title(info.get("title", ""))
    date_part = _derive_upload_date(info)
    video_dir = " ".join((date_part, title_part))
    return os.path.join(folder_path, uploader_dir, video_dir)
# -------------------------
# Download
# -------------------------
def _parse_height_limit(resolution: Any, default: int = 1080) -> int:
    """Turn a resolution label such as "1080p" (or a bare number) into a positive height; else *default*."""
    digits = str(resolution or "").lower().replace("p", "")
    try:
        height = int(digits)
    except ValueError:
        return default
    return height if height > 0 else default


def _normalize_to_mp4(output_folder: str, target_mp4: str) -> None:
    """Ensure the downloaded video in *output_folder* ends up at *target_mp4*, remuxing with ffmpeg if needed."""
    if os.path.exists(target_mp4):
        # The expected file is already there; never overwrite it with a leftover.
        return

    video_exts = ("mp4", "m4v", "mov", "webm", "mkv")
    # Sorted so the choice does not depend on directory listing order.
    candidates = sorted(
        fn
        for fn in os.listdir(output_folder)
        if fn.startswith("download.") and fn.rsplit(".", 1)[-1].lower() in video_exts
    )
    if not candidates:
        return

    produced = os.path.join(output_folder, candidates[0])
    if produced.lower().endswith(".mp4"):
        # Already an mp4 container under a different name: a rename is enough.
        os.replace(produced, target_mp4)
        return

    # Stream-copy into an mp4 container. An argument list (no shell) keeps
    # quotes or other special characters in the path from being interpreted.
    cmd = ["ffmpeg", "-loglevel", "error", "-y", "-i", produced, "-c", "copy", target_mp4]
    try:
        completed = subprocess.run(cmd, check=False)
    except OSError as exc:
        logger.error(f"Could not run ffmpeg to remux {produced}: {exc}")
        return

    if completed.returncode != 0 or not os.path.exists(target_mp4):
        logger.error(f"ffmpeg failed (exit code {completed.returncode}) while remuxing {produced}")
        # Drop a partial output so it is not mistaken for a finished download;
        # the source file is kept.
        if os.path.exists(target_mp4):
            try:
                os.remove(target_mp4)
            except OSError as exc:
                logger.warning(f"Could not remove incomplete file {target_mp4}: {exc}")
        return

    try:
        os.remove(produced)
    except OSError as exc:
        logger.warning(f"Could not remove intermediate file {produced}: {exc}")


def download_single_video(info: Dict[str, Any], folder_path: str, resolution: str = "1080p") -> Optional[str]:
    """
    Download one video using yt-dlp.

    The video is written as 'download.mp4' inside the folder computed by
    get_target_folder(); yt-dlp is also asked to write the info JSON and the
    thumbnail next to it. If 'download.mp4' already exists the download is
    skipped. A 'cookies.txt' file in the current working directory is passed
    to yt-dlp when present.

    Args:
        info: Video metadata dict; must contain 'webpage_url'.
        folder_path: Root directory under which the per-video folder is created.
        resolution: Maximum height label such as "1080p"; unparsable values
            fall back to 1080.

    Returns:
        The output folder path, or None if 'webpage_url' is missing. The
        folder is returned even when 'download.mp4' could not be produced, so
        later steps can decide what to do.
    """
    webpage_url = info.get("webpage_url")
    if not webpage_url:
        logger.error("Missing 'webpage_url' in info; skipping.")
        return None

    output_folder = get_target_folder(info, folder_path)
    _ensure_dir(output_folder)

    # If already downloaded, short-circuit.
    target_mp4 = os.path.join(output_folder, "download.mp4")
    if os.path.exists(target_mp4):
        logger.info(f"Video already downloaded in {output_folder}")
        return output_folder

    height_limit = _parse_height_limit(resolution)

    ydl_opts = {
        # Prefer mp4 final container, fallback chain keeps things robust.
        "format": f"bestvideo[ext=mp4][height<={height_limit}]+bestaudio[ext=m4a]/best[ext=mp4]/best",
        "merge_output_format": "mp4",
        "writeinfojson": True,
        "writethumbnail": True,
        # Ensure the final filename becomes download.<ext>.
        "outtmpl": os.path.join(output_folder, "download.%(ext)s"),
        "ignoreerrors": True,
    }

    # Cookie handling: use cookies.txt if present; otherwise omit the key.
    # (Browser cookies can be used instead via the "cookiesfrombrowser"
    # option, e.g. ("chrome",) or ("firefox", "default").)
    cookiefile = "cookies.txt"
    if os.path.exists(cookiefile):
        ydl_opts["cookiefile"] = cookiefile

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([webpage_url])

    # Some sites may deliver non-mp4 despite merge preference; normalize if needed.
    _normalize_to_mp4(output_folder, target_mp4)

    if os.path.exists(target_mp4):
        logger.info(f"Video downloaded in {output_folder}")
        return output_folder

    logger.warning(f"Download finished but {target_mp4} not found in {output_folder}")
    return output_folder  # still return the folder so later steps can decide
def download_videos(info_list: Iterable[Dict[str, Any]], folder_path: str, resolution: str = "1080p") -> Optional[str]:
    """
    Download every non-empty entry of *info_list* via download_single_video().

    Returns the output folder of the last entry that yielded one, or None
    when no entry produced a folder.
    """
    most_recent: Optional[str] = None
    for entry in info_list:
        if entry:
            folder = download_single_video(entry, folder_path, resolution)
            # Keep the previous folder when this entry was skipped.
            most_recent = folder or most_recent
    return most_recent
# -------------------------
# Info fetching
# -------------------------
def get_info_list_from_url(url: Iterable[str] | str, num_videos: int) -> Generator[Dict[str, Any], None, None]:
    """
    Lazily produce yt-dlp info dicts for a single URL or an iterable of URLs.

    Each URL is extracted without downloading. A result carrying 'entries'
    is treated as a playlist and its non-empty entries are yielded one by
    one; any other result is yielded as a single video. URLs whose extraction
    raises or returns None are skipped. *num_videos* is passed to yt-dlp as
    'playlistend'.
    """
    targets = list(url) if not isinstance(url, str) else [url]

    extractor_options = {
        "playlistend": num_videos,
        "ignoreerrors": True,
        "extract_flat": False,  # ensure rich entries (with webpage_url, formats, etc.)
        "retries": 3,
        "socket_timeout": 20,
    }

    with yt_dlp.YoutubeDL(extractor_options) as ydl:
        for target in targets:
            try:
                extracted = ydl.extract_info(target, download=False)
            except Exception as e:
                logger.error(f"Failed to extract info for {target}: {e}")
                continue

            if extracted is None:
                continue

            playlist_items = extracted.get("entries")
            if playlist_items is None:
                # Single video
                yield extracted
                continue

            # Playlist: entries may contain None for removed/private videos
            yield from (item for item in playlist_items if item)
# -------------------------
# High-level convenience
# -------------------------
def download_from_url(url: Iterable[str] | str,
                      folder_path: str,
                      resolution: str = "1080p",
                      num_videos: int = 5) -> Tuple[str, str | None, dict | None]:
    """
    Fetch metadata for the given URL(s), then download the videos.

    Args:
        url: One URL or an iterable of URLs (video or playlist).
        folder_path: Root directory for the downloads.
        resolution: Maximum height label such as "1080p".
        num_videos: Forwarded to get_info_list_from_url().

    Returns:
        (message, example_mp4_path, example_info_json). The example_* values
        refer to the last video for which download_videos() returned a folder
        (kept for compatibility). example_mp4_path is built from that folder
        and is not checked for existence here. Both example values are None
        when no metadata was found or when no video yielded an output folder.
    """
    # Gather metadata first
    video_info_list: List[Dict[str, Any]] = list(get_info_list_from_url(url, num_videos))
    if not video_info_list:
        return "No videos found for the provided URL(s).", None, None

    # Download videos
    example_output_folder = download_videos(video_info_list, folder_path, resolution)
    if not example_output_folder:
        # Every entry was skipped; do not report success.
        return "No videos could be downloaded for the provided URL(s).", None, None

    example_mp4 = os.path.join(example_output_folder, "download.mp4")

    # Try to read the last video's info JSON (if present)
    download_info_json = None
    example_info_path = os.path.join(example_output_folder, "download.info.json")
    if os.path.exists(example_info_path):
        try:
            with open(example_info_path, "r", encoding="utf-8") as f:
                download_info_json = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            logger.warning(f"Failed to read info JSON: {e}")

    return f"All videos have been downloaded under the '{folder_path}' folder", example_mp4, download_info_json
# -------------------------
# Manual test
# -------------------------
if __name__ == "__main__":
    # Manual smoke test: download one sample video into ./videos and log the outcome.
    demo_url = "https://www.youtube.com/watch?v=5aYwU4nj5QA"
    demo_folder = "videos"
    os.makedirs(demo_folder, exist_ok=True)

    status, example_path, _example_info = download_from_url(demo_url, demo_folder)
    logger.info(status)
    if example_path:
        logger.info(f"Example MP4: {example_path}")