|
import asyncio
import json
import logging
import os
import random
import secrets
import tempfile
from typing import Dict, List

import aiohttp
import boto3
import ffmpeg
import requests
import textract
from botocore.exceptions import NoCredentialsError
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from fastapi import Depends, FastAPI, HTTPException, Query, Request, UploadFile
from fastapi.responses import JSONResponse
from fastapi.security import APIKeyHeader
from innertube import InnerTube
|
|
|
# FastAPI application instance; all route decorators below attach to it.
app = FastAPI()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Every endpoint requires this header, validated by verify_api_key().
# auto_error=False lets us raise our own 401 instead of FastAPI's default 403.
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)

# Public Invidious mirrors tried in order by search_and_get_videos().
# NOTE(review): public instances come and go — verify these are still alive.
INVIDIOUS_INSTANCES = [
    "https://invidious.privacydev.net",
    "https://invidious.reallyaweso.me",
    "https://invidious.adminforge.de"
]
# Shared secret clients must present in the X-API-Key header.
API_KEY = os.environ.get("API_KEY")

# S3 credentials/config consumed by upload_to_s3().
S3_ACCESS_KEY_ID = os.environ.get("S3_ACCESS_KEY_ID")
S3_SECRET_ACCESS_KEY = os.environ.get("S3_SECRET_ACCESS_KEY")
S3_BUCKET = os.environ.get("S3_BUCKET")
S3_REGION = os.environ.get("S3_REGION")

# Fail fast at import time if any required configuration is missing.
if not all([API_KEY, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY, S3_BUCKET, S3_REGION]):
    raise ValueError("Missing required environment variables")

