Edmond7 committed on
Commit
90171b7
·
verified ·
1 Parent(s): 7715dcd

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +55 -221
app.py CHANGED
@@ -1,17 +1,15 @@
1
  import os
2
  from fastapi import FastAPI, HTTPException, UploadFile, Depends, Query
3
  from fastapi.security import APIKeyHeader
4
- import aiohttp
5
  import asyncio
6
  import json
7
  import tempfile
8
  from typing import List, Dict
9
  import logging
10
- import random
11
- import textract
12
- import boto3
13
- from botocore.exceptions import NoCredentialsError
14
- from duckduckgo_search import DDGS
15
 
16
  app = FastAPI()
17
 
@@ -24,251 +22,87 @@ API_KEY_NAME = "X-API-Key"
24
  api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
25
 
26
  # Constants
27
- INVIDIOUS_INSTANCES = [
28
- "https://invidious.privacydev.net",
29
- "https://invidious.reallyaweso.me",
30
- "https://invidious.adminforge.de"
31
- ]
32
  API_KEY = os.environ.get("API_KEY")
33
-
34
- # S3 Configuration
35
- S3_ACCESS_KEY_ID = os.environ.get("S3_ACCESS_KEY_ID")
36
- S3_SECRET_ACCESS_KEY = os.environ.get("S3_SECRET_ACCESS_KEY")
37
- S3_BUCKET = os.environ.get("S3_BUCKET")
38
- S3_REGION = os.environ.get("S3_REGION")
39
-
40
- if not all([API_KEY, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY, S3_BUCKET, S3_REGION]):
41
- raise ValueError("Missing required environment variables")
42
-
43
- def get_random_user_agent() -> str:
44
- user_agents = [
45
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
46
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
47
- "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36",
48
- ]
49
- return random.choice(user_agents)
50
-
51
- async def search_and_get_videos(query: str, num_videos: int = 2) -> List[Dict]:
52
- for instance in INVIDIOUS_INSTANCES:
53
- url = f"{instance}/api/v1/search?q={query}&type=video"
54
- try:
55
- async with aiohttp.ClientSession() as session:
56
- async with session.get(url, headers={"User-Agent": get_random_user_agent()}) as response:
57
- response.raise_for_status()
58
- search_results = await response.json()
59
- videos = [
60
- {
61
- "id": video.get("videoId"),
62
- "title": video.get("title"),
63
- "thumbnail": video["videoThumbnails"][0]["url"]
64
- if video.get("videoThumbnails")
65
- else "",
66
- }
67
- for video in search_results
68
- ][:num_videos]
69
- return videos
70
- except aiohttp.ClientError as e:
71
- logger.error(f"Error performing video search on {instance}: {e}")
72
- logger.error("All Invidious instances failed")
73
- return []
74
-
75
- async def get_youtube_audio(video_id: str, max_retries: int = 3) -> Dict:
76
- for instance in INVIDIOUS_INSTANCES:
77
- for attempt in range(max_retries):
78
- try:
79
- url = f"{instance}/api/v1/videos/{video_id}"
80
-
81
- async with aiohttp.ClientSession() as session:
82
- async with session.get(url) as response:
83
- response.raise_for_status()
84
- video_data = await response.json()
85
-
86
- audio_format = next((format for format in video_data.get('adaptiveFormats', [])
87
- if format.get('type', '').startswith('audio/mp4')), None)
88
-
89
- if audio_format:
90
- audio_url = audio_format.get('url')
91
- if audio_url:
92
- try:
93
- async with session.get(audio_url) as audio_response:
94
- audio_content = await audio_response.read()
95
-
96
- with tempfile.NamedTemporaryFile(delete=False, suffix='.m4a') as temp_file:
97
- temp_file.write(audio_content)
98
- temp_file_path = temp_file.name
99
-
100
- return {'success': True, 'temp_file_path': temp_file_path}
101
- except aiohttp.ServerDisconnectedError:
102
- if attempt == max_retries - 1:
103
- logger.error(f"Max retries reached for video ID {video_id} on {instance}")
104
- break
105
- await asyncio.sleep(1 * (attempt + 1))
106
- continue
107
-
108
- logger.warning(f"No suitable audio format found for video ID {video_id} on {instance}")
109
- break
110
- except aiohttp.ClientError as e:
111
- logger.error(f"Network error fetching YouTube audio for video ID {video_id} on {instance}: {e}")
112
- except json.JSONDecodeError:
113
- logger.error(f"Error decoding JSON response for video ID {video_id} on {instance}")
114
- except Exception as e:
115
- logger.error(f"Unexpected error fetching YouTube audio for video ID {video_id} on {instance}: {e}")
116
- if attempt == max_retries - 1:
117
- break
118
- await asyncio.sleep(1 * (attempt + 1))
119
 
