Embedding / App /Youtube /youtubeRouter.py
Mbonea's picture
pydantic
f74ada5
from fastapi import APIRouter, HTTPException, BackgroundTasks
import tempfile
import os
import logging
import asyncio
from .Schema import YouTubeUploadTask
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/youtube", tags=["youtube"])
async def download_file(url: str) -> str:
"""Download file from URL using aria2c."""
try:
# Create temp directory for download
temp_dir = tempfile.mkdtemp()
temp_path = os.path.join(
temp_dir, "video"
) # aria2c will add extension automatically
# Build aria2c command
command = [
"aria2c",
"--allow-overwrite=true",
"--auto-file-renaming=false",
"--max-connection-per-server=16",
"--split=16",
"--dir",
temp_dir,
"--out",
"video",
url,
]
logger.info(f"Starting download with aria2c: {url}")
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise Exception(f"Download failed: {stderr.decode()}")
# Find the downloaded file (aria2c adds extension automatically)
downloaded_files = os.listdir(temp_dir)
if not downloaded_files:
raise Exception("No file downloaded")
downloaded_file = os.path.join(temp_dir, downloaded_files[0])
logger.info(f"Download completed: {downloaded_file}")
return downloaded_file
except Exception as e:
logger.error(f"Error downloading file: {str(e)}")
raise HTTPException(status_code=500, detail=f"File download failed: {str(e)}")
async def upload_video_background(task: YouTubeUploadTask):
"""Background task to handle video upload."""
temp_dir = None
try:
# Convert HttpUrl to string
url = str(task.filename)
logger.info(f"Starting download for video: {url}")
downloaded_file = await download_file(url)
temp_dir = os.path.dirname(downloaded_file)
logger.info(f"Download complete. Saved to: {downloaded_file}")
# Build the command
command = [
"/srv/youtube/youtubeuploader",
"-filename",
downloaded_file,
"-title",
task.title,
"-description",
task.description,
"-categoryId",
task.category_id,
"-privacy",
task.privacy,
"-tags",
task.tags,
]
if task.thumbnail:
command.extend(["-thumbnail", task.thumbnail])
logger.info("Executing upload command")
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
logger.error(f"Upload failed: {stderr.decode()}")
raise Exception(f"Upload failed: {stderr.decode()}")
logger.info("Upload completed successfully")
logger.debug(f"Upload output: {stdout.decode()}")
except Exception as e:
logger.error(f"Error in upload process: {str(e)}")
raise
finally:
# Clean up temp directory and its contents
if temp_dir and os.path.exists(temp_dir):
try:
for file in os.listdir(temp_dir):
os.remove(os.path.join(temp_dir, file))
os.rmdir(temp_dir)
logger.info(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.error(f"Failed to clean up temp directory: {str(e)}")
@router.post("/upload")
async def upload_video_to_youtube(
task: YouTubeUploadTask, background_tasks: BackgroundTasks
):
"""
Endpoint to handle YouTube video upload requests.
The actual upload is performed as a background task.
"""
try:
background_tasks.add_task(upload_video_background, task)
return {"message": "Upload task started", "status": "processing"}
except Exception as e:
logger.error(f"Error initiating upload task: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))