# iOS-flavoured User-Agents; one is picked at random per CDN download request.
USER_AGENTS = [
    'Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (iPad; CPU OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (iPod touch; CPU iPhone OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Mobile/15E148 Safari/604.1'
]
|
|
|
async def search_and_get_videos(query: str, num_videos: int = 2) -> List[Dict]:
    """Search for videos via the Invidious API, trying each instance in turn.

    Args:
        query: free-text search query.
        num_videos: maximum number of results to return.

    Returns:
        A list of {"id", "title", "thumbnail"} dicts (at most num_videos),
        or an empty list when every configured instance fails.
    """
    for instance in INVIDIOUS_INSTANCES:
        url = f"{instance}/api/v1/search"
        try:
            async with aiohttp.ClientSession() as session:
                # Fix: pass the query via `params` so aiohttp URL-encodes it
                # (the original interpolated it raw into the URL, which breaks
                # on spaces, '&', '#', non-ASCII, etc.). The timeout keeps a
                # dead instance from hanging the request indefinitely.
                async with session.get(
                    url,
                    params={"q": query, "type": "video"},
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as response:
                    response.raise_for_status()
                    search_results = await response.json()
            # Slice first: only the requested number of results is formatted.
            return [
                {
                    "id": video.get("videoId"),
                    "title": video.get("title"),
                    "thumbnail": video["videoThumbnails"][0]["url"]
                    if video.get("videoThumbnails")
                    else "",
                }
                for video in search_results[:num_videos]
            ]
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Log and fall through to the next instance.
            logger.error(f"Error performing video search on {instance}: {e}")
    logger.error("All Invidious instances failed")
    return []
|
|
|
def get_youtube_audio(video_id):
    """Download the best audio stream for a YouTube video and convert it to MP3.

    Resolves stream URLs via the InnerTube IOS client, downloads the
    highest-bitrate audio stream to a temporary .webm file, then transcodes
    it to a 128k MP3 with ffmpeg.

    Args:
        video_id: YouTube video ID.

    Returns:
        {'success': True, 'temp_file_path': <mp3 path>} on success — the
        caller owns (and must delete) the MP3 file — or
        {'success': False, 'error': <message>} on failure.
    """
    try:
        logger.info(f"Starting YouTube audio extraction for video ID: {video_id}")

        client = InnerTube("IOS")
        logger.info("InnerTube client created")

        logger.info("Fetching video info")
        video_info = client.player(video_id)
        logger.info("Video info fetched successfully")

        if 'streamingData' not in video_info:
            logger.error(f"No 'streamingData' found in video info for video ID {video_id}")
            return {'success': False, 'error': "No streaming data found for the video"}

        streams = video_info["streamingData"]["adaptiveFormats"]
        audio_streams = [s for s in streams if s['mimeType'].startswith('audio/')]

        if not audio_streams:
            logger.warning(f"No audio streams found for video ID {video_id}")
            return {'success': False, 'error': "No audio streams found"}

        # Highest bitrate wins; .get() guards formats without a bitrate field.
        audio_stream = max(audio_streams, key=lambda x: x.get('bitrate', 0))
        # NOTE(review): assumes a direct 'url' (no signatureCipher) — appears
        # to hold for the IOS client, but confirm against innertube behavior.
        audio_url = audio_stream['url']
        logger.info(f"Selected audio stream URL: {audio_url[:100]}...")

        headers = {
            'User-Agent': random.choice(USER_AGENTS),
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Range': 'bytes=0-',
            'Connection': 'keep-alive',
        }

        logger.info("Starting audio download")
        # Fix: close the response via `with`, and bound a stalled connection
        # with a timeout so the worker cannot hang forever on a dead CDN.
        with requests.get(audio_url, headers=headers, stream=True, timeout=30) as response:
            logger.info(f"Download response status code: {response.status_code}")

            # 206 is the expected answer to the 'Range: bytes=0-' header.
            if response.status_code not in (200, 206):
                logger.error(f"Failed to download audio: HTTP {response.status_code}")
                return {'success': False, 'error': f"Download failed: HTTP {response.status_code}"}

            with tempfile.NamedTemporaryFile(delete=False, suffix='.webm') as temp_file:
                temp_file_path = temp_file.name
                logger.info(f"Created temporary file: {temp_file_path}")

                total_size = 0
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
                    total_size += len(chunk)
                    # Progress log roughly once per MiB of downloaded data.
                    if total_size % (1024 * 1024) == 0:
                        logger.info(f"Downloaded {total_size / (1024 * 1024):.2f} MB")

            logger.info(f"Audio download completed. Total size: {total_size / (1024 * 1024):.2f} MB")

        # Reserve a target path for the MP3; ffmpeg writes to it by name.
        mp3_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
        mp3_temp_file_path = mp3_temp_file.name
        mp3_temp_file.close()

        logger.info(f"Converting audio to MP3: {mp3_temp_file_path}")
        try:
            (
                ffmpeg
                .input(temp_file_path)
                .output(mp3_temp_file_path, acodec='libmp3lame', audio_bitrate='128k')
                .overwrite_output()
                .run(capture_stdout=True, capture_stderr=True)
            )
            logger.info("Audio conversion to MP3 completed successfully")
        except ffmpeg.Error as e:
            logger.error(f"Error during audio conversion: {e.stderr.decode()}")
            # Fix: also remove the (empty/partial) MP3 target on failure —
            # the original leaked it and only deleted the source .webm.
            os.unlink(mp3_temp_file_path)
            return {'success': False, 'error': "Failed to convert audio to MP3"}
        finally:
            # The downloaded source is no longer needed either way.
            os.unlink(temp_file_path)

        return {'success': True, 'temp_file_path': mp3_temp_file_path}

    except Exception as e:
        logger.exception(f"Error fetching YouTube audio for video ID {video_id}: {str(e)}")
        return {'success': False, 'error': f"Error: {str(e)}"}
|
|
|
def extract_text_from_document(file: UploadFile) -> dict:
    """Extract plain text from an uploaded document using textract.

    The upload is spooled to a temp file (textract needs a real path and
    infers the parser from the file extension), processed, then deleted.

    Args:
        file: the FastAPI upload; its filename extension selects the parser.

    Returns:
        {'success': True, 'extracted_text': <str>} or
        {'success': False, 'error': <message>}.
    """
    try:
        suffix = os.path.splitext(file.filename)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            temp_file.write(file.file.read())
            temp_file_path = temp_file.name

        try:
            text = textract.process(temp_file_path).decode('utf-8')
        finally:
            # Fix: delete the temp file even when textract raises —
            # the original only unlinked on the success path and leaked it.
            os.unlink(temp_file_path)

        return {
            'success': True,
            'extracted_text': text
        }
    except Exception as e:
        return {
            'success': False,
            'error': f"Error extracting text from document: {str(e)}"
        }
|
|
|
def upload_to_s3(local_file, s3_file):
    """Upload a local file to the configured S3 bucket.

    The object is stored with a Content-Disposition of 'attachment' so
    browsers download it instead of rendering it inline.

    Args:
        local_file: path of the file on disk.
        s3_file: object key to store it under.

    Returns:
        The public HTTPS URL of the uploaded object, or None when AWS
        credentials are unavailable.
    """
    client = boto3.client(
        "s3",
        aws_access_key_id=S3_ACCESS_KEY_ID,
        aws_secret_access_key=S3_SECRET_ACCESS_KEY,
        region_name=S3_REGION,
    )

    extra_args = {'ContentDisposition': 'attachment'}
    try:
        client.upload_file(local_file, S3_BUCKET, s3_file, ExtraArgs=extra_args)
    except NoCredentialsError:
        logger.error("Credentials not available")
        return None

    return f"https://{S3_BUCKET}.s3.{S3_REGION}.amazonaws.com/{s3_file}"
|
|
|
def image_search(query: str, num_results: int = 5) -> dict:
    """Run a DuckDuckGo image search and normalize the result fields.

    Args:
        query: free-text image search query.
        num_results: maximum number of images to return.

    Returns:
        {'success': True, 'results': [...]} where each result carries
        title/image_url/thumbnail_url/source_url/width/height, or
        {'success': False, 'error': <message>} on any failure.
    """
    desktop_ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    try:
        with DDGS(headers={'User-Agent': desktop_ua}) as ddgs:
            raw_hits = list(ddgs.images(query, max_results=num_results))

        formatted = []
        for hit in raw_hits:
            formatted.append({
                'title': hit['title'],
                'image_url': hit['image'],
                'thumbnail_url': hit['thumbnail'],
                'source_url': hit['url'],
                'width': hit['width'],
                'height': hit['height'],
            })

        return {
            'success': True,
            'results': formatted
        }
    except Exception as e:
        logger.error(f"Error performing image search: {e}")
        return {
            'success': False,
            'error': f"Error performing image search: {str(e)}"
        }
|
|
|
async def verify_api_key(api_key: str = Depends(api_key_header)):
    """FastAPI dependency: validate the X-API-Key header against API_KEY.

    Raises:
        HTTPException: 401 when the header is missing or does not match.

    Returns:
        The validated key, so endpoints can declare it as a dependency.
    """
    # Fix: use a constant-time comparison so the key cannot be recovered
    # via timing attacks; also guard api_key=None (header absent — with
    # APIKeyHeader(auto_error=False) FastAPI does not reject that for us).
    if not api_key or not secrets.compare_digest(api_key, API_KEY):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return api_key
|
|
|
def download_stream(url, output_path):
    """Stream a remote file to disk in 8 KiB chunks.

    Sends browser-like headers (random mobile User-Agent, open-ended Range)
    and raises requests.HTTPError on a non-success status.

    Args:
        url: direct media URL to fetch.
        output_path: local path the bytes are written to.
    """
    request_headers = {
        'User-Agent': random.choice(USER_AGENTS),
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Range': 'bytes=0-',
        'Connection': 'keep-alive',
    }

    response = requests.get(url, headers=request_headers, stream=True)
    with response:
        response.raise_for_status()
        with open(output_path, 'wb') as destination:
            for block in response.iter_content(chunk_size=8192):
                destination.write(block)
|
|
|
def get_best_video_stream(streams):
    """Pick the best video-only stream capped at 720p.

    Prefers a 720p stream; otherwise returns the highest resolution below
    720p. Returns None when no suitable video stream exists.

    Fixes vs. the original: quality labels with a frame-rate suffix such as
    '720p60' or '1080p60' no longer crash the int() parse, and streams
    missing 'qualityLabel' are skipped instead of raising KeyError.

    Args:
        streams: adaptiveFormats entries (dicts with 'mimeType' and,
            for video, a 'qualityLabel' like '480p' or '720p60').
    """
    def _height(stream):
        # '720p' -> 720, '720p60' -> 720, '1080p60' -> 1080; unknown -> -1.
        label = stream.get('qualityLabel', '')
        digits = label.split('p', 1)[0]
        return int(digits) if digits.isdigit() else -1

    video_streams = [s for s in streams if s.get('mimeType', '').startswith('video/')]

    preferred = next((s for s in video_streams if _height(s) == 720), None)
    if preferred is None:
        # Fall back to the largest parseable resolution below 720p.
        below_720p = [s for s in video_streams if 0 <= _height(s) < 720]
        preferred = max(below_720p, key=_height) if below_720p else None

    return preferred
|
|
|
def get_best_audio_stream(streams):
    """Return the audio-only stream with the highest bitrate, or None.

    Fix: uses .get('bitrate', 0) so formats missing a bitrate field no
    longer raise KeyError (matching the selection in get_youtube_audio).

    Args:
        streams: adaptiveFormats entries (dicts with 'mimeType' and,
            usually, a numeric 'bitrate').
    """
    audio_streams = [s for s in streams if s.get('mimeType', '').startswith('audio/')]
    return max(audio_streams, key=lambda s: s.get('bitrate', 0)) if audio_streams else None
|
|
|
@app.get("/search-videos/") |
|
async def search_videos( |
|
query: str, |
|
num_videos: int = Query(default=2, ge=1, le=10), |
|
api_key: str = Depends(verify_api_key) |
|
): |
|
videos = await search_and_get_videos(query, num_videos) |
|
if not videos: |
|
raise HTTPException(status_code=404, detail="No videos found or an error occurred during the search.") |
|
return {"videos": videos} |
|
|
|
@app.get("/get-audio/{video_id}") |
|
async def get_audio(video_id: str, api_key: str = Depends(verify_api_key)): |
|
result = get_youtube_audio(video_id) |
|
if not result['success']: |
|
raise HTTPException(status_code=500, detail=result['error']) |
|
|
|
s3_file_name = f"{video_id}.mp3" |
|
s3_url = upload_to_s3(result['temp_file_path'], s3_file_name) |
|
|
|
if s3_url: |
|
os.unlink(result['temp_file_path']) |
|
return {"audio_url": s3_url} |
|
else: |
|
raise HTTPException(status_code=500, detail="Failed to upload audio to S3") |
|
|
|
@app.post("/get-video-streams") |
|
async def get_video_streams(video_id: str, api_key: str = Depends(verify_api_key)): |
|
try: |
|
|
|
client = InnerTube("WEB") |
|
|
|
|
|
video_info = client.player(video_id) |
|
|
|
|
|
streams = video_info["streamingData"]["adaptiveFormats"] |
|
video_stream = get_best_video_stream(streams) |
|
audio_stream = get_best_audio_stream(streams) |
|
|
|
if not video_stream or not audio_stream: |
|
raise HTTPException(status_code=404, detail="Could not find suitable video or audio stream") |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as video_file, \ |
|
tempfile.NamedTemporaryFile(delete=False, suffix='.m4a') as audio_file: |
|
|
|
download_stream(video_stream['url'], video_file.name) |
|
download_stream(audio_stream['url'], audio_file.name) |
|
|
|
|
|
output_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') |
|
|
|
try: |
|
( |
|
ffmpeg |
|
.input(video_file.name) |
|
.input(audio_file.name) |
|
.output(output_file.name, vcodec='libx264', acodec='aac') |
|
.overwrite_output() |
|
.run(capture_stdout=True, capture_stderr=True) |
|
) |
|
except ffmpeg.Error as e: |
|
raise HTTPException(status_code=500, detail=f"Error combining video and audio: {e.stderr.decode()}") |
|
|
|
|
|
s3_file_name = f"{video_id}_combined.mp4" |
|
s3_url = upload_to_s3(output_file.name, s3_file_name) |
|
|
|
if s3_url: |
|
return {"video_url": s3_url} |
|
else: |
|
raise HTTPException(status_code=500, detail="Failed to upload video to S3") |
|
|
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Error processing video: {str(e)}") |
|
finally: |
|
|
|
for file in [video_file.name, audio_file.name, output_file.name]: |
|
if os.path.exists(file): |
|
os.unlink(file) |
|
|
|
@app.post("/extract-text/") |
|
async def extract_text(file: UploadFile, api_key: str = Depends(verify_api_key)): |
|
result = extract_text_from_document(file) |
|
if not result['success']: |
|
raise HTTPException(status_code=500, detail=result['error']) |
|
return {"extracted_text": result['extracted_text']} |
|
|
|
@app.get("/image-search/") |
|
async def image_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)): |
|
result = image_search(query, num_results) |
|
if not result['success']: |
|
raise HTTPException(status_code=500, detail=result['error']) |
|
return result |
|
|
|
class DuckDuckGoSearch:
    """Minimal scraper for DuckDuckGo's HTML endpoint (no API key needed)."""

    async def search(self, query: str, num_results: int = 5) -> list:
        """Fetch and parse DuckDuckGo HTML search results.

        Args:
            query: free-text search query.
            num_results: maximum number of results to return.

        Returns:
            A list of {'title', 'body', 'href'} dicts (at most num_results).

        Raises:
            Exception: when DuckDuckGo responds with a non-200 status.
        """
        url = "https://html.duckduckgo.com/html/"
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Referer": "https://google.com/",
            "Cookie": "kl=wt-wt",
        }

        async with aiohttp.ClientSession() as session:
            # Fix: pass the query via `params` so aiohttp URL-encodes it —
            # the original interpolated it raw into the URL, which breaks
            # on spaces, '&', '#', non-ASCII, etc.
            async with session.get(url, params={"q": query}, headers=headers) as response:
                if response.status != 200:
                    raise Exception("Failed to fetch data from DuckDuckGo")
                html = await response.text()

        soup = BeautifulSoup(html, "html.parser")
        results = []

        for result in soup.select(".result"):
            title = result.select_one(".result__title .result__a")
            # Renamed from `url` — the original shadowed the request URL.
            link = result.select_one(".result__url")
            desc = result.select_one(".result__snippet")

            # Skip partial entries (ads/layout rows missing one of the parts).
            if title and link and desc:
                results.append({
                    "title": title.get_text(strip=True),
                    "body": desc.get_text(strip=True),
                    "href": f"https://{link.get_text(strip=True)}",
                })

            if len(results) >= num_results:
                break

        return results
|
|
|
async def web_search(query: str, num_results: int = 5) -> dict:
    """Run a DuckDuckGo text search and wrap the outcome in a status dict.

    Returns:
        {'success': True, 'results': [...]} on success, or
        {'success': False, 'error': <message>} when the scrape fails.
    """
    searcher = DuckDuckGoSearch()
    try:
        hits = await searcher.search(query, num_results)
    except Exception as exc:
        return {
            'success': False,
            'error': str(exc)
        }
    return {
        'success': True,
        'results': hits
    }
|
|
|
@app.get("/web-search/") |
|
async def web_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)): |
|
result = await web_search(query, num_results) |
|
if not result['success']: |
|
raise HTTPException(status_code=500, detail=result['error']) |
|
return result |