120
- return {'success': False, 'error': "Failed to fetch audio after multiple attempts on all instances"}
121
-
122
- def extract_text_from_document(file: UploadFile) -> dict:
123
  try:
124
- with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_file:
125
- content = file.file.read()
126
- temp_file.write(content)
127
- temp_file_path = temp_file.name
128
-
129
- text = textract.process(temp_file_path).decode('utf-8')
130
-
131
- os.unlink(temp_file_path)
132
-
133
- return {
134
- 'success': True,
135
- 'extracted_text': text
136
- }
137
- except Exception as e:
138
- return {
139
- 'success': False,
140
- 'error': f"Error extracting text from document: {str(e)}"
141
- }
142
-
143
- def upload_to_s3(local_file, s3_file):
144
- s3_client = boto3.client(
145
- "s3",
146
- aws_access_key_id=S3_ACCESS_KEY_ID,
147
- aws_secret_access_key=S3_SECRET_ACCESS_KEY,
148
- region_name=S3_REGION,
149
- )
150
-
151
- try:
152
- s3_client.upload_file(local_file, S3_BUCKET, s3_file)
153
- s3_url = f"https://{S3_BUCKET}.s3.{S3_REGION}.amazonaws.com/{s3_file}"
154
- return s3_url
155
- except NoCredentialsError:
156
- logger.error("Credentials not available")
157
- return None
158
-
159
- async def verify_api_key(api_key: str = Depends(api_key_header)):
160
- if api_key != API_KEY:
161
- raise HTTPException(status_code=401, detail="Invalid API Key")
162
- return api_key
163
-
164
- @app.get("/search-videos/")
165
- async def search_videos(
166
- query: str,
167
- num_videos: int = Query(default=2, ge=1, le=10),
168
- api_key: str = Depends(verify_api_key)
169
- ):
170
- videos = await search_and_get_videos(query, num_videos)
171
- if not videos:
172
- raise HTTPException(status_code=404, detail="No videos found or an error occurred during the search.")
173
- return {"videos": videos}
174
-
175
- @app.get("/get-audio/{video_id}")
176
- async def get_audio(video_id: str, api_key: str = Depends(verify_api_key)):
177
- result = await get_youtube_audio(video_id)
178
- if not result['success']:
179
- raise HTTPException(status_code=404, detail=result['error'])
180
-
181
- s3_file_name = f"{video_id}.m4a"
182
- s3_url = upload_to_s3(result['temp_file_path'], s3_file_name)
183
-
184
- if s3_url:
185
- os.unlink(result['temp_file_path']) # Remove the temporary file
186
- return {"audio_url": s3_url}
187
- else:
188
- raise HTTPException(status_code=500, detail="Failed to upload audio to S3")
189
-
190
- @app.post("/extract-text/")
191
- async def extract_text(file: UploadFile, api_key: str = Depends(verify_api_key)):
192
- result = extract_text_from_document(file)
193
- if not result['success']:
194
- raise HTTPException(status_code=500, detail=result['error'])
195
- return {"extracted_text": result['extracted_text']}
196
-
197
- def web_search(query: str, num_results: int = 5) -> dict:
198
- """
199
- Perform a web search using DuckDuckGo and return the results.
200
- Args:
201
- query (str): The search query.
202
- num_results (int, optional): The number of results to return. Defaults to 5.
203
- Returns:
204
- dict: A dictionary containing the success status and either the search results or an error message.
205
- """
206
- try:
207
- with DDGS() as ddgs:
208
- results = list(ddgs.text(query, max_results=num_results))
209
  formatted_results = [
210
  {
211
- 'title': result['title'],
212
- 'body': result['body'],
213
- 'href': result['href']
214
  }
215
  for result in results
216
  ]
217
- return {
218
- 'success': True,
219
- 'results': formatted_results
220
- }
221
- except Exception as e:
222
- logger.error(f"Error performing web search: {e}")
223
- return {
224
- 'success': False,
225
- 'error': f"Error performing web search: {str(e)}"
226
- }
227
-
228
- def image_search(query: str, num_results: int = 5) -> dict:
229
- """
230
- Perform an image search using DuckDuckGo and return the results.
231
- Args:
232
- query (str): The search query.
233
- num_results (int, optional): The number of results to return. Defaults to 5.
234
- Returns:
235
- dict: A dictionary containing the success status and either the image search results or an error message.
236
- """
237
- try:
238
- with DDGS() as ddgs:
239
- results = list(ddgs.images(query, max_results=num_results))
240
  formatted_results = [
241
  {
242
- 'title': result['title'],
243
- 'image_url': result['image'],
244
- 'thumbnail_url': result['thumbnail'],
245
- 'source_url': result['url'],
246
- 'width': result['width'],
247
- 'height': result['height']
248
  }
249
  for result in results
250
  ]
 
