|
import asyncio
import json
import logging
import os
import random
import tempfile
from typing import List, Dict

import aiohttp
import requests
import textract
from fastapi import Depends, FastAPI, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from fastapi.security import APIKeyHeader
|
|
|
# --- Logging -----------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Application -------------------------------------------------------------
# The route decorators further down attach their handlers to this object.
app = FastAPI()

# --- API-key authentication --------------------------------------------------
# Name of the HTTP header that carries the caller's key.
API_KEY_NAME = "X-API-Key"
# auto_error=False: a missing header is reported to the handler as None
# instead of being rejected by the security scheme itself.
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)

# --- Upstream service and secrets --------------------------------------------
# Base URL of the Invidious instance used for search and video metadata.
INVIDIOUS_INSTANCE = "https://iv.melmac.space"

# The expected key comes from the environment; refuse to start without it
# (an empty value counts as unset).
API_KEY = os.getenv("API_KEY")
if not API_KEY:
    raise ValueError("API_KEY environment variable is not set")
|
|
|
def get_random_user_agent() -> str:
    """Return one of a fixed set of desktop browser User-Agent strings, chosen at random."""
    # Immutable pool of candidates: Chrome on Windows, Safari on macOS,
    # Chrome on Linux.
    candidates = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36",
    )
    return random.choice(candidates)
|
|
|
async def search_and_get_videos(query: str) -> List[Dict]:
    """Search the Invidious instance for videos and summarise the top hits.

    Args:
        query: Free-text search terms. The value is sent as a query-string
            parameter, so it may contain spaces, ``&``, ``#`` and similar
            characters.

    Returns:
        A list of at most two dicts with the keys ``id``, ``title`` and
        ``thumbnail`` (``""`` when the result has no usable thumbnail).
        An empty list is returned when the request fails, the response is
        not valid JSON, or the payload is not a list.
    """
    url = f"{INVIDIOUS_INSTANCE}/api/v1/search"
    # Let aiohttp percent-encode the query instead of splicing it into the
    # URL by hand, which broke on reserved characters.
    params = {"q": query, "type": "video"}
    headers = {"User-Agent": get_random_user_agent()}

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params, headers=headers) as response:
                response.raise_for_status()
                search_results = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError) as e:
        logger.error(f"Error performing video search: {e}")
        return []

    # Anything other than a list cannot be iterated as search hits.
    if not isinstance(search_results, list):
        logger.error("Error performing video search: unexpected response payload")
        return []

    videos = []
    # Only the first two hits are reported, so only those are processed.
    for video in search_results[:2]:
        thumbnails = video.get("videoThumbnails")
        videos.append(
            {
                "id": video.get("videoId"),
                "title": video.get("title"),
                "thumbnail": thumbnails[0].get("url", "") if thumbnails else "",
            }
        )
    return videos
|
|
|
async def _download_audio_to_temp_file(session, audio_url: str) -> str:
    """Stream ``audio_url`` into a new ``.m4a`` temp file and return its path."""
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.m4a')
    try:
        with temp_file:
            async with session.get(audio_url) as audio_response:
                # Without this check an HTTP error body would be stored as
                # if it were audio data.
                audio_response.raise_for_status()
                # Write chunk by chunk so the whole file is never held in
                # memory at once.
                async for chunk in audio_response.content.iter_chunked(64 * 1024):
                    temp_file.write(chunk)
    except BaseException:
        # The file was created with delete=False, so remove the partial
        # download ourselves before propagating the failure.
        try:
            os.unlink(temp_file.name)
        except OSError:
            pass
        raise
    return temp_file.name


async def get_youtube_audio(video_id: str, max_retries: int = 3) -> Dict:
    """Download the first ``audio/mp4`` stream of a video to a temp file.

    The video's metadata is fetched from the Invidious instance, the first
    adaptive format whose ``type`` starts with ``audio/mp4`` is selected,
    and its URL is downloaded. Failed attempts are retried with a linearly
    growing delay (1s, 2s, ...).

    Args:
        video_id: Identifier of the video on the Invidious instance.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        ``{'success': True, 'temp_file_path': <path>}`` on success; the
        caller owns the file and is responsible for deleting it.
        ``{'success': False, 'error': <message>}`` otherwise.
    """
    url = f"{INVIDIOUS_INSTANCE}/api/v1/videos/{video_id}"

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    response.raise_for_status()
                    video_data = await response.json()

                # URL of the first audio/mp4 format (None if there is no
                # such format; may also be missing/empty on the format).
                audio_url = next(
                    (
                        fmt.get('url')
                        for fmt in video_data.get('adaptiveFormats', [])
                        if fmt.get('type', '').startswith('audio/mp4')
                    ),
                    None,
                )
                if not audio_url:
                    # Retrying cannot help here: the metadata itself lacks
                    # a usable audio stream.
                    logger.warning(f"No suitable audio format found for video ID {video_id}")
                    return {'success': False, 'error': "No suitable audio format found"}

                try:
                    temp_file_path = await _download_audio_to_temp_file(session, audio_url)
                except aiohttp.ServerDisconnectedError:
                    if is_last_attempt:
                        logger.error(f"Max retries reached for video ID {video_id}")
                        return {'success': False, 'error': "Max retries reached"}
                    await asyncio.sleep(1 * (attempt + 1))
                    continue

                return {'success': True, 'temp_file_path': temp_file_path}
        except aiohttp.ClientError as e:
            logger.error(f"Network error fetching YouTube audio for video ID {video_id}: {e}")
        except json.JSONDecodeError:
            logger.error(f"Error decoding JSON response for video ID {video_id}")
        except Exception as e:
            logger.error(f"Unexpected error fetching YouTube audio for video ID {video_id}: {e}")
            if is_last_attempt:
                return {'success': False, 'error': str(e)}

        # Back off before the next attempt; there is nothing to wait for
        # after the final one.
        if not is_last_attempt:
            await asyncio.sleep(1 * (attempt + 1))

    return {'success': False, 'error': "Failed to fetch audio after multiple attempts"}
