import asyncio
import re
import time

import aiohttp
from bs4 import BeautifulSoup

from helper.asyncioPoliciesFix import decorator_asyncio_fix
from helper.html_scraper import Scraper
from constants.base_url import X1337
from constants.headers import HEADER_AIO


class x1337:
    def __init__(self):
        self.BASE_URL = X1337
        self.LIMIT = None

    @decorator_asyncio_fix
    async def _individual_scrap(self, session, url, obj):
        # Fetch a single torrent detail page and enrich `obj` in place with
        # magnet link, info hash, category, file list, screenshots and poster.
        try:
            async with session.get(url, headers=HEADER_AIO) as res:
                html = await res.text(encoding="ISO-8859-1")
                soup = BeautifulSoup(html, "html.parser")
                try:
                    magnet = soup.select_one(".no-top-radius > div > ul > li > a")[
                        "href"
                    ]
                    # The second "list" <ul> holds the torrent details; its
                    # first <li> carries the category.
                    uls = soup.find_all("ul", class_="list")[1]
                    lis = uls.find_all("li")[0]
                    imgs = [
                        img["data-original"]
                        for img in soup.find("div", id="description").find_all("img")
                        if img["data-original"].endswith((".png", ".jpg", ".jpeg"))
                    ]
                    files = [
                        f.text for f in soup.find("div", id="files").find_all("li")
                    ]
                    if len(imgs) > 0:
                        obj["screenshot"] = imgs
                    obj["category"] = lis.find("span").text
                    obj["files"] = files
                    try:
                        poster = soup.select_one("div.torrent-image img")["src"]
                        if str(poster).startswith("//"):
                            obj["poster"] = "https:" + poster
                        elif str(poster).startswith("/"):
                            obj["poster"] = self.BASE_URL + poster
                    except:
                        ...
                    obj["magnet"] = magnet
                    # Extract the hex info hash from the magnet link.
                    obj["hash"] = re.search(
                        r"([a-fA-F\d]{32,40})\b", magnet
                    ).group(0)
                except IndexError:
                    ...
        except:
            return None

    async def _get_torrent(self, result, session, urls):
        # Scrape all detail pages concurrently; `urls` is aligned one-to-one
        # with the entries in result["data"].
        tasks = []
        for idx, url in enumerate(urls):
            for obj in result["data"]:
                if obj["url"] == url:
                    task = asyncio.create_task(
                        self._individual_scrap(session, url, result["data"][idx])
                    )
                    tasks.append(task)
        await asyncio.gather(*tasks)
        return result

    def _parser(self, htmls):
        # Parse listing pages into a result dict plus the list of detail-page
        # URLs to be scraped afterwards.
        try:
            for html in htmls:
                soup = BeautifulSoup(html, "html.parser")

                list_of_urls = []
                my_dict = {"data": []}

                trs = soup.select("tbody tr")
                for tr in trs:
                    td = tr.find_all("td")
                    name = td[0].find_all("a")[-1].text
                    if name:
                        url = self.BASE_URL + td[0].find_all("a")[-1]["href"]
                        list_of_urls.append(url)
                        seeders = td[1].text
                        leechers = td[2].text
                        date = td[3].text
                        # The size cell also contains the seeders count; strip it.
                        size = td[4].text.replace(seeders, "")
                        uploader = td[5].find("a").text

                        my_dict["data"].append(
                            {
                                "name": name,
                                "size": size,
                                "date": date,
                                "seeders": seeders,
                                "leechers": leechers,
                                "url": url,
                                "uploader": uploader,
                            }
                        )
                    if len(my_dict["data"]) == self.LIMIT:
                        break
                try:
                    pages = soup.select(".pagination li a")
                    my_dict["current_page"] = int(pages[0].text)
                    tpages = pages[-1].text
                    if tpages == ">>":
                        my_dict["total_pages"] = int(pages[-2].text)
                    else:
                        my_dict["total_pages"] = int(pages[-1].text)
                except:
                    ...
                return my_dict, list_of_urls
        except:
            return None, None

    async def search(self, query, page, limit):
        async with aiohttp.ClientSession() as session:
            self.LIMIT = limit
            start_time = time.time()
            url = self.BASE_URL + "/search/{}/{}/".format(query, page)
            return await self.parser_result(
                start_time, url, session, query=query, page=page
            )

    async def parser_result(self, start_time, url, session, page, query=None):
        htmls = await Scraper().get_all_results(session, url)
        result, urls = self._parser(htmls)
        if result is not None:
            results = await self._get_torrent(result, session, urls)
            results["time"] = time.time() - start_time
            results["total"] = len(results["data"])
            if query is None:
                return results
            # For searches, keep fetching subsequent pages until LIMIT results
            # have been collected or no further results are available.
            while True:
                if len(results["data"]) >= self.LIMIT:
                    results["data"] = results["data"][0 : self.LIMIT]
                    results["total"] = len(results["data"])
                    return results
                page = page + 1
                url = self.BASE_URL + "/search/{}/{}/".format(query, page)
                htmls = await Scraper().get_all_results(session, url)
                result, urls = self._parser(htmls)
                if result is not None and len(result["data"]) > 0:
                    res = await self._get_torrent(result, session, urls)
                    for obj in res["data"]:
                        results["data"].append(obj)
                    try:
                        results["current_page"] = res["current_page"]
                    except:
                        ...
                    results["time"] = time.time() - start_time
                    results["total"] = len(results["data"])
                else:
                    break
            return results
        return result

    async def trending(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/home/"
            else:
                url = self.BASE_URL + "/popular-{}".format(category.lower())
            return await self.parser_result(start_time, url, session, page)

    async def recent(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/trending"
            else:
                url = self.BASE_URL + "/cat/{}/{}/".format(
                    str(category).capitalize(), page
                )
            return await self.parser_result(start_time, url, session, page)

    async def search_by_category(self, query, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            url = self.BASE_URL + "/category-search/{}/{}/{}/".format(
                query, category.capitalize(), page
            )
            return await self.parser_result(start_time, url, session, page, query)
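

# A minimal usage sketch, assuming the helper and constants modules imported
# above are importable and the target site is reachable; the query, page and
# limit values below are illustrative only, not part of the class itself.
if __name__ == "__main__":
    _demo = asyncio.run(x1337().search(query="ubuntu", page=1, limit=10))
    if _demo:
        # `search` returns a dict with "data", "total", "time" and paging keys.
        print("fetched", _demo.get("total"), "results in",
              round(_demo.get("time", 0), 2), "seconds")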