251
  return {
252
  'success': True,
253
  'results': formatted_results
254
  }
255
  except Exception as e:
256
- logger.error(f"Error performing image search: {e}")
257
  return {
258
  'success': False,
259
- 'error': f"Error performing image search: {str(e)}"
260
  }
261
 
 
 
 
 
 
262
  @app.get("/web-search/")
263
  async def web_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)):
264
- result = web_search(query, num_results)
265
  if not result['success']:
266
  raise HTTPException(status_code=500, detail=result['error'])
267
  return result
268
 
269
  @app.get("/image-search/")
270
  async def image_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)):
271
- result = image_search(query, num_results)
272
  if not result['success']:
273
  raise HTTPException(status_code=500, detail=result['error'])
274
  return result
 
1
  import os
2
  from fastapi import FastAPI, HTTPException, UploadFile, Depends, Query
3
  from fastapi.security import APIKeyHeader
 
4
  import asyncio
5
  import json
6
  import tempfile
7
  from typing import List, Dict
8
  import logging
9
+ import requests
10
+ from requests_random_user_agent import UserAgent
11
+ from fp.fp import FreeProxy
12
+ from bs4 import BeautifulSoup
 
13
 
14
  app = FastAPI()
15
 
 
22
  api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
23
 
24
  # Constants
 
 
 
 
 
25
  API_KEY = os.environ.get("API_KEY")
26
+ TIMEOUT = 0.9 # Set timeout to 0.9 seconds
27
+
28
+ if not API_KEY:
29
+ raise ValueError("Missing required environment variable: API_KEY")
30
+
31
+ def get_proxy():
32
+ return FreeProxy(
33
+ timeout=TIMEOUT,
34
+ rand=True,
35
+ ssl=True # Use only SSL proxies
36
+ ).get()
37
+
38
+ def perform_duckduckgo_search(query: str, search_type: str = 'web', num_results: int = 5) -> dict:
39
+ base_url = 'https://duckduckgo.com/html/'
40
+ params = {
41
+ 'q': query,
42
+ 'ia': 'web' if search_type == 'web' else 'images'
43
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
 
 
 
 
45
  try:
46
+ proxy = get_proxy()
47
+ response = requests.get(
48
+ base_url,
49
+ params=params,
50
+ headers={'User-Agent': UserAgent().get_random_user_agent()},
51
+ proxies={'https': proxy}, # Use only HTTPS proxy
52
+ timeout=TIMEOUT
53
+ )
54
+ response.raise_for_status()
55
+
56
+ soup = BeautifulSoup(response.text, 'html.parser')
57
+
58
+ if search_type == 'web':
59
+ results = soup.find_all('div', class_='result__body')[:num_results]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
  formatted_results = [
61
  {
62
+ 'title': result.find('h2', class_='result__title').text.strip(),
63
+ 'body': result.find('a', class_='result__snippet').text.strip(),
64
+ 'href': result.find('a', class_='result__url')['href']
65
  }
66
  for result in results
67
  ]
68
+ else: # image search
69
+ results = soup.find_all('div', class_='tile--img')[:num_results]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70
  formatted_results = [
71
  {
72
+ 'title': result.find('span', class_='tile--img__title').text.strip(),
73
+ 'image_url': result.find('img')['src'],
74
+ 'thumbnail_url': result.find('img')['src'],
75
+ 'source_url': result.find('a')['href'],
 
 
76
  }
77
  for result in results
78
  ]
79
+
80
  return {
81
  'success': True,
82
  'results': formatted_results
83
  }
84
  except Exception as e:
85
+ logger.error(f"Error performing {'web' if search_type == 'web' else 'image'} search: {e}")
86
  return {
87
  'success': False,
88
+ 'error': f"Error performing {'web' if search_type == 'web' else 'image'} search: {str(e)}"
89
  }
90
 
91
+ async def verify_api_key(api_key: str = Depends(api_key_header)):
92
+ if api_key != API_KEY:
93
+ raise HTTPException(status_code=401, detail="Invalid API Key")
94
+ return api_key
95
+
96
  @app.get("/web-search/")
97
  async def web_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)):
98
+ result = perform_duckduckgo_search(query, 'web', num_results)
99
  if not result['success']:
100
  raise HTTPException(status_code=500, detail=result['error'])
101
  return result
102
 
103
  @app.get("/image-search/")
104
  async def image_search_endpoint(query: str, num_results: int = 5, api_key: str = Depends(verify_api_key)):
105
+ result = perform_duckduckgo_search(query, 'images', num_results)
106
  if not result['success']:
107
  raise HTTPException(status_code=500, detail=result['error'])
108
  return result