import asyncio
import os
from concurrent.futures import ThreadPoolExecutor

import cloudscraper

from .asyncioPoliciesFix import decorator_asyncio_fix
from constants.headers import HEADER_AIO

# Optional proxy for all outbound requests, e.g. "http://127.0.0.1:8080".
HTTP_PROXY = os.environ.get("HTTP_PROXY", None)
class Scraper:
    @decorator_asyncio_fix
    async def _get_html(self, session, url):
        # Fetch a single page; return None on any network/HTTP error so
        # callers can treat missing pages uniformly.
        try:
            async with session.get(url, headers=HEADER_AIO, proxy=HTTP_PROXY) as r:
                return await r.text()
        except Exception:
            return None

    async def get_all_results(self, session, url):
        # gather() keeps the door open to fetching several URLs at once;
        # it always returns a list, even for this single task.
        return await asyncio.gather(asyncio.create_task(self._get_html(session, url)))
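
# Usage sketch (not part of the original module): one way a caller might
# drive Scraper. aiohttp is an assumption here; the class only requires a
# session whose .get() accepts a proxy= kwarg and returns an async context
# manager with a .text() coroutine, which matches aiohttp's API.
#
#   import aiohttp
#
#   async def fetch_one(url):
#       async with aiohttp.ClientSession() as session:
#           results = await Scraper().get_all_results(session, url)
#           return results[0]  # gather() returns a list of results
#
#   asyncio.run(fetch_one("https://example.com"))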
class CloudScraper:
    def __init__(self):
        # cloudscraper wraps requests and transparently solves Cloudflare's
        # anti-bot challenges.
        self.scraper = cloudscraper.create_scraper()

    def _get_html(self, url):
        # Blocking fetch; only pass proxies when HTTP_PROXY is configured.
        proxies = {"http": HTTP_PROXY, "https": HTTP_PROXY} if HTTP_PROXY else None
        try:
            response = self.scraper.get(url, headers=HEADER_AIO, proxies=proxies)
            response.raise_for_status()  # raise on 4xx/5xx responses
            return response.text
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

    async def get_all_results(self, url):
        # Run the blocking cloudscraper call in a worker thread so the
        # event loop stays responsive while the request is in flight.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor() as pool:
            return await loop.run_in_executor(pool, self._get_html, url)
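
# Usage sketch (an assumption, not in the original module): because
# CloudScraper.get_all_results is a coroutine, the blocking fetch can be
# awaited alongside other tasks. Unlike Scraper.get_all_results, it
# returns the page text directly (or None on failure), not a list.
#
#   async def main():
#       html = await CloudScraper().get_all_results("https://example.com")
#       print(html[:200] if html else "request failed")
#
#   asyncio.run(main())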