"""YouTube downloader API built on FastAPI and yt-dlp.

Endpoints:
    GET /                     health check
    GET /ytv/                 download a video (mp4) and return a link or the file
    GET /yta/                 download the best audio stream and return a link or the file
    GET /cdn/video/{filename} serve a previously downloaded video
    GET /cdn/audio/{filename} serve a previously downloaded audio file

Downloaded files are written to ``OUTPUT_DIR`` and removed again by
``delete_file_after_delay``. A simple per-IP rate limiter persists banned
addresses to ``BANNED_IP_FILE``.
"""

import asyncio
import io
import json
import logging
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta
from urllib.parse import quote

import gradio as gr
import uvicorn
import yt_dlp
from fastapi import FastAPI, Request, Query, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse

OUTPUT_DIR = "output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

BANNED_IP_FILE = 'bannedip.json'
MAX_REQUESTS_PER_MINUTE = 35

# Base URL used to build the download links returned by /ytv/ and /yta/.
# Defaults to the original deployment; override with the PUBLIC_BASE_URL
# environment variable when hosting elsewhere.
PUBLIC_BASE_URL = os.environ.get(
    "PUBLIC_BASE_URL", "https://lordxdd-ytdlp-py.hf.space"
).rstrip("/")

# Cookie file handed to yt-dlp by the API endpoints.
COOKIE_FILE = 'yt.txt'

_RESPONSE_MODES = ("url", "buffer")

# File extensions whose registered audio MIME subtype differs from the extension.
_AUDIO_SUBTYPE_ALIASES = {"m4a": "mp4", "mp3": "mpeg"}

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

server_start_time = time.time()
total_request_count = 0
# Maps client IP -> list of datetimes of that client's requests in the last minute.
request_counter = defaultdict(list)


def load_banned_ips():
    """Return the set of banned IP strings stored in ``BANNED_IP_FILE``.

    A missing, unparsable, or wrongly shaped file yields an empty set instead
    of an exception, so a damaged ban list never takes the API down.
    """
    try:
        with open(BANNED_IP_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return set()
    if not isinstance(data, list):
        logger.warning(f"Ignoring {BANNED_IP_FILE}: expected a JSON list.")
        return set()
    return {ip for ip in data if isinstance(ip, str)}


def save_banned_ips(banned_ips):
    """Persist ``banned_ips`` (an iterable of IP strings) to ``BANNED_IP_FILE``.

    The list is written to a temporary file and then swapped in with
    ``os.replace`` so a concurrent reader never sees a half-written file.
    """
    tmp_path = f"{BANNED_IP_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(sorted(str(ip) for ip in banned_ips), f)
    os.replace(tmp_path, BANNED_IP_FILE)


async def delete_file_after_delay(file_path: str, delay: int = 600):
    """Sleep for ``delay`` seconds, then delete ``file_path``.

    Deletion failures are logged and never raised.
    """
    await asyncio.sleep(delay)
    try:
        os.remove(file_path)
        logger.info(f"Deleted file {file_path} after {delay} seconds.")
    except FileNotFoundError:
        logger.warning(f"File {file_path} not found during deletion.")
    except Exception as e:
        logger.error(f"Error deleting file {file_path}: {e}", exc_info=True)


def _client_ip(request):
    """Return the caller's IP: first X-Forwarded-For entry, else the socket peer."""
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted proxy
    # overwrites it; confirm the deployment always sits behind such a proxy.
    forwarded = request.headers.get('X-Forwarded-For')
    if forwarded:
        return forwarded.split(',')[0].strip()
    # request.client may be None, so guard the attribute access.
    return request.client.host if request.client else "unknown"


def _video_ydl_opts(quality, cookiefile=None):
    """Build the yt-dlp options for an mp4 video capped at ``quality`` pixels of height."""
    opts = {
        'format': f'bestvideo[height<={quality}]+bestaudio/best',
        'outtmpl': os.path.join(OUTPUT_DIR, f'%(title)s_{quality}p.%(ext)s'),
        'merge_output_format': 'mp4',
        'postprocessors': [{
            'key': 'FFmpegVideoConvertor',
            'preferedformat': 'mp4'
        }],
    }
    if cookiefile:
        opts['cookiefile'] = cookiefile
    return opts


def _resolve_downloaded_path(ydl, info, fallback_ext=None):
    """Return the path of the file yt-dlp actually left on disk.

    ``prepare_filename`` reflects the extension of the selected format, which
    can differ from the final file once a post-processor has converted it, so
    the post-processed ``filepath`` entries are tried first. When nothing
    exists, the ``prepare_filename`` result is returned so the caller can
    report it.
    """
    prepared = ydl.prepare_filename(info)
    candidates = [
        item.get("filepath")
        for item in (info.get("requested_downloads") or [])
    ]
    candidates.append(prepared)
    if fallback_ext:
        candidates.append(f"{os.path.splitext(prepared)[0]}.{fallback_ext}")
    for candidate in candidates:
        if candidate and os.path.exists(candidate):
            return candidate
    return prepared


def _download_media(url, ydl_opts, fallback_ext=None):
    """Download ``url`` with yt-dlp (blocking) and return ``(info, file_path)``."""
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        return info, _resolve_downloaded_path(ydl, info, fallback_ext)


async def _run_download(url, ydl_opts, fallback_ext=None):
    """Run ``_download_media`` in the default executor so the event loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _download_media, url, ydl_opts, fallback_ext)


def _safe_output_path(filename):
    """Return the path of ``filename`` inside ``OUTPUT_DIR``, or None.

    Only a bare file name that refers to an existing regular file is accepted;
    anything containing a directory component (or "." / "..") is rejected.
    """
    if not filename or filename in (".", "..") or os.path.basename(filename) != filename:
        return None
    path = os.path.join(OUTPUT_DIR, filename)
    return path if os.path.isfile(path) else None


def _audio_media_type(filename):
    """Return an ``audio/*`` media type derived from the extension of ``filename``."""
    ext = os.path.splitext(filename)[1].lstrip(".").lower()
    if not ext:
        return "application/octet-stream"
    return f"audio/{_AUDIO_SUBTYPE_ALIASES.get(ext, ext)}"


app = FastAPI(
    title="YouTube Downloader API",
    description="API to download video/audio from YouTube.",
    version="1.0.0"
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; left unchanged because existing clients may rely on it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Count and log every request, and enforce the per-IP rate limit.

    Banned clients receive a 403. A client that exceeds
    ``MAX_REQUESTS_PER_MINUTE`` within the last minute is added to the
    persisted ban list and the offending request itself is rejected.
    """
    global total_request_count
    total_request_count += 1

    client_ip = _client_ip(request)
    blocked = JSONResponse(status_code=403, content={"message": "Blocked due to excessive requests."})

    banned_ips = load_banned_ips()
    if client_ip in banned_ips:
        return blocked

    now = datetime.now()
    window_start = now - timedelta(minutes=1)
    # Keep only this client's timestamps from the last minute, plus this request.
    recent = [t for t in request_counter[client_ip] if t > window_start]
    recent.append(now)
    request_counter[client_ip] = recent

    if len(recent) > MAX_REQUESTS_PER_MINUTE:
        logger.warning(f"IP {client_ip} is blocked due to rate limiting.")
        banned_ips.add(client_ip)
        save_banned_ips(banned_ips)
        request_counter[client_ip] = []
        return blocked

    logger.info(f"{client_ip} requested {request.method} {request.url}")
    return await call_next(request)


@app.get("/", summary="Root Endpoint")
async def root():
    """Health check: report that the server is running."""
    return JSONResponse(status_code=200, content={"status": "Server Running"})


@app.get("/ytv/", summary="Download YouTube Video")
async def download_video(
    background_tasks: BackgroundTasks,
    url: str = Query(...),
    quality: int = Query(720),
    mode: str = Query("url")
):
    """Download ``url`` as mp4 with a height of at most ``quality``.

    ``mode="url"`` returns JSON containing a link to the file under
    /cdn/video/; ``mode="buffer"`` returns the file itself as an attachment.
    The file is scheduled for deletion after the response. Any failure is
    logged and reported as a 500 with the error text.
    """
    if mode not in _RESPONSE_MODES:
        return JSONResponse(status_code=400, content={"error": "Invalid mode. Use 'url' or 'buffer'."})

    try:
        info, file_path = await _run_download(
            url, _video_ydl_opts(quality, cookiefile=COOKIE_FILE), fallback_ext="mp4"
        )
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found after download: {file_path}")

        background_tasks.add_task(delete_file_after_delay, file_path)
        file_name = os.path.basename(file_path)

        if mode == "url":
            return {
                "status": "success",
                "title": info.get("title"),
                "thumb": info.get("thumbnail"),
                "url": f"{PUBLIC_BASE_URL}/cdn/video/{quote(file_name)}"
            }

        # FileResponse streams from disk and encodes non-ASCII file names in
        # Content-Disposition, unlike a hand-built header around BytesIO.
        return FileResponse(file_path, media_type="video/mp4", filename=file_name)
    except Exception as e:
        logger.error(f"Download error: {e}", exc_info=True)
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/cdn/video/{filename}", summary="Serve Downloaded File")
async def download_file(filename: str):
    """Serve ``filename`` from ``OUTPUT_DIR``, or 404 if it is not a file there."""
    file_path = _safe_output_path(filename)
    if file_path is None:
        return JSONResponse(status_code=404, content={"error": "File not found"})
    return FileResponse(file_path, filename=filename)


@app.get("/yta/", summary="Download YouTube Audio (.webm)")
async def download_audio(
    background_tasks: BackgroundTasks,
    url: str = Query(..., description="YouTube video URL"),
    mode: str = Query("url", description="Response mode: 'url' or 'buffer'")
):
    """Download the best audio stream of ``url``.

    ``mode="url"`` returns JSON containing a link to the file under
    /cdn/audio/; ``mode="buffer"`` returns the file itself as an attachment.
    The file is scheduled for deletion after the response. Any failure is
    logged and reported as a 500 with the error text.
    """
    if mode not in _RESPONSE_MODES:
        return JSONResponse(status_code=400, content={"error": "Invalid mode. Use 'url' or 'buffer'."})

    try:
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(OUTPUT_DIR, '%(title)s.%(ext)s'),
            'cookiefile': COOKIE_FILE,
        }
        info, file_path = await _run_download(url, ydl_opts)
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found after download: {file_path}")

        background_tasks.add_task(delete_file_after_delay, file_path)
        file_name = os.path.basename(file_path)

        if mode == "url":
            return {
                "status": "success",
                "title": info.get("title"),
                "thumb": info.get("thumbnail"),
                "url": f"{PUBLIC_BASE_URL}/cdn/audio/{quote(file_name)}"
            }

        # The container depends on what yt-dlp picked, so derive the media
        # type from the actual file instead of assuming webm.
        return FileResponse(file_path, media_type=_audio_media_type(file_name), filename=file_name)
    except Exception as e:
        logger.error(f"Download error: {e}", exc_info=True)
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/cdn/audio/{filename}", summary="Serve Downloaded Audio File")
async def serve_audio_file(filename: str):
    """Serve ``filename`` from ``OUTPUT_DIR`` as audio, or 404 if it is not a file there."""
    file_path = _safe_output_path(filename)
    if file_path is None:
        return JSONResponse(status_code=404, content={"error": "File not found"})
    return FileResponse(file_path, filename=filename, media_type=_audio_media_type(filename))


def run_gradio_ui():
    """Launch a Gradio form that downloads a video at the chosen resolution."""

    def download_gradio(url, resolution):
        """Download ``url`` and return a JSON-able result or ``{"error": ...}``."""
        try:
            # The slider may deliver a float; yt-dlp's height filter wants an int.
            quality = int(resolution)
            info, file_path = _download_media(url, _video_ydl_opts(quality), fallback_ext="mp4")
            return {
                "status": "success",
                "title": info['title'],
                "url": f"/cdn/video/{quote(os.path.basename(file_path))}"
            }
        except Exception as e:
            return {"error": str(e)}

    interface = gr.Interface(
        fn=download_gradio,
        inputs=["text", gr.Slider(240, 1080)],
        outputs="json",
        title="YouTube Video Downloader"
    )
    interface.launch()


if __name__ == "__main__":
    import sys

    if "gradio" in sys.argv:
        run_gradio_ui()
    else:
        # NOTE(review): "app:app" assumes this file is named app.py, and
        # reload=True is a development setting - confirm for production.
        uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)


# NOTE(review): everything below looks like two unrelated scripts (an
# English-to-Hindi dubbing demo and a translation snippet) appended to this
# module. It runs at import time: it loads the models and calls
# gr.Interface(...).launch() at module level. Presumably these belong in
# their own files - confirm and split. Behaviour is left unchanged here.
import whisper
from transformers import pipeline
from gtts import gTTS
import gradio as gr
import os

# Load Whisper model
whisper_model = whisper.load_model("small")

# Load translation pipeline (English to Hindi)
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-en-hi")


def transcribe_translate_dub(audio_path):
    """Transcribe English audio, translate it to Hindi, and synthesize Hindi speech.

    Returns a tuple of the Hindi text and the path of the generated mp3.
    The mp3 is always written to "dubbed_audio.mp3" in the working directory,
    so each call overwrites the previous output.
    """
    # Step 1: Transcribe audio using Whisper
    result = whisper_model.transcribe(audio_path)
    english_text = result["text"]

    # Step 2: Translate English to Hindi
    translated = translator(english_text)[0]["translation_text"]

    # Step 3: Text-to-Speech (Hindi)
    tts = gTTS(translated, lang='hi')
    tts.save("dubbed_audio.mp3")

    return translated, "dubbed_audio.mp3"


# Gradio UI
gr.Interface(
    fn=transcribe_translate_dub,
    inputs=gr.Audio(source="upload", type="filepath", label="Upload English Audio"),
    outputs=[
        gr.Textbox(label="Hindi Translation"),
        gr.Audio(label="Hindi Dubbed Audio")
    ],
    title="🎙️ English to Hindi Video Dubbing AI",
    description="Upload English audio, get Hindi translation and dubbed voice."
).launch()

# NOTE(review): this repeats the import and pipeline construction above and
# translates a fixed sample sentence; it looks like a leftover smoke test.
from transformers import pipeline

translator = pipeline("translation", model="Helsinki-NLP/opus-mt-en-hi")
translated_text = translator("Hello, how are you?")[0]['translation_text']