|
|
|
def extract_text_from_document(file: UploadFile) -> dict:
    """Extract the text content of an uploaded document with textract.

    The upload is copied to a temporary file that keeps the original file
    extension, handed to ``textract.process``, and the result is decoded
    as UTF-8. The temporary file is removed whether or not extraction
    succeeds.

    Args:
        file: The uploaded document.

    Returns:
        ``{'success': True, 'extracted_text': <text>}`` on success, or
        ``{'success': False, 'error': <message>}`` if any step fails.
    """
    temp_file_path = None
    try:
        # The suffix preserves the upload's extension; fall back to no
        # suffix when the upload carries no filename.
        suffix = os.path.splitext(file.filename or "")[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(file.file.read())

        text = textract.process(temp_file_path).decode('utf-8')

        return {
            'success': True,
            'extracted_text': text
        }
    except Exception as e:
        return {
            'success': False,
            'error': f"Error extracting text from document: {str(e)}"
        }
    finally:
        # The file was created with delete=False, so it must be removed
        # here on both the success and the failure path.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass
|
|
|
@app.get("/search-videos/")
async def search_videos(query: str, api_key: str = Depends(api_key_header)):
    """Search for videos matching ``query``.

    Args:
        query: Search terms, taken from the ``query`` query-string parameter.
        api_key: Value of the ``X-API-Key`` request header, resolved through
            the ``api_key_header`` security scheme (``None`` when the header
            is absent).

    Returns:
        ``{"videos": [...]}`` on success, or a 404 JSON response when the
        search produced no results or failed.

    Raises:
        HTTPException: 401 when the supplied key is missing or wrong.
    """
    # The scheme must be wrapped in Depends(); used as a bare default value
    # FastAPI never reads the header and every request is rejected.
    if api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API Key")

    videos = await search_and_get_videos(query)
    if not videos:
        return JSONResponse(
            status_code=404,
            content={"message": "No videos found or an error occurred during the search."}
        )
    return {"videos": videos}
|
|
|
@app.get("/get-audio/{video_id}")
async def get_audio(video_id: str, api_key: str = Depends(api_key_header)):
    """Download the audio track of a video and report where it was stored.

    Args:
        video_id: Video identifier, taken from the URL path.
        api_key: Value of the ``X-API-Key`` request header, resolved through
            the ``api_key_header`` security scheme (``None`` when the header
            is absent).

    Returns:
        ``{"audio_file_path": <path>}`` with the server-side temp file path
        on success, or a 404 JSON response carrying the error message when
        the download failed.

    Raises:
        HTTPException: 401 when the supplied key is missing or wrong.
    """
    # The scheme must be wrapped in Depends(); used as a bare default value
    # FastAPI never reads the header and every request is rejected.
    if api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API Key")

    result = await get_youtube_audio(video_id)
    if not result['success']:
        return JSONResponse(
            status_code=404,
            content={"message": result['error']}
        )
    return {"audio_file_path": result['temp_file_path']}
|
|
|
@app.post("/extract-text/")
async def extract_text(file: UploadFile, api_key: str = Depends(api_key_header)):
    """Extract and return the text of an uploaded document.

    Args:
        file: The uploaded document.
        api_key: Value of the ``X-API-Key`` request header, resolved through
            the ``api_key_header`` security scheme (``None`` when the header
            is absent).

    Returns:
        ``{"extracted_text": <text>}`` on success, or a 500 JSON response
        carrying the error message when extraction failed.

    Raises:
        HTTPException: 401 when the supplied key is missing or wrong.
    """
    # The scheme must be wrapped in Depends(); used as a bare default value
    # FastAPI never reads the header and every request is rejected.
    if api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API Key")

    result = extract_text_from_document(file)
    if not result['success']:
        return JSONResponse(
            status_code=500,
            content={"message": result['error']}
        )
    return {"extracted_text": result['extracted_text']}
|
|