Abhaykoul committed on
Commit 476ef3e · verified · 1 Parent(s): 6f99d02

Delete webscout.py

Files changed (1)
  1. webscout.py +0 -1980
webscout.py DELETED
@@ -1,1980 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import html
4
- import http.cookiejar as cookiejar
5
-
6
- import warnings
7
- from concurrent.futures import ThreadPoolExecutor
8
- from datetime import datetime, timezone
9
- from decimal import Decimal
10
- from functools import cached_property
11
- from itertools import cycle, islice
12
- from random import choice
13
- from threading import Event
14
- from types import TracebackType
15
- from typing import Optional, cast
16
- import html.parser
17
- import requests
18
-
19
- import primp # type: ignore
20
-
21
- try:
22
- from lxml.etree import _Element
23
- from lxml.html import HTMLParser as LHTMLParser
24
- from lxml.html import document_fromstring
25
-
26
- LXML_AVAILABLE = True
27
- except ImportError:
28
- LXML_AVAILABLE = False
29
-
30
- class WebscoutE(Exception):
31
- """Base exception class for search."""
32
-
33
-
34
- class RatelimitE(Exception):
35
- """Raised for rate limit exceeded errors during API requests."""
36
-
37
- class ConversationLimitException(Exception):
38
- """Raised for conversation limit exceeded errors during API requests."""
39
- pass
40
- class TimeoutE(Exception):
41
- """Raised for timeout errors during API requests."""
42
-
43
- class FailedToGenerateResponseError(Exception):
44
-
45
- """Provider failed to fetch response"""
46
- class AllProvidersFailure(Exception):
47
- """None of the providers generated response successfully"""
48
- pass
49
-
50
- class FacebookInvalidCredentialsException(Exception):
51
- pass
52
-
53
-
54
- class FacebookRegionBlocked(Exception):
55
- pass
56
-
57
-
58
- import re
59
- from decimal import Decimal
60
- from html import unescape
61
- from math import atan2, cos, radians, sin, sqrt
62
- from typing import Any, Dict, List, Union
63
- from urllib.parse import unquote
64
-
65
-
66
- import json  # imported unconditionally; json.loads is still used later even when orjson is available
67
- try:
68
- import orjson
69
- HAS_ORJSON = True
70
- except ImportError:
71
- HAS_ORJSON = False
72
-
73
- REGEX_STRIP_TAGS = re.compile("<.*?>")
74
-
75
-
76
- def json_dumps(obj: Any) -> str:
77
- try:
78
- return (
79
- orjson.dumps(obj, option=orjson.OPT_INDENT_2).decode()
80
- if HAS_ORJSON
81
- else json.dumps(obj, ensure_ascii=False, indent=2)
82
- )
83
- except Exception as ex:
84
- raise WebscoutE(f"{type(ex).__name__}: {ex}") from ex
85
-
86
-
87
- def json_loads(obj: Union[str, bytes]) -> Any:
88
- try:
89
- return orjson.loads(obj) if HAS_ORJSON else json.loads(obj)
90
- except Exception as ex:
91
- raise WebscoutE(f"{type(ex).__name__}: {ex}") from ex
92
-
93
-
94
- def _extract_vqd(html_bytes: bytes, keywords: str) -> str:
95
- """Extract vqd from html bytes."""
96
- for c1, c1_len, c2 in (
97
- (b'vqd="', 5, b'"'),
98
- (b"vqd=", 4, b"&"),
99
- (b"vqd='", 5, b"'"),
100
- ):
101
- try:
102
- start = html_bytes.index(c1) + c1_len
103
- end = html_bytes.index(c2, start)
104
- return html_bytes[start:end].decode()
105
- except ValueError:
106
- pass
107
- raise WebscoutE(f"_extract_vqd() {keywords=} Could not extract vqd.")
108
-
109
-
110
- def _text_extract_json(html_bytes: bytes, keywords: str) -> List[Dict[str, str]]:
111
- """text(backend="api") -> extract json from html."""
112
- try:
113
- start = html_bytes.index(b"DDG.pageLayout.load('d',") + 24
114
- end = html_bytes.index(b");DDG.duckbar.load(", start)
115
- data = html_bytes[start:end]
116
- result: List[Dict[str, str]] = json_loads(data)
117
- return result
118
- except Exception as ex:
119
- raise WebscoutE(f"_text_extract_json() {keywords=} {type(ex).__name__}: {ex}") from ex
120
- raise WebscoutE(f"_text_extract_json() {keywords=} return None")
121
-
122
-
123
- def _normalize(raw_html: str) -> str:
124
- """Strip HTML tags from the raw_html string."""
125
- return unescape(REGEX_STRIP_TAGS.sub("", raw_html)) if raw_html else ""
126
-
127
-
128
- def _normalize_url(url: str) -> str:
129
- """Unquote URL and replace spaces with '+'."""
130
- return unquote(url.replace(" ", "+")) if url else ""
131
-
132
-
133
- def _calculate_distance(lat1: Decimal, lon1: Decimal, lat2: Decimal, lon2: Decimal) -> float:
134
- """Calculate distance between two points in km. Haversine formula."""
135
- R = 6371.0087714 # Earth's radius in km
136
- rlat1, rlon1, rlat2, rlon2 = map(radians, [float(lat1), float(lon1), float(lat2), float(lon2)])
137
- dlon, dlat = rlon2 - rlon1, rlat2 - rlat1
138
- a = sin(dlat / 2) ** 2 + cos(rlat1) * cos(rlat2) * sin(dlon / 2) ** 2
139
- c = 2 * atan2(sqrt(a), sqrt(1 - a))
140
- return R * c
141
-
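For reference, a quick sanity check of the haversine helper above (the Paris/London coordinates and the roughly-344 km expectation are illustrative, not taken from this file):

    from decimal import Decimal
    paris = (Decimal("48.8566"), Decimal("2.3522"))
    london = (Decimal("51.5074"), Decimal("-0.1278"))
    print(round(_calculate_distance(*paris, *london)))  # roughly 344 (km)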
142
- def _expand_proxy_tb_alias(proxy: str | None) -> str | None:
143
- """Expand "tb" to a full proxy URL if applicable."""
144
- return "socks5://127.0.0.1:9150" if proxy == "tb" else proxy
145
-
146
-
147
-
148
- class WEBS:
149
- """webscout class to get search results from duckduckgo.com."""
150
-
151
- _executor: ThreadPoolExecutor = ThreadPoolExecutor()
152
- _impersonates = (
153
- "chrome_100", "chrome_101", "chrome_104", "chrome_105", "chrome_106", "chrome_107", "chrome_108",
154
- "chrome_109", "chrome_114", "chrome_116", "chrome_117", "chrome_118", "chrome_119", "chrome_120",
155
- #"chrome_123", "chrome_124", "chrome_126",
156
- "chrome_127", "chrome_128", "chrome_129",
157
- "safari_ios_16.5", "safari_ios_17.2", "safari_ios_17.4.1", "safari_15.3", "safari_15.5", "safari_15.6.1",
158
- "safari_16", "safari_16.5", "safari_17.0", "safari_17.2.1", "safari_17.4.1", "safari_17.5", "safari_18",
159
- "safari_ipad_18",
160
- "edge_101", "edge_122", "edge_127",
161
- ) # fmt: skip
162
-
163
- def __init__(
164
- self,
165
- headers: dict[str, str] | None = None,
166
- proxy: str | None = None,
167
- proxies: dict[str, str] | str | None = None, # deprecated
168
- timeout: int | None = 10,
169
- ) -> None:
170
- """Initialize the WEBS object.
171
-
172
- Args:
173
- headers (dict, optional): Dictionary of headers for the HTTP client. Defaults to None.
174
- proxy (str, optional): proxy for the HTTP client, supports http/https/socks5 protocols.
175
- example: "http://user:[email protected]:3128". Defaults to None.
176
- timeout (int, optional): Timeout value for the HTTP client. Defaults to 10.
177
- """
178
- self.proxy: str | None = _expand_proxy_tb_alias(proxy) # replaces "tb" with "socks5://127.0.0.1:9150"
179
- assert self.proxy is None or isinstance(self.proxy, str), "proxy must be a str"
180
- if not proxy and proxies:
181
- warnings.warn("'proxies' is deprecated, use 'proxy' instead.", stacklevel=1)
182
- self.proxy = proxies.get("http") or proxies.get("https") if isinstance(proxies, dict) else proxies
183
- self.headers = headers if headers else {}
184
- self.headers["Referer"] = "https://duckduckgo.com/"
185
- self.client = primp.Client(
186
- headers=self.headers,
187
- proxy=self.proxy,
188
- timeout=timeout,
189
- cookie_store=True,
190
- referer=True,
191
- impersonate=choice(self._impersonates),
192
- follow_redirects=False,
193
- verify=False,
194
- )
195
- self._exception_event = Event()
196
- self._chat_messages: list[dict[str, str]] = []
197
- self._chat_tokens_count = 0
198
- self._chat_vqd: str = ""
199
-
200
- def __enter__(self) -> WEBS:
201
- return self
202
-
203
- def __exit__(
204
- self,
205
- exc_type: type[BaseException] | None = None,
206
- exc_val: BaseException | None = None,
207
- exc_tb: TracebackType | None = None,
208
- ) -> None:
209
- pass
210
-
211
- @cached_property
212
- def parser(self) -> LHTMLParser:
213
- """Get HTML parser."""
214
- return LHTMLParser(remove_blank_text=True, remove_comments=True, remove_pis=True, collect_ids=False)
215
-
216
- def _get_url(
217
- self,
218
- method: str,
219
- url: str,
220
- params: dict[str, str] | None = None,
221
- content: bytes | None = None,
222
- data: dict[str, str] | bytes | None = None,
223
- ) -> bytes:
224
- if self._exception_event.is_set():
225
- raise WebscoutE("Exception occurred in previous call.")
226
- try:
227
- resp = self.client.request(method, url, params=params, content=content, data=data)
228
- except Exception as ex:
229
- self._exception_event.set()
230
- if "time" in str(ex).lower():
231
- raise TimeoutE(f"{url} {type(ex).__name__}: {ex}") from ex
232
- raise WebscoutE(f"{url} {type(ex).__name__}: {ex}") from ex
233
- if resp.status_code == 200:
234
- return cast(bytes, resp.content)
235
- self._exception_event.set()
236
- if resp.status_code in (202, 301, 403):
237
- raise RatelimitE(f"{resp.url} {resp.status_code} Ratelimit")
238
- raise WebscoutE(f"{resp.url} return None. {params=} {content=} {data=}")
239
-
240
- def _get_vqd(self, keywords: str) -> str:
241
- """Get vqd value for a search query."""
242
- resp_content = self._get_url("GET", "https://duckduckgo.com", params={"q": keywords})
243
- return _extract_vqd(resp_content, keywords)
244
-
245
- def chat(self, keywords: str, model: str = "gpt-4o-mini", timeout: int = 30) -> str:
246
- """Initiates a chat session with webscout AI.
247
-
248
- Args:
249
- keywords (str): The initial message or question to send to the AI.
250
- model (str): The model to use: "gpt-4o-mini", "claude-3-haiku", "llama-3.1-70b", "mixtral-8x7b".
251
- Defaults to "gpt-4o-mini".
252
- timeout (int): Timeout value for the HTTP client. Defaults to 30.
253
-
254
- Returns:
255
- str: The response from the AI.
256
- """
257
- models_deprecated = {
258
- "gpt-3.5": "gpt-4o-mini",
259
- "llama-3-70b": "llama-3.1-70b",
260
- }
261
- if model in models_deprecated:
262
- model = models_deprecated[model]
263
- models = {
264
- "claude-3-haiku": "claude-3-haiku-20240307",
265
- "gpt-4o-mini": "gpt-4o-mini",
266
- "llama-3.1-70b": "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
267
- "mixtral-8x7b": "mistralai/Mixtral-8x7B-Instruct-v0.1",
268
- }
269
- # vqd
270
- if not self._chat_vqd:
271
- resp = self.client.get("https://duckduckgo.com/duckchat/v1/status", headers={"x-vqd-accept": "1"})
272
- self._chat_vqd = resp.headers.get("x-vqd-4", "")
273
-
274
- self._chat_messages.append({"role": "user", "content": keywords})
275
- self._chat_tokens_count += len(keywords) // 4 if len(keywords) >= 4 else 1 # approximate number of tokens
276
-
277
- json_data = {
278
- "model": models[model],
279
- "messages": self._chat_messages,
280
- }
281
- resp = self.client.post(
282
- "https://duckduckgo.com/duckchat/v1/chat",
283
- headers={"x-vqd-4": self._chat_vqd},
284
- json=json_data,
285
- timeout=timeout,
286
- )
287
- self._chat_vqd = resp.headers.get("x-vqd-4", "")
288
-
289
- data = ",".join(x for line in resp.text.rstrip("[DONE]LIMT_CVRSA\n").split("data:") if (x := line.strip()))
290
- data = json_loads("[" + data + "]")
291
-
292
- results = []
293
- for x in data:
294
- if x.get("action") == "error":
295
- err_message = x.get("type", "")
296
- if x.get("status") == 429:
297
- raise (
298
- ConversationLimitException(err_message)
299
- if err_message == "ERR_CONVERSATION_LIMIT"
300
- else RatelimitE(err_message)
301
- )
302
- raise WebscoutE(err_message)
303
- elif message := x.get("message"):
304
- results.append(message)
305
- result = "".join(results)
306
-
307
- self._chat_messages.append({"role": "assistant", "content": result})
308
- self._chat_tokens_count += len(results)
309
- return result
310
-
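A minimal usage sketch for chat() as defined above (assumes the module is importable as webscout, matching this file's name, and that the DuckDuckGo chat endpoint is reachable; no error handling shown):

    from webscout import WEBS

    with WEBS() as webs:
        reply = webs.chat("Summarize the haversine formula in one sentence", model="gpt-4o-mini")
        print(reply)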
311
- def text(
312
- self,
313
- keywords: str,
314
- region: str = "wt-wt",
315
- safesearch: str = "moderate",
316
- timelimit: str | None = None,
317
- backend: str = "api",
318
- max_results: int | None = None,
319
- ) -> list[dict[str, str]]:
320
- """webscout text search. Query params: https://duckduckgo.com/params.
321
-
322
- Args:
323
- keywords: keywords for query.
324
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
325
- safesearch: on, moderate, off. Defaults to "moderate".
326
- timelimit: d, w, m, y. Defaults to None.
327
- backend: api, html, lite. Defaults to api.
328
- api - collect data from https://duckduckgo.com,
329
- html - collect data from https://html.duckduckgo.com,
330
- lite - collect data from https://lite.duckduckgo.com.
331
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
332
-
333
- Returns:
334
- List of dictionaries with search results.
335
-
336
- Raises:
337
- WebscoutE: Base exception for webscout errors.
338
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
339
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
340
- """
341
- if LXML_AVAILABLE is False and backend != "api":
342
- backend = "api"
343
- warnings.warn("lxml is not installed. Using backend='api'.", stacklevel=2)
344
-
345
- if backend == "api":
346
- results = self._text_api(keywords, region, safesearch, timelimit, max_results)
347
- elif backend == "html":
348
- results = self._text_html(keywords, region, timelimit, max_results)
349
- elif backend == "lite":
350
- results = self._text_lite(keywords, region, timelimit, max_results)
351
- return results
352
-
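A minimal sketch of a text search with the method above (assumes network access; each result is a dict with "title", "href" and "body" keys, as built in _text_api below):

    with WEBS() as webs:
        for hit in webs.text("open source web scraping", region="us-en", safesearch="moderate", max_results=5):
            print(hit["title"], "-", hit["href"])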
353
- def _text_api(
354
- self,
355
- keywords: str,
356
- region: str = "wt-wt",
357
- safesearch: str = "moderate",
358
- timelimit: str | None = None,
359
- max_results: int | None = None,
360
- ) -> list[dict[str, str]]:
361
- """webscout text search. Query params: https://duckduckgo.com/params.
362
-
363
- Args:
364
- keywords: keywords for query.
365
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
366
- safesearch: on, moderate, off. Defaults to "moderate".
367
- timelimit: d, w, m, y. Defaults to None.
368
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
369
-
370
- Returns:
371
- List of dictionaries with search results.
372
-
373
- Raises:
374
- WebscoutE: Base exception for webscout errors.
375
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
376
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
377
- """
378
- assert keywords, "keywords is mandatory"
379
-
380
- vqd = self._get_vqd(keywords)
381
-
382
- payload = {
383
- "q": keywords,
384
- "kl": region,
385
- "l": region,
386
- "p": "",
387
- "s": "0",
388
- "df": "",
389
- "vqd": vqd,
390
- "bing_market": f"{region[3:]}-{region[:2].upper()}",
391
- "ex": "",
392
- }
393
- safesearch = safesearch.lower()
394
- if safesearch == "moderate":
395
- payload["ex"] = "-1"
396
- elif safesearch == "off":
397
- payload["ex"] = "-2"
398
- elif safesearch == "on": # strict
399
- payload["p"] = "1"
400
- if timelimit:
401
- payload["df"] = timelimit
402
-
403
- cache = set()
404
- results: list[dict[str, str]] = []
405
-
406
- def _text_api_page(s: int) -> list[dict[str, str]]:
407
- payload["s"] = f"{s}"
408
- resp_content = self._get_url("GET", "https://links.duckduckgo.com/d.js", params=payload)
409
- page_data = _text_extract_json(resp_content, keywords)
410
- page_results = []
411
- for row in page_data:
412
- href = row.get("u", None)
413
- if href and href not in cache and href != f"http://www.google.com/search?q={keywords}":
414
- cache.add(href)
415
- body = _normalize(row["a"])
416
- if body:
417
- result = {
418
- "title": _normalize(row["t"]),
419
- "href": _normalize_url(href),
420
- "body": body,
421
- }
422
- page_results.append(result)
423
- return page_results
424
-
425
- slist = [0]
426
- if max_results:
427
- max_results = min(max_results, 2023)
428
- slist.extend(range(23, max_results, 50))
429
- try:
430
- for r in self._executor.map(_text_api_page, slist):
431
- results.extend(r)
432
- except Exception as e:
433
- raise e
434
-
435
- return list(islice(results, max_results))
436
-
437
- def _text_html(
438
- self,
439
- keywords: str,
440
- region: str = "wt-wt",
441
- timelimit: str | None = None,
442
- max_results: int | None = None,
443
- ) -> list[dict[str, str]]:
444
- """webscout text search. Query params: https://duckduckgo.com/params.
445
-
446
- Args:
447
- keywords: keywords for query.
448
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
449
- timelimit: d, w, m, y. Defaults to None.
450
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
451
-
452
- Returns:
453
- List of dictionaries with search results.
454
-
455
- Raises:
456
- WebscoutE: Base exception for webscout errors.
457
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
458
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
459
- """
460
- assert keywords, "keywords is mandatory"
461
-
462
- payload = {
463
- "q": keywords,
464
- "s": "0",
465
- "o": "json",
466
- "api": "d.js",
467
- "vqd": "",
468
- "kl": region,
469
- "bing_market": region,
470
- }
471
- if timelimit:
472
- payload["df"] = timelimit
473
- if max_results and max_results > 20:
474
- vqd = self._get_vqd(keywords)
475
- payload["vqd"] = vqd
476
-
477
- cache = set()
478
- results: list[dict[str, str]] = []
479
-
480
- def _text_html_page(s: int) -> list[dict[str, str]]:
481
- payload["s"] = f"{s}"
482
- resp_content = self._get_url("POST", "https://html.duckduckgo.com/html", data=payload)
483
- if b"No results." in resp_content:
484
- return []
485
-
486
- page_results = []
487
- tree = document_fromstring(resp_content, self.parser)
488
- elements = tree.xpath("//div[h2]")
489
- if not isinstance(elements, list):
490
- return []
491
- for e in elements:
492
- if isinstance(e, _Element):
493
- hrefxpath = e.xpath("./a/@href")
494
- href = str(hrefxpath[0]) if hrefxpath and isinstance(hrefxpath, list) else None
495
- if (
496
- href
497
- and href not in cache
498
- and not href.startswith(
499
- ("http://www.google.com/search?q=", "https://duckduckgo.com/y.js?ad_domain")
500
- )
501
- ):
502
- cache.add(href)
503
- titlexpath = e.xpath("./h2/a/text()")
504
- title = str(titlexpath[0]) if titlexpath and isinstance(titlexpath, list) else ""
505
- bodyxpath = e.xpath("./a//text()")
506
- body = "".join(str(x) for x in bodyxpath) if bodyxpath and isinstance(bodyxpath, list) else ""
507
- result = {
508
- "title": _normalize(title),
509
- "href": _normalize_url(href),
510
- "body": _normalize(body),
511
- }
512
- page_results.append(result)
513
- return page_results
514
-
515
- slist = [0]
516
- if max_results:
517
- max_results = min(max_results, 2023)
518
- slist.extend(range(23, max_results, 50))
519
- try:
520
- for r in self._executor.map(_text_html_page, slist):
521
- results.extend(r)
522
- except Exception as e:
523
- raise e
524
-
525
- return list(islice(results, max_results))
526
-
527
- def _text_lite(
528
- self,
529
- keywords: str,
530
- region: str = "wt-wt",
531
- timelimit: str | None = None,
532
- max_results: int | None = None,
533
- ) -> list[dict[str, str]]:
534
- """webscout text search. Query params: https://duckduckgo.com/params.
535
-
536
- Args:
537
- keywords: keywords for query.
538
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
539
- timelimit: d, w, m, y. Defaults to None.
540
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
541
-
542
- Returns:
543
- List of dictionaries with search results.
544
-
545
- Raises:
546
- WebscoutE: Base exception for webscout errors.
547
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
548
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
549
- """
550
- assert keywords, "keywords is mandatory"
551
-
552
- payload = {
553
- "q": keywords,
554
- "s": "0",
555
- "o": "json",
556
- "api": "d.js",
557
- "vqd": "",
558
- "kl": region,
559
- "bing_market": region,
560
- }
561
- if timelimit:
562
- payload["df"] = timelimit
563
-
564
- cache = set()
565
- results: list[dict[str, str]] = []
566
-
567
- def _text_lite_page(s: int) -> list[dict[str, str]]:
568
- payload["s"] = f"{s}"
569
- resp_content = self._get_url("POST", "https://lite.duckduckgo.com/lite/", data=payload)
570
- if b"No more results." in resp_content:
571
- return []
572
-
573
- page_results = []
574
- tree = document_fromstring(resp_content, self.parser)
575
- elements = tree.xpath("//table[last()]//tr")
576
- if not isinstance(elements, list):
577
- return []
578
-
579
- data = zip(cycle(range(1, 5)), elements)
580
- for i, e in data:
581
- if isinstance(e, _Element):
582
- if i == 1:
583
- hrefxpath = e.xpath(".//a//@href")
584
- href = str(hrefxpath[0]) if hrefxpath and isinstance(hrefxpath, list) else None
585
- if (
586
- href is None
587
- or href in cache
588
- or href.startswith(
589
- ("http://www.google.com/search?q=", "https://duckduckgo.com/y.js?ad_domain")
590
- )
591
- ):
592
- [next(data, None) for _ in range(3)] # skip block(i=1,2,3,4)
593
- else:
594
- cache.add(href)
595
- titlexpath = e.xpath(".//a//text()")
596
- title = str(titlexpath[0]) if titlexpath and isinstance(titlexpath, list) else ""
597
- elif i == 2:
598
- bodyxpath = e.xpath(".//td[@class='result-snippet']//text()")
599
- body = (
600
- "".join(str(x) for x in bodyxpath).strip()
601
- if bodyxpath and isinstance(bodyxpath, list)
602
- else ""
603
- )
604
- if href:
605
- result = {
606
- "title": _normalize(title),
607
- "href": _normalize_url(href),
608
- "body": _normalize(body),
609
- }
610
- page_results.append(result)
611
- return page_results
612
-
613
- slist = [0]
614
- if max_results:
615
- max_results = min(max_results, 2023)
616
- slist.extend(range(23, max_results, 50))
617
- try:
618
- for r in self._executor.map(_text_lite_page, slist):
619
- results.extend(r)
620
- except Exception as e:
621
- raise e
622
-
623
- return list(islice(results, max_results))
624
-
625
- def images(
626
- self,
627
- keywords: str,
628
- region: str = "wt-wt",
629
- safesearch: str = "moderate",
630
- timelimit: str | None = None,
631
- size: str | None = None,
632
- color: str | None = None,
633
- type_image: str | None = None,
634
- layout: str | None = None,
635
- license_image: str | None = None,
636
- max_results: int | None = None,
637
- ) -> list[dict[str, str]]:
638
- """webscout images search. Query params: https://duckduckgo.com/params.
639
-
640
- Args:
641
- keywords: keywords for query.
642
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
643
- safesearch: on, moderate, off. Defaults to "moderate".
644
- timelimit: Day, Week, Month, Year. Defaults to None.
645
- size: Small, Medium, Large, Wallpaper. Defaults to None.
646
- color: color, Monochrome, Red, Orange, Yellow, Green, Blue,
647
- Purple, Pink, Brown, Black, Gray, Teal, White. Defaults to None.
648
- type_image: photo, clipart, gif, transparent, line.
649
- Defaults to None.
650
- layout: Square, Tall, Wide. Defaults to None.
651
- license_image: any (All Creative Commons), Public (PublicDomain),
652
- Share (Free to Share and Use), ShareCommercially (Free to Share and Use Commercially),
653
- Modify (Free to Modify, Share, and Use), ModifyCommercially (Free to Modify, Share, and
654
- Use Commercially). Defaults to None.
655
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
656
-
657
- Returns:
658
- List of dictionaries with images search results.
659
-
660
- Raises:
661
- WebscoutE: Base exception for webscout errors.
662
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
663
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
664
- """
665
- assert keywords, "keywords is mandatory"
666
-
667
- vqd = self._get_vqd(keywords)
668
-
669
- safesearch_base = {"on": "1", "moderate": "1", "off": "-1"}
670
- timelimit = f"time:{timelimit}" if timelimit else ""
671
- size = f"size:{size}" if size else ""
672
- color = f"color:{color}" if color else ""
673
- type_image = f"type:{type_image}" if type_image else ""
674
- layout = f"layout:{layout}" if layout else ""
675
- license_image = f"license:{license_image}" if license_image else ""
676
- payload = {
677
- "l": region,
678
- "o": "json",
679
- "q": keywords,
680
- "vqd": vqd,
681
- "f": f"{timelimit},{size},{color},{type_image},{layout},{license_image}",
682
- "p": safesearch_base[safesearch.lower()],
683
- }
684
-
685
- cache = set()
686
- results: list[dict[str, str]] = []
687
-
688
- def _images_page(s: int) -> list[dict[str, str]]:
689
- payload["s"] = f"{s}"
690
- resp_content = self._get_url("GET", "https://duckduckgo.com/i.js", params=payload)
691
- resp_json = json_loads(resp_content)
692
-
693
- page_data = resp_json.get("results", [])
694
- page_results = []
695
- for row in page_data:
696
- image_url = row.get("image")
697
- if image_url and image_url not in cache:
698
- cache.add(image_url)
699
- result = {
700
- "title": row["title"],
701
- "image": _normalize_url(image_url),
702
- "thumbnail": _normalize_url(row["thumbnail"]),
703
- "url": _normalize_url(row["url"]),
704
- "height": row["height"],
705
- "width": row["width"],
706
- "source": row["source"],
707
- }
708
- page_results.append(result)
709
- return page_results
710
-
711
- slist = [0]
712
- if max_results:
713
- max_results = min(max_results, 500)
714
- slist.extend(range(100, max_results, 100))
715
- try:
716
- for r in self._executor.map(_images_page, slist):
717
- results.extend(r)
718
- except Exception as e:
719
- raise e
720
-
721
- return list(islice(results, max_results))
722
-
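A short sketch for the images() method above (assumes network access; keys match the result dict constructed in _images_page):

    with WEBS() as webs:
        for img in webs.images("aurora borealis", size="Large", max_results=3):
            print(img["title"], img["image"])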
723
- def videos(
724
- self,
725
- keywords: str,
726
- region: str = "wt-wt",
727
- safesearch: str = "moderate",
728
- timelimit: str | None = None,
729
- resolution: str | None = None,
730
- duration: str | None = None,
731
- license_videos: str | None = None,
732
- max_results: int | None = None,
733
- ) -> list[dict[str, str]]:
734
- """webscout videos search. Query params: https://duckduckgo.com/params.
735
-
736
- Args:
737
- keywords: keywords for query.
738
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
739
- safesearch: on, moderate, off. Defaults to "moderate".
740
- timelimit: d, w, m. Defaults to None.
741
- resolution: high, standard. Defaults to None.
742
- duration: short, medium, long. Defaults to None.
743
- license_videos: creativeCommon, youtube. Defaults to None.
744
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
745
-
746
- Returns:
747
- List of dictionaries with videos search results.
748
-
749
- Raises:
750
- WebscoutE: Base exception for webscout errors.
751
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
752
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
753
- """
754
- assert keywords, "keywords is mandatory"
755
-
756
- vqd = self._get_vqd(keywords)
757
-
758
- safesearch_base = {"on": "1", "moderate": "-1", "off": "-2"}
759
- timelimit = f"publishedAfter:{timelimit}" if timelimit else ""
760
- resolution = f"videoDefinition:{resolution}" if resolution else ""
761
- duration = f"videoDuration:{duration}" if duration else ""
762
- license_videos = f"videoLicense:{license_videos}" if license_videos else ""
763
- payload = {
764
- "l": region,
765
- "o": "json",
766
- "q": keywords,
767
- "vqd": vqd,
768
- "f": f"{timelimit},{resolution},{duration},{license_videos}",
769
- "p": safesearch_base[safesearch.lower()],
770
- }
771
-
772
- cache = set()
773
- results: list[dict[str, str]] = []
774
-
775
- def _videos_page(s: int) -> list[dict[str, str]]:
776
- payload["s"] = f"{s}"
777
- resp_content = self._get_url("GET", "https://duckduckgo.com/v.js", params=payload)
778
- resp_json = json_loads(resp_content)
779
-
780
- page_data = resp_json.get("results", [])
781
- page_results = []
782
- for row in page_data:
783
- if row["content"] not in cache:
784
- cache.add(row["content"])
785
- page_results.append(row)
786
- return page_results
787
-
788
- slist = [0]
789
- if max_results:
790
- max_results = min(max_results, 400)
791
- slist.extend(range(60, max_results, 60))
792
- try:
793
- for r in self._executor.map(_videos_page, slist):
794
- results.extend(r)
795
- except Exception as e:
796
- raise e
797
-
798
- return list(islice(results, max_results))
799
-
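A short sketch for videos() above (assumes network access; rows are returned as-is from DuckDuckGo, so the available keys depend on its response — "content" is the deduplication key used above):

    with WEBS() as webs:
        for video in webs.videos("lo-fi study music", resolution="high", max_results=3):
            print(video.get("title"), video.get("content"))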
800
- def news(
801
- self,
802
- keywords: str,
803
- region: str = "wt-wt",
804
- safesearch: str = "moderate",
805
- timelimit: str | None = None,
806
- max_results: int | None = None,
807
- ) -> list[dict[str, str]]:
808
- """webscout news search. Query params: https://duckduckgo.com/params.
809
-
810
- Args:
811
- keywords: keywords for query.
812
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
813
- safesearch: on, moderate, off. Defaults to "moderate".
814
- timelimit: d, w, m. Defaults to None.
815
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
816
-
817
- Returns:
818
- List of dictionaries with news search results.
819
-
820
- Raises:
821
- WebscoutE: Base exception for webscout errors.
822
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
823
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
824
- """
825
- assert keywords, "keywords is mandatory"
826
-
827
- vqd = self._get_vqd(keywords)
828
-
829
- safesearch_base = {"on": "1", "moderate": "-1", "off": "-2"}
830
- payload = {
831
- "l": region,
832
- "o": "json",
833
- "noamp": "1",
834
- "q": keywords,
835
- "vqd": vqd,
836
- "p": safesearch_base[safesearch.lower()],
837
- }
838
- if timelimit:
839
- payload["df"] = timelimit
840
-
841
- cache = set()
842
- results: list[dict[str, str]] = []
843
-
844
- def _news_page(s: int) -> list[dict[str, str]]:
845
- payload["s"] = f"{s}"
846
- resp_content = self._get_url("GET", "https://duckduckgo.com/news.js", params=payload)
847
- resp_json = json_loads(resp_content)
848
- page_data = resp_json.get("results", [])
849
- page_results = []
850
- for row in page_data:
851
- if row["url"] not in cache:
852
- cache.add(row["url"])
853
- image_url = row.get("image", None)
854
- result = {
855
- "date": datetime.fromtimestamp(row["date"], timezone.utc).isoformat(),
856
- "title": row["title"],
857
- "body": _normalize(row["excerpt"]),
858
- "url": _normalize_url(row["url"]),
859
- "image": _normalize_url(image_url),
860
- "source": row["source"],
861
- }
862
- page_results.append(result)
863
- return page_results
864
-
865
- slist = [0]
866
- if max_results:
867
- max_results = min(max_results, 120)
868
- slist.extend(range(30, max_results, 30))
869
- try:
870
- for r in self._executor.map(_news_page, slist):
871
- results.extend(r)
872
- except Exception as e:
873
- raise e
874
-
875
- return list(islice(results, max_results))
876
-
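A short sketch for news() above (assumes network access; keys follow the result dict built in _news_page):

    with WEBS() as webs:
        for article in webs.news("open source AI", timelimit="w", max_results=5):
            print(article["date"], article["title"], article["url"])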
877
- def answers(self, keywords: str) -> list[dict[str, str]]:
878
- """webscout instant answers. Query params: https://duckduckgo.com/params.
879
-
880
- Args:
881
- keywords: keywords for query.
882
-
883
- Returns:
884
- List of dictionaries with instant answers results.
885
-
886
- Raises:
887
- WebscoutE: Base exception for webscout errors.
888
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
889
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
890
- """
891
- assert keywords, "keywords is mandatory"
892
-
893
- payload = {
894
- "q": f"what is {keywords}",
895
- "format": "json",
896
- }
897
- resp_content = self._get_url("GET", "https://api.duckduckgo.com/", params=payload)
898
- page_data = json_loads(resp_content)
899
-
900
- results = []
901
- answer = page_data.get("AbstractText")
902
- url = page_data.get("AbstractURL")
903
- if answer:
904
- results.append(
905
- {
906
- "icon": None,
907
- "text": answer,
908
- "topic": None,
909
- "url": url,
910
- }
911
- )
912
-
913
- # related
914
- payload = {
915
- "q": f"{keywords}",
916
- "format": "json",
917
- }
918
- resp_content = self._get_url("GET", "https://api.duckduckgo.com/", params=payload)
919
- resp_json = json_loads(resp_content)
920
- page_data = resp_json.get("RelatedTopics", [])
921
-
922
- for row in page_data:
923
- topic = row.get("Name")
924
- if not topic:
925
- icon = row["Icon"].get("URL")
926
- results.append(
927
- {
928
- "icon": f"https://duckduckgo.com{icon}" if icon else "",
929
- "text": row["Text"],
930
- "topic": None,
931
- "url": row["FirstURL"],
932
- }
933
- )
934
- else:
935
- for subrow in row["Topics"]:
936
- icon = subrow["Icon"].get("URL")
937
- results.append(
938
- {
939
- "icon": f"https://duckduckgo.com{icon}" if icon else "",
940
- "text": subrow["Text"],
941
- "topic": topic,
942
- "url": subrow["FirstURL"],
943
- }
944
- )
945
-
946
- return results
947
-
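A short sketch for answers() above (assumes network access to api.duckduckgo.com; each entry carries "icon", "text", "topic" and "url"):

    with WEBS() as webs:
        for answer in webs.answers("haversine formula"):
            print(answer["text"], answer["url"])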
948
- def suggestions(self, keywords: str, region: str = "wt-wt") -> list[dict[str, str]]:
949
- """webscout suggestions. Query params: https://duckduckgo.com/params.
950
-
951
- Args:
952
- keywords: keywords for query.
953
- region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
954
-
955
- Returns:
956
- List of dictionaries with suggestions results.
957
-
958
- Raises:
959
- WebscoutE: Base exception for webscout errors.
960
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
961
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
962
- """
963
- assert keywords, "keywords is mandatory"
964
-
965
- payload = {
966
- "q": keywords,
967
- "kl": region,
968
- }
969
- resp_content = self._get_url("GET", "https://duckduckgo.com/ac/", params=payload)
970
- page_data = json_loads(resp_content)
971
- return [r for r in page_data]
972
-
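A short sketch for suggestions() above (assumes network access; the autocomplete endpoint returns dicts whose "phrase" key holds the suggestion — that key name comes from DuckDuckGo's response, not from this file):

    with WEBS() as webs:
        for s in webs.suggestions("how to learn pyth"):
            print(s.get("phrase", s))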
973
- def maps(
974
- self,
975
- keywords: str,
976
- place: str | None = None,
977
- street: str | None = None,
978
- city: str | None = None,
979
- county: str | None = None,
980
- state: str | None = None,
981
- country: str | None = None,
982
- postalcode: str | None = None,
983
- latitude: str | None = None,
984
- longitude: str | None = None,
985
- radius: int = 0,
986
- max_results: int | None = None,
987
- ) -> list[dict[str, str]]:
988
- """webscout maps search. Query params: https://duckduckgo.com/params.
989
-
990
- Args:
991
- keywords: keywords for query
992
- place: if set, the other parameters are not used. Defaults to None.
993
- street: house number/street. Defaults to None.
994
- city: city of search. Defaults to None.
995
- county: county of search. Defaults to None.
996
- state: state of search. Defaults to None.
997
- country: country of search. Defaults to None.
998
- postalcode: postalcode of search. Defaults to None.
999
- latitude: geographic coordinate (north-south position). Defaults to None.
1000
- longitude: geographic coordinate (east-west position); if latitude and
1001
- longitude are set, the other parameters are not used. Defaults to None.
1002
- radius: expand the search square by the distance in kilometers. Defaults to 0.
1003
- max_results: max number of results. If None, returns results only from the first response. Defaults to None.
1004
-
1005
- Returns:
1006
- List of dictionaries with maps search results.
1007
-
1008
- Raises:
1009
- WebscoutE: Base exception for webscout errors.
1010
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
1011
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
1012
- """
1013
- assert keywords, "keywords is mandatory"
1014
-
1015
- vqd = self._get_vqd(keywords)
1016
-
1017
- # if longitude and latitude are specified, skip the request about bbox to the nominatim api
1018
- if latitude and longitude:
1019
- lat_t = Decimal(latitude.replace(",", "."))
1020
- lat_b = Decimal(latitude.replace(",", "."))
1021
- lon_l = Decimal(longitude.replace(",", "."))
1022
- lon_r = Decimal(longitude.replace(",", "."))
1023
- if radius == 0:
1024
- radius = 1
1025
- # otherwise request about bbox to nominatim api
1026
- else:
1027
- if place:
1028
- params = {
1029
- "q": place,
1030
- "polygon_geojson": "0",
1031
- "format": "jsonv2",
1032
- }
1033
- else:
1034
- params = {
1035
- "polygon_geojson": "0",
1036
- "format": "jsonv2",
1037
- }
1038
- if street:
1039
- params["street"] = street
1040
- if city:
1041
- params["city"] = city
1042
- if county:
1043
- params["county"] = county
1044
- if state:
1045
- params["state"] = state
1046
- if country:
1047
- params["country"] = country
1048
- if postalcode:
1049
- params["postalcode"] = postalcode
1050
- # request nominatim api to get coordinates box
1051
- resp_content = self._get_url(
1052
- "GET",
1053
- "https://nominatim.openstreetmap.org/search.php",
1054
- params=params,
1055
- )
1056
- if resp_content == b"[]":
1057
- raise WebscoutE("maps() Coordinates are not found, check function parameters.")
1058
- resp_json = json_loads(resp_content)
1059
- coordinates = resp_json[0]["boundingbox"]
1060
- lat_t, lon_l = Decimal(coordinates[1]), Decimal(coordinates[2])
1061
- lat_b, lon_r = Decimal(coordinates[0]), Decimal(coordinates[3])
1062
-
1063
- # if a radius is specified, expand the search square
1064
- lat_t += Decimal(radius) * Decimal(0.008983)
1065
- lat_b -= Decimal(radius) * Decimal(0.008983)
1066
- lon_l -= Decimal(radius) * Decimal(0.008983)
1067
- lon_r += Decimal(radius) * Decimal(0.008983)
1068
-
1069
-
1070
- cache = set()
1071
- results: list[dict[str, str]] = []
1072
-
1073
- def _maps_page(
1074
- bbox: tuple[Decimal, Decimal, Decimal, Decimal],
1075
- ) -> list[dict[str, str]] | None:
1076
- if max_results and len(results) >= max_results:
1077
- return None
1078
- lat_t, lon_l, lat_b, lon_r = bbox
1079
- params = {
1080
- "q": keywords,
1081
- "vqd": vqd,
1082
- "tg": "maps_places",
1083
- "rt": "D",
1084
- "mkexp": "b",
1085
- "wiki_info": "1",
1086
- "is_requery": "1",
1087
- "bbox_tl": f"{lat_t},{lon_l}",
1088
- "bbox_br": f"{lat_b},{lon_r}",
1089
- "strict_bbox": "1",
1090
- }
1091
- resp_content = self._get_url("GET", "https://duckduckgo.com/local.js", params=params)
1092
- resp_json = json_loads(resp_content)
1093
- page_data = resp_json.get("results", [])
1094
-
1095
- page_results = []
1096
- for res in page_data:
1097
- r_name = f'{res["name"]} {res["address"]}'
1098
- if r_name in cache:
1099
- continue
1100
- else:
1101
- cache.add(r_name)
1102
- result = {
1103
- "title": res["name"],
1104
- "address": res["address"],
1105
- "country_code": res["country_code"],
1106
- "url": _normalize_url(res["website"]),
1107
- "phone": res["phone"] or "",
1108
- "latitude": res["coordinates"]["latitude"],
1109
- "longitude": res["coordinates"]["longitude"],
1110
- "source": _normalize_url(res["url"]),
1111
- "image": x.get("image", "") if (x := res["embed"]) else "",
1112
- "desc": x.get("description", "") if (x := res["embed"]) else "",
1113
- "hours": res["hours"] or "",
1114
- "category": res["ddg_category"] or "",
1115
- "facebook": f"www.facebook.com/profile.php?id={x}" if (x := res["facebook_id"]) else "",
1116
- "instagram": f"https://www.instagram.com/{x}" if (x := res["instagram_id"]) else "",
1117
- "twitter": f"https://twitter.com/{x}" if (x := res["twitter_id"]) else "",
1118
- }
1119
- page_results.append(result)
1120
- return page_results
1121
-
1122
- # search squares (bboxes)
1123
- start_bbox = (lat_t, lon_l, lat_b, lon_r)
1124
- work_bboxes = [start_bbox]
1125
- while work_bboxes:
1126
- queue_bboxes = [] # for next iteration, at the end of the iteration work_bboxes = queue_bboxes
1127
- tasks = []
1128
- for bbox in work_bboxes:
1129
- tasks.append(bbox)
1130
- # if distance between coordinates > 1, divide the square into 4 parts and save them in queue_bboxes
1131
- if _calculate_distance(lat_t, lon_l, lat_b, lon_r) > 1:
1132
- lat_t, lon_l, lat_b, lon_r = bbox
1133
- lat_middle = (lat_t + lat_b) / 2
1134
- lon_middle = (lon_l + lon_r) / 2
1135
- bbox1 = (lat_t, lon_l, lat_middle, lon_middle)
1136
- bbox2 = (lat_t, lon_middle, lat_middle, lon_r)
1137
- bbox3 = (lat_middle, lon_l, lat_b, lon_middle)
1138
- bbox4 = (lat_middle, lon_middle, lat_b, lon_r)
1139
- queue_bboxes.extend([bbox1, bbox2, bbox3, bbox4])
1140
-
1141
- # run this batch of bbox requests in the thread pool and gather the results
1142
- work_bboxes_results = []
1143
- try:
1144
- for r in self._executor.map(_maps_page, tasks):
1145
- if r:
1146
- work_bboxes_results.extend(r)
1147
- except Exception as e:
1148
- raise e
1149
-
1150
- for x in work_bboxes_results:
1151
- if isinstance(x, list):
1152
- results.extend(x)
1153
- elif isinstance(x, dict):
1154
- results.append(x)
1155
-
1156
- work_bboxes = queue_bboxes
1157
- if not max_results or len(results) >= max_results or len(work_bboxes_results) == 0:
1158
- break
1159
-
1160
- return list(islice(results, max_results))
1161
-
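A short sketch for maps() above (assumes network access to both DuckDuckGo and the Nominatim geocoder it queries; keys follow the result dict built in _maps_page):

    with WEBS() as webs:
        for place_result in webs.maps("coffee shop", place="Berlin", max_results=5):
            print(place_result["title"], place_result["address"])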
1162
- def translate(self, keywords: list[str] | str, from_: str | None = None, to: str = "en") -> list[dict[str, str]]:
1163
- """webscout translate.
1164
-
1165
- Args:
1166
- keywords: string or list of strings to translate.
1167
- from_: language to translate from (detected automatically if not set). Defaults to None.
1168
- to: language to translate to. Defaults to "en".
1169
-
1170
- Returns:
1171
- List of dictionaries with translated keywords.
1172
-
1173
- Raises:
1174
- WebscoutE: Base exception for webscout errors.
1175
- RatelimitE: Inherits from WebscoutE, raised for exceeding API request rate limits.
1176
- TimeoutE: Inherits from WebscoutE, raised for API request timeouts.
1177
- """
1178
- assert keywords, "keywords is mandatory"
1179
-
1180
- vqd = self._get_vqd("translate")
1181
-
1182
- payload = {
1183
- "vqd": vqd,
1184
- "query": "translate",
1185
- "to": to,
1186
- }
1187
- if from_:
1188
- payload["from"] = from_
1189
-
1190
- def _translate_keyword(keyword: str) -> dict[str, str]:
1191
- resp_content = self._get_url(
1192
- "POST",
1193
- "https://duckduckgo.com/translation.js",
1194
- params=payload,
1195
- content=keyword.encode(),
1196
- )
1197
- page_data: dict[str, str] = json_loads(resp_content)
1198
- page_data["original"] = keyword
1199
- return page_data
1200
-
1201
- if isinstance(keywords, str):
1202
- keywords = [keywords]
1203
-
1204
- results = []
1205
- try:
1206
- for r in self._executor.map(_translate_keyword, keywords):
1207
- results.append(r)
1208
- except Exception as e:
1209
- raise e
1210
-
1211
- return results
1212
-
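A short sketch for translate() above (assumes network access; the "translated" key comes from DuckDuckGo's translation.js response, while "original" is added by the method itself):

    with WEBS() as webs:
        for item in webs.translate(["bonjour", "merci"], to="en"):
            print(item["original"], "->", item.get("translated"))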
1213
-
1214
- html_parser = html.parser.HTMLParser()
1215
-
1216
-
1217
- def unescape(string):
1218
- return html.unescape(string)
1219
-
1220
-
1221
- WATCH_URL = 'https://www.youtube.com/watch?v={video_id}'
1222
-
1223
-
1224
- class TranscriptRetrievalError(Exception):
1225
- """Base class for transcript retrieval errors."""
1226
-
1227
- def __init__(self, video_id, message):
1228
- super().__init__(message.format(video_url=WATCH_URL.format(video_id=video_id)))
1229
- self.video_id = video_id
1230
-
1231
-
1232
- class YouTubeRequestFailedError(TranscriptRetrievalError):
1233
- """Raised when a request to YouTube fails."""
1234
-
1235
- def __init__(self, video_id, http_error):
1236
- message = 'Request to YouTube failed: {reason}'
1237
- super().__init__(video_id, message.format(reason=str(http_error)))
1238
-
1239
-
1240
- class VideoUnavailableError(TranscriptRetrievalError):
1241
- """Raised when the video is unavailable."""
1242
-
1243
- def __init__(self, video_id):
1244
- message = 'The video is no longer available'
1245
- super().__init__(video_id, message)
1246
-
1247
-
1248
- class InvalidVideoIdError(TranscriptRetrievalError):
1249
- """Raised when an invalid video ID is provided."""
1250
-
1251
- def __init__(self, video_id):
1252
- message = (
1253
- 'You provided an invalid video id. Make sure you are using the video id and NOT the url!\n\n'
1254
- 'Do NOT run: `YTTranscriber.get_transcript("https://www.youtube.com/watch?v=1234")`\n'
1255
- 'Instead run: `YTTranscriber.get_transcript("1234")`'
1256
- )
1257
- super().__init__(video_id, message)
1258
-
1259
-
1260
- class TooManyRequestsError(TranscriptRetrievalError):
1261
- """Raised when YouTube rate limits the requests."""
1262
-
1263
- def __init__(self, video_id):
1264
- message = (
1265
- 'YouTube is receiving too many requests from this IP and now requires solving a captcha to continue. '
1266
- 'One of the following things can be done to work around this:\n'
1267
- '- Manually solve the captcha in a browser and export the cookie\n'
1268
- '- Use a different IP address\n'
1269
- '- Wait until the ban on your IP has been lifted'
1270
- )
1271
- super().__init__(video_id, message)
1272
-
1273
-
1274
- class TranscriptsDisabledError(TranscriptRetrievalError):
1275
- """Raised when transcripts are disabled for the video."""
1276
-
1277
- def __init__(self, video_id):
1278
- message = 'Subtitles are disabled for this video'
1279
- super().__init__(video_id, message)
1280
-
1281
-
1282
- class NoTranscriptAvailableError(TranscriptRetrievalError):
1283
- """Raised when no transcripts are available for the video."""
1284
-
1285
- def __init__(self, video_id):
1286
- message = 'No transcripts are available for this video'
1287
- super().__init__(video_id, message)
1288
-
1289
-
1290
- class NotTranslatableError(TranscriptRetrievalError):
1291
- """Raised when the transcript is not translatable."""
1292
-
1293
- def __init__(self, video_id):
1294
- message = 'The requested language is not translatable'
1295
- super().__init__(video_id, message)
1296
-
1297
-
1298
- class TranslationLanguageNotAvailableError(TranscriptRetrievalError):
1299
- """Raised when the requested translation language is not available."""
1300
-
1301
- def __init__(self, video_id):
1302
- message = 'The requested translation language is not available'
1303
- super().__init__(video_id, message)
1304
-
1305
-
1306
- class CookiePathInvalidError(TranscriptRetrievalError):
1307
- """Raised when the cookie path is invalid."""
1308
-
1309
- def __init__(self, video_id):
1310
- message = 'The provided cookie file was unable to be loaded'
1311
- super().__init__(video_id, message)
1312
-
1313
-
1314
- class CookiesInvalidError(TranscriptRetrievalError):
1315
- """Raised when the provided cookies are invalid."""
1316
-
1317
- def __init__(self, video_id):
1318
- message = 'The cookies provided are not valid (may have expired)'
1319
- super().__init__(video_id, message)
1320
-
1321
-
1322
- class FailedToCreateConsentCookieError(TranscriptRetrievalError):
1323
- """Raised when consent cookie creation fails."""
1324
-
1325
- def __init__(self, video_id):
1326
- message = 'Failed to automatically give consent to saving cookies'
1327
- super().__init__(video_id, message)
1328
-
1329
-
1330
- class NoTranscriptFoundError(TranscriptRetrievalError):
1331
- """Raised when no transcript is found for the requested language codes."""
1332
-
1333
- def __init__(self, video_id, requested_language_codes, transcript_data):
1334
- message = (
1335
- 'No transcripts were found for any of the requested language codes: {requested_language_codes}\n\n'
1336
- '{transcript_data}'
1337
- )
1338
- super().__init__(video_id, message.format(
1339
- requested_language_codes=requested_language_codes,
1340
- transcript_data=str(transcript_data)
1341
- ))
1342
-
1343
-
1344
- class YTTranscriber:
1345
- """
1346
- Main class for retrieving YouTube transcripts.
1347
- """
1348
-
1349
- @staticmethod
1350
- def get_transcript(video_url: str, languages: Optional[str] = 'en',
1351
- proxies: Dict[str, str] = None,
1352
- cookies: str = None,
1353
- preserve_formatting: bool = False) -> List[Dict[str, Union[str, float]]]:
1354
- """
1355
- Retrieves the transcript for a given YouTube video URL.
1356
-
1357
- Args:
1358
- video_url (str): YouTube video URL (supports various formats).
1359
- languages (str, optional): Language code for the transcript.
1360
- If None, fetches the auto-generated transcript.
1361
- Defaults to 'en'.
1362
- proxies (Dict[str, str], optional): Proxies to use for the request. Defaults to None.
1363
- cookies (str, optional): Path to the cookie file. Defaults to None.
1364
- preserve_formatting (bool, optional): Whether to preserve formatting tags. Defaults to False.
1365
-
1366
- Returns:
1367
- List[Dict[str, Union[str, float]]]: A list of dictionaries, each containing:
1368
- - 'text': The transcribed text.
1369
- - 'start': The start time of the text segment (in seconds).
1370
- - 'duration': The duration of the text segment (in seconds).
1371
-
1372
- Raises:
1373
- TranscriptRetrievalError: If there's an error retrieving the transcript.
1374
- """
1375
- video_id = YTTranscriber._extract_video_id(video_url)
1376
-
1377
- with requests.Session() as http_client:
1378
- if cookies:
1379
- http_client.cookies = YTTranscriber._load_cookies(cookies, video_id)
1380
- http_client.proxies = proxies if proxies else {}
1381
- transcript_list_fetcher = TranscriptListFetcher(http_client)
1382
- transcript_list = transcript_list_fetcher.fetch(video_id)
1383
-
1384
- if languages is None: # Get auto-generated transcript
1385
- return transcript_list.find_generated_transcript(['any']).fetch(
1386
- preserve_formatting=preserve_formatting)
1387
- else:
1388
- return transcript_list.find_transcript([languages]).fetch(preserve_formatting=preserve_formatting)
1389
-
1390
- @staticmethod
1391
- def _extract_video_id(video_url: str) -> str:
1392
- """Extracts the video ID from different YouTube URL formats."""
1393
- if 'youtube.com/watch?v=' in video_url:
1394
- video_id = video_url.split('youtube.com/watch?v=')[1].split('&')[0]
1395
- elif 'youtu.be/' in video_url:
1396
- video_id = video_url.split('youtu.be/')[1].split('?')[0]
1397
- else:
1398
- raise InvalidVideoIdError(video_url)
1399
- return video_id
1400
-
1401
- @staticmethod
1402
- def _load_cookies(cookies: str, video_id: str) -> cookiejar.MozillaCookieJar:
1403
- """Loads cookies from a file."""
1404
- try:
1405
- cookie_jar = cookiejar.MozillaCookieJar()
1406
- cookie_jar.load(cookies)
1407
- if not cookie_jar:
1408
- raise CookiesInvalidError(video_id)
1409
- return cookie_jar
1410
- except Exception:
1411
- raise CookiePathInvalidError(video_id)
1412
-
1413
-
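A short sketch for YTTranscriber.get_transcript() above (assumes network access to YouTube; VIDEO_ID is a placeholder for a real video id, and each segment carries "text", "start" and "duration" per the docstring):

    segments = YTTranscriber.get_transcript("https://www.youtube.com/watch?v=VIDEO_ID", languages="en")
    for segment in segments[:3]:
        print(segment["start"], segment["text"])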
1414
- class TranscriptListFetcher:
1415
- """Fetches the list of transcripts for a YouTube video."""
1416
-
1417
- def __init__(self, http_client: requests.Session):
1418
- """Initializes TranscriptListFetcher."""
1419
- self._http_client = http_client
1420
-
1421
- def fetch(self, video_id: str):
1422
- """Fetches and returns a TranscriptList."""
1423
- return TranscriptList.build(
1424
- self._http_client,
1425
- video_id,
1426
- self._extract_captions_json(self._fetch_video_html(video_id), video_id),
1427
- )
1428
-
1429
- def _extract_captions_json(self, html: str, video_id: str) -> dict:
1430
- """Extracts the captions JSON data from the video's HTML."""
1431
- splitted_html = html.split('"captions":')
1432
-
1433
- if len(splitted_html) <= 1:
1434
- if video_id.startswith('http://') or video_id.startswith('https://'):
1435
- raise InvalidVideoIdError(video_id)
1436
- if 'class="g-recaptcha"' in html:
1437
- raise TooManyRequestsError(video_id)
1438
- if '"playabilityStatus":' not in html:
1439
- raise VideoUnavailableError(video_id)
1440
-
1441
- raise TranscriptsDisabledError(video_id)
1442
-
1443
- captions_json = json.loads(
1444
- splitted_html[1].split(',"videoDetails')[0].replace('\n', '')
1445
- ).get('playerCaptionsTracklistRenderer')
1446
- if captions_json is None:
1447
- raise TranscriptsDisabledError(video_id)
1448
-
1449
- if 'captionTracks' not in captions_json:
1450
- raise TranscriptsDisabledError(video_id)
1451
-
1452
- return captions_json
1453
-
1454
- def _create_consent_cookie(self, html, video_id):
1455
- match = re.search('name="v" value="(.*?)"', html)
1456
- if match is None:
1457
- raise FailedToCreateConsentCookieError(video_id)
1458
- self._http_client.cookies.set('CONSENT', 'YES+' + match.group(1), domain='.youtube.com')
1459
-
1460
- def _fetch_video_html(self, video_id):
1461
- html = self._fetch_html(video_id)
1462
- if 'action="https://consent.youtube.com/s"' in html:
1463
- self._create_consent_cookie(html, video_id)
1464
- html = self._fetch_html(video_id)
1465
- if 'action="https://consent.youtube.com/s"' in html:
1466
- raise FailedToCreateConsentCookieError(video_id)
1467
- return html
1468
-
1469
- def _fetch_html(self, video_id):
1470
- response = self._http_client.get(WATCH_URL.format(video_id=video_id), headers={'Accept-Language': 'en-US'})
1471
- return unescape(_raise_http_errors(response, video_id).text)
1472
-
1473
-
1474
- class TranscriptList:
1475
- """Represents a list of available transcripts."""
1476
-
1477
- def __init__(self, video_id, manually_created_transcripts, generated_transcripts, translation_languages):
1478
- """
1479
- The constructor is only for internal use. Use the static build method instead.
1480
-
1481
- :param video_id: the id of the video this TranscriptList is for
1482
- :type video_id: str
1483
- :param manually_created_transcripts: dict mapping language codes to the manually created transcripts
1484
- :type manually_created_transcripts: dict[str, Transcript]
1485
- :param generated_transcripts: dict mapping language codes to the generated transcripts
1486
- :type generated_transcripts: dict[str, Transcript]
1487
- :param translation_languages: list of languages which can be used for translatable languages
1488
- :type translation_languages: list[dict[str, str]]
1489
- """
1490
- self.video_id = video_id
1491
- self._manually_created_transcripts = manually_created_transcripts
1492
- self._generated_transcripts = generated_transcripts
1493
- self._translation_languages = translation_languages
1494
-
1495
- @staticmethod
1496
- def build(http_client, video_id, captions_json):
1497
- """
1498
- Factory method for TranscriptList.
1499
-
1500
- :param http_client: http client which is used to make the transcript retrieving http calls
1501
- :type http_client: requests.Session
1502
- :param video_id: the id of the video this TranscriptList is for
1503
- :type video_id: str
1504
- :param captions_json: the JSON parsed from the YouTube pages static HTML
1505
- :type captions_json: dict
1506
- :return: the created TranscriptList
1507
- :rtype TranscriptList:
1508
- """
1509
- translation_languages = [
1510
- {
1511
- 'language': translation_language['languageName']['simpleText'],
1512
- 'language_code': translation_language['languageCode'],
1513
- } for translation_language in captions_json.get('translationLanguages', [])
1514
- ]
1515
-
1516
- manually_created_transcripts = {}
1517
- generated_transcripts = {}
1518
-
1519
- for caption in captions_json['captionTracks']:
1520
- if caption.get('kind', '') == 'asr':
1521
- transcript_dict = generated_transcripts
1522
- else:
1523
- transcript_dict = manually_created_transcripts
1524
-
1525
- transcript_dict[caption['languageCode']] = Transcript(
1526
- http_client,
1527
- video_id,
1528
- caption['baseUrl'],
1529
- caption['name']['simpleText'],
1530
- caption['languageCode'],
1531
- caption.get('kind', '') == 'asr',
1532
- translation_languages if caption.get('isTranslatable', False) else [],
1533
- )
1534
-
1535
- return TranscriptList(
1536
- video_id,
1537
- manually_created_transcripts,
1538
- generated_transcripts,
1539
- translation_languages,
1540
- )
1541
-
1542
- def __iter__(self):
1543
- return iter(list(self._manually_created_transcripts.values()) + list(self._generated_transcripts.values()))
1544
-
1545
- def find_transcript(self, language_codes):
1546
- """
1547
- Finds a transcript for a given language code. If no language is provided, it will
1548
- return the auto-generated transcript.
1549
-
1550
- :param language_codes: A list of language codes in a descending priority.
1551
- :type language_codes: list[str]
1552
- :return: the found Transcript
1553
- :rtype Transcript:
1554
- :raises: NoTranscriptFound
1555
- """
1556
- if 'any' in language_codes:
1557
- for transcript in self:
1558
- return transcript
1559
- return self._find_transcript(language_codes, [self._manually_created_transcripts, self._generated_transcripts])
1560
-
1561
- def find_generated_transcript(self, language_codes):
1562
- """
1563
- Finds an automatically generated transcript for a given language code.
1564
-
1565
- :param language_codes: A list of language codes in a descending priority. For example, if this is set to
1566
- ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if
1567
- it fails to do so.
1568
- :type language_codes: list[str]
1569
- :return: the found Transcript
1570
- :rtype Transcript:
1571
- :raises: NoTranscriptFound
1572
- """
1573
- if 'any' in language_codes:
1574
- for transcript in self:
1575
- if transcript.is_generated:
1576
- return transcript
1577
- return self._find_transcript(language_codes, [self._generated_transcripts])
1578
-
1579
- def find_manually_created_transcript(self, language_codes):
1580
- """
1581
- Finds a manually created transcript for a given language code.
1582
-
1583
- :param language_codes: A list of language codes in a descending priority. For example, if this is set to
1584
- ['de', 'en'] it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if
1585
- it fails to do so.
1586
- :type languages: list[str]
1587
- :return: the found Transcript
1588
- :rtype Transcript:
1589
- :raises: NoTranscriptFound
1590
- """
1591
- return self._find_transcript(language_codes, [self._manually_created_transcripts])
1592
-
1593
-     def _find_transcript(self, language_codes, transcript_dicts):
-         for language_code in language_codes:
-             for transcript_dict in transcript_dicts:
-                 if language_code in transcript_dict:
-                     return transcript_dict[language_code]
- 
-         raise NoTranscriptFoundError(
-             self.video_id,
-             language_codes,
-             self
-         )
- 
-     def __str__(self):
-         return (
-             'For this video ({video_id}) transcripts are available in the following languages:\n\n'
-             '(MANUALLY CREATED)\n'
-             '{available_manually_created_transcript_languages}\n\n'
-             '(GENERATED)\n'
-             '{available_generated_transcripts}\n\n'
-             '(TRANSLATION LANGUAGES)\n'
-             '{available_translation_languages}'
-         ).format(
-             video_id=self.video_id,
-             available_manually_created_transcript_languages=self._get_language_description(
-                 str(transcript) for transcript in self._manually_created_transcripts.values()
-             ),
-             available_generated_transcripts=self._get_language_description(
-                 str(transcript) for transcript in self._generated_transcripts.values()
-             ),
-             available_translation_languages=self._get_language_description(
-                 '{language_code} ("{language}")'.format(
-                     language=translation_language['language'],
-                     language_code=translation_language['language_code'],
-                 ) for translation_language in self._translation_languages
-             )
-         )
- 
-     def _get_language_description(self, transcript_strings):
-         description = '\n'.join(' - {transcript}'.format(transcript=transcript) for transcript in transcript_strings)
-         return description if description else 'None'
- 
- 
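# Usage sketch (not part of the original file), assuming `transcript_list` was
# produced by TranscriptList.build() as illustrated above:
#
#     for transcript in transcript_list:
#         print(transcript)        # e.g. en ("English (auto-generated)")[TRANSLATABLE]
#
#     # Prefer German, fall back to English; raises NoTranscriptFoundError if neither exists.
#     transcript = transcript_list.find_transcript(['de', 'en'])
#     generated = transcript_list.find_generated_transcript(['en'])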
- class Transcript:
-     """Represents a single transcript."""
- 
-     def __init__(self, http_client, video_id, url, language, language_code, is_generated, translation_languages):
-         """
-         You probably don't want to initialize this directly. Usually you'll access Transcript objects using a
-         TranscriptList.
- 
-         :param http_client: http client which is used to make the transcript retrieving http calls
-         :type http_client: requests.Session
-         :param video_id: the id of the video this Transcript is for
-         :type video_id: str
-         :param url: the url which needs to be called to fetch the transcript
-         :param language: the name of the language this transcript uses
-         :param language_code: the language code of this transcript
-         :param is_generated: whether this transcript was automatically generated
-         :param translation_languages: the languages this transcript can be translated to
-         """
-         self._http_client = http_client
-         self.video_id = video_id
-         self._url = url
-         self.language = language
-         self.language_code = language_code
-         self.is_generated = is_generated
-         self.translation_languages = translation_languages
-         self._translation_languages_dict = {
-             translation_language['language_code']: translation_language['language']
-             for translation_language in translation_languages
-         }
- 
-     def fetch(self, preserve_formatting=False):
-         """
-         Loads the actual transcript data.
- 
-         :param preserve_formatting: whether to keep select HTML text formatting
-         :type preserve_formatting: bool
-         :return: a list of dictionaries containing the 'text', 'start' and 'duration' keys
-         :rtype [{'text': str, 'start': float, 'duration': float}]:
-         """
-         response = self._http_client.get(self._url, headers={'Accept-Language': 'en-US'})
-         return TranscriptParser(preserve_formatting=preserve_formatting).parse(
-             _raise_http_errors(response, self.video_id).text,
-         )
- 
-     def __str__(self):
-         return '{language_code} ("{language}"){translation_description}'.format(
-             language=self.language,
-             language_code=self.language_code,
-             translation_description='[TRANSLATABLE]' if self.is_translatable else ''
-         )
- 
-     @property
-     def is_translatable(self):
-         return len(self.translation_languages) > 0
- 
-     def translate(self, language_code):
-         if not self.is_translatable:
-             raise NotTranslatableError(self.video_id)
- 
-         if language_code not in self._translation_languages_dict:
-             raise TranslationLanguageNotAvailableError(self.video_id)
- 
-         return Transcript(
-             self._http_client,
-             self.video_id,
-             '{url}&tlang={language_code}'.format(url=self._url, language_code=language_code),
-             self._translation_languages_dict[language_code],
-             language_code,
-             True,
-             [],
-         )
- 
- 
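# Usage sketch (not part of the original file): fetching and translating a Transcript
# taken from a TranscriptList. Live network access and a translatable track are
# assumed; the snippet values shown are invented.
#
#     snippets = transcript.fetch()
#     # -> [{'text': 'Hey there', 'start': 0.0, 'duration': 1.54}, ...]
#
#     if transcript.is_translatable:
#         german = transcript.translate('de')   # raises TranslationLanguageNotAvailableError otherwise
#         german_snippets = german.fetch()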
- class TranscriptParser:
-     """Parses the transcript data from XML."""
- 
-     _FORMATTING_TAGS = [
-         'strong',  # important
-         'em',  # emphasized
-         'b',  # bold
-         'i',  # italic
-         'mark',  # marked
-         'small',  # smaller
-         'del',  # deleted
-         'ins',  # inserted
-         'sub',  # subscript
-         'sup',  # superscript
-     ]
- 
-     def __init__(self, preserve_formatting=False):
-         self._html_regex = self._get_html_regex(preserve_formatting)
- 
-     def _get_html_regex(self, preserve_formatting):
-         if preserve_formatting:
-             formats_regex = '|'.join(self._FORMATTING_TAGS)
-             formats_regex = r'<\/?(?!\/?(' + formats_regex + r')\b).*?\b>'
-             html_regex = re.compile(formats_regex, re.IGNORECASE)
-         else:
-             html_regex = re.compile(r'<[^>]*>', re.IGNORECASE)
-         return html_regex
- 
-     def parse(self, plain_data):
-         return [
-             {
-                 'text': re.sub(self._html_regex, '', unescape(xml_element.text)),
-                 'start': float(xml_element.attrib['start']),
-                 'duration': float(xml_element.attrib.get('dur', '0.0')),
-             }
-             for xml_element in ElementTree.fromstring(plain_data)
-             if xml_element.text is not None
-         ]
- 
- 
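# Illustrative sketch (not part of the original file): parse() iterates the children
# of the XML root and reads the 'start' and 'dur' attributes, so a minimal input looks
# like the string below (the root tag name itself is never inspected). It assumes
# xml.etree.ElementTree is imported earlier in this module.
#
#     xml = (
#         '<transcript>'
#         '<text start="0.0" dur="1.5">Hello &amp; welcome</text>'
#         '<text start="1.5" dur="2.0">to the channel</text>'
#         '</transcript>'
#     )
#     TranscriptParser().parse(xml)
#     # -> [{'text': 'Hello & welcome', 'start': 0.0, 'duration': 1.5},
#     #     {'text': 'to the channel', 'start': 1.5, 'duration': 2.0}]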
- def _raise_http_errors(response, video_id):
-     try:
-         response.raise_for_status()
-         return response
-     except requests.exceptions.HTTPError as error:
-         raise YouTubeRequestFailedError(video_id, error)
- 
- 
- class LLM:
-     def __init__(self, model: str, system_message: str = "You are a Helpful AI."):
-         self.model = model
-         self.conversation_history = [{"role": "system", "content": system_message}]
- 
-     def chat(self, messages: List[Dict[str, str]]) -> Union[str, None]:
-         url = "https://api.deepinfra.com/v1/openai/chat/completions"
-         headers = {
-             'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
-             'Accept-Language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3',
-             'Cache-Control': 'no-cache',
-             'Connection': 'keep-alive',
-             'Content-Type': 'application/json',
-             'Origin': 'https://deepinfra.com',
-             'Pragma': 'no-cache',
-             'Referer': 'https://deepinfra.com/',
-             'Sec-Fetch-Dest': 'empty',
-             'Sec-Fetch-Mode': 'cors',
-             'Sec-Fetch-Site': 'same-site',
-             'X-Deepinfra-Source': 'web-embed',
-             'accept': 'text/event-stream',
-             'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
-             'sec-ch-ua-mobile': '?0',
-             'sec-ch-ua-platform': '"macOS"'
-         }
-         data = json.dumps(
-             {
-                 'model': self.model,
-                 'messages': messages,
-                 'temperature': 0.7,
-                 'max_tokens': 16000,
-                 'stop': [],
-                 'stream': False  # don't change this: the parsing below expects a plain JSON body
-             }, separators=(',', ':')
-         )
-         try:
-             result = requests.post(url=url, data=data, headers=headers)
-             return result.json()['choices'][0]['message']['content']
-         except Exception:
-             return None
- 
- 
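# Usage sketch (not part of the original file). The model id below is only an example
# of a DeepInfra-hosted model; chat() makes a live network call and returns None on
# any failure. Note that chat() takes the full message list and does not read
# self.conversation_history.
#
#     llm = LLM(model="meta-llama/Meta-Llama-3-70B-Instruct")
#     reply = llm.chat([
#         {"role": "system", "content": "You are a Helpful AI."},
#         {"role": "user", "content": "Say hi in one word."},
#     ])
#     print(reply)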
- def fastai(user, model="llama3-70b", system="Answer as concisely as possible."):
-     env_type = "tp16405b" if "405b" in model else "tp16"
-     data = {'body': {'messages': [{'role': 'system', 'content': system}, {'role': 'user', 'content': user}], 'stream': True, 'model': model}, 'env_type': env_type}
-     with requests.post('https://fast.snova.ai/api/completion', headers={'content-type': 'application/json'}, json=data, stream=True) as response:
-         output = ''
-         for line in response.iter_lines(decode_unicode=True):
-             if line.startswith('data:'):
-                 try:
-                     data = json.loads(line[len('data: '):])
-                     output += data.get("choices", [{}])[0].get("delta", {}).get("content", '')
-                 except json.JSONDecodeError:
-                     if line[len('data: '):] == '[DONE]':
-                         break
-         return output
- 
- 
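# Usage sketch (not part of the original file): fastai() streams from the
# fast.snova.ai endpoint above and concatenates the delta chunks into a single string.
#
#     answer = fastai("What is the capital of France?", model="llama3-70b")
#     print(answer)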
- from bs4 import BeautifulSoup
- import requests
- from typing import Dict, List, Optional, Union
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from urllib.parse import quote
- from termcolor import colored
- import time
- import random
- 
- class GoogleS:
-     """
-     Class to perform Google searches and retrieve results.
-     """
- 
-     def __init__(
-         self,
-         headers: Optional[Dict[str, str]] = None,
-         proxy: Optional[str] = None,
-         timeout: Optional[int] = 10,
-         max_workers: int = 20  # Increased max workers for thread pool
-     ):
-         """Initializes the GoogleS object."""
-         self.proxy = proxy
-         self.headers = headers if headers else {
-             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.62"
-         }
-         self.headers["Referer"] = "https://www.google.com/"
-         self.client = requests.Session()
-         self.client.headers.update(self.headers)
-         if self.proxy:  # only register proxies when one was actually provided
-             self.client.proxies.update({"http": self.proxy, "https": self.proxy})
-         self.timeout = timeout
-         self._executor = ThreadPoolExecutor(max_workers=max_workers)
- 
-     def __enter__(self):
-         return self
- 
-     def __exit__(self, exc_type, exc_val, exc_tb):
-         self.client.close()
- 
-     def _get_url(self, method: str, url: str, params: Optional[Dict[str, str]] = None,
-                  data: Optional[Union[Dict[str, str], bytes]] = None) -> bytes:
-         """
-         Makes an HTTP request and returns the response content.
-         """
-         try:
-             resp = self.client.request(method, url, params=params, data=data, timeout=self.timeout)
-         except Exception as ex:
-             raise Exception(f"{url} {type(ex).__name__}: {ex}") from ex
-         if resp.status_code == 200:
-             return resp.content
-         raise Exception(f"{resp.url} returned status code {resp.status_code}. {params=} {data=}")
- 
-     def _extract_text_from_webpage(self, html_content: bytes, max_characters: Optional[int] = None) -> str:
-         """
-         Extracts visible text from HTML content using the lxml parser.
-         """
-         soup = BeautifulSoup(html_content, 'lxml')  # Use lxml parser
-         for tag in soup(["script", "style", "header", "footer", "nav"]):
-             tag.extract()
-         visible_text = soup.get_text(strip=True)
-         if max_characters:
-             visible_text = visible_text[:max_characters]
-         return visible_text
- 
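# Illustrative sketch (not part of the original file): script, style and nav content
# is dropped before the visible text is collected and optionally truncated (relies on
# the lxml parser used above).
#
#     html = b"<html><body><nav>menu</nav><p>Hello world</p></body></html>"
#     GoogleS()._extract_text_from_webpage(html, max_characters=5)
#     # -> 'Hello'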
-     def search(
-         self,
-         query: str,
-         region: str = "us-en",
-         language: str = "en",
-         safe: str = "off",
-         time_period: Optional[str] = None,
-         max_results: int = 10,
-         extract_text: bool = False,
-         max_text_length: Optional[int] = 100,
-     ) -> List[Dict[str, Union[str, int]]]:
-         """
-         Performs a Google search and returns the results.
- 
-         Args:
-             query (str): The search query.
-             region (str, optional): The region to search in (e.g., "us-en"). Defaults to "us-en".
-             language (str, optional): The language of the search results (e.g., "en"). Defaults to "en".
-             safe (str, optional): Safe search setting ("off", "active"). Defaults to "off".
-             time_period (Optional[str], optional): Time period filter (e.g., "h" for past hour, "d" for past day).
-                 Defaults to None.
-             max_results (int, optional): The maximum number of results to retrieve. Defaults to 10.
-             extract_text (bool, optional): Whether to extract text from the linked web pages. Defaults to False.
-             max_text_length (Optional[int], optional): The maximum length of the extracted text (in characters).
-                 Defaults to 100.
- 
-         Returns:
-             List[Dict[str, Union[str, int]]]: A list of dictionaries, each representing a search result, containing:
-                 - 'title': The title of the result.
-                 - 'href': The URL of the result.
-                 - 'abstract': The description snippet of the result.
-                 - 'index': The index of the result in the list.
-                 - 'type': The type of result (currently always "web").
-                 - 'visible_text': The extracted text from the web page (if `extract_text` is True).
-         """
-         assert query, "Query cannot be empty."
- 
-         results = []
- 
-         # Submit one request per result page up front (10 results per page), so the
-         # number of requests is bounded by max_results.
-         futures = []
-         for start in range(0, max_results, 10):
-             params = {
-                 "q": query,
-                 "num": 10,
-                 "hl": language,
-                 "start": start,
-                 "safe": safe,
-                 "gl": region,
-             }
-             if time_period:
-                 params["tbs"] = f"qdr:{time_period}"
- 
-             futures.append(self._executor.submit(self._get_url, "GET", "https://www.google.com/search", params=params))
- 
-         for future in as_completed(futures):
-             try:
-                 resp_content = future.result()
-                 soup = BeautifulSoup(resp_content, 'lxml')  # Use lxml parser
-                 result_blocks = soup.find_all("div", class_="g")
- 
-                 if not result_blocks:
-                     break
- 
-                 # Extract links and titles first
-                 for result_block in result_blocks:
-                     link = result_block.find("a", href=True)
-                     title = result_block.find("h3")
-                     description_box = result_block.find(
-                         "div", {"style": "-webkit-line-clamp:2"}
-                     )
- 
-                     if link and title and description_box:
-                         url = link["href"]
-                         results.append({
-                             "title": title.text,
-                             "href": url,
-                             "abstract": description_box.text,
-                             "index": len(results),
-                             "type": "web",
-                             "visible_text": ""  # Initialize visible_text as empty string
-                         })
- 
-                     if len(results) >= max_results:
-                         break  # Stop if we have enough results
-             except Exception as e:
-                 print(f"Error: {e}")
- 
-         # Parallelize text extraction if needed. Each future is mapped back to its result
-         # dict, because as_completed() yields futures in completion order, not submission order.
-         if extract_text:
-             with ThreadPoolExecutor(max_workers=self._executor._max_workers) as text_extractor:
-                 extraction_futures = {
-                     text_extractor.submit(
-                         lambda href: self._extract_text_from_webpage(
-                             self._get_url("GET", href), max_characters=max_text_length
-                         ),
-                         result['href'],
-                     ): result
-                     for result in results
-                     if 'href' in result
-                 }
-                 for future in as_completed(extraction_futures):
-                     try:
-                         extraction_futures[future]['visible_text'] = future.result()
-                     except Exception as e:
-                         print(f"Error extracting text: {e}")
- 
-         return results
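# Usage sketch (not part of the original webscout.py). It performs live Google
# requests, which may be rate limited or blocked, so treat it as illustrative only.
if __name__ == "__main__":
    with GoogleS() as searcher:
        hits = searcher.search("python web scraping", max_results=5)
        for hit in hits:
            print(f"{hit['index']}: {hit['title']} -> {hit['href']}")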