import re
import time

import aiohttp
from bs4 import BeautifulSoup

from helper.html_scraper import Scraper
from constants.base_url import TGX


class TorrentGalaxy:
    def __init__(self):
        self.BASE_URL = TGX
        self.LIMIT = None

    def _parser_individual(self, html):
        # Parse a single torrent detail page into the standard result dict.
        try:
            soup = BeautifulSoup(html[0], "html.parser")
            my_dict = {"data": []}

            root_div = soup.find("div", class_="gluewrapper")
            post_nd_torrents = root_div.find_next("div").find_all("div")
            poster = post_nd_torrents[1].find("img")["data-src"]

            # Torrent file, magnet and direct-download links share one container.
            torrentsand_all = post_nd_torrents[4].find_all("a")
            torrent_link = torrentsand_all[0]["href"]
            magnet_link = torrentsand_all[1]["href"]
            direct_link = self.BASE_URL + torrentsand_all[2]["href"]

            details_root = soup.find("div", class_="gluewrapper").select(
                "div > :nth-child(2) > div > .tprow"
            )
            name = details_root[0].find_all("div")[-1].get_text(strip=True)
            category = (
                details_root[3].find_all("div")[-1].get_text(strip=True).split(">")[0]
            )
            language = details_root[4].find_all("div")[-1].get_text(strip=True)
            size = details_root[5].find_all("div")[-1].get_text(strip=True)
            hash = details_root[6].find_all("div")[-1].get_text(strip=True)
            username = (
                details_root[7]
                .find_all("div")[-1]
                .find("span", class_="username")
                .get_text(strip=True)
            )
            date_up = details_root[8].find_all("div")[-1].get_text(strip=True)

            btns = details_root[10].find_all("button")
            seeders = btns[0].find("span").get_text(strip=True)
            leechers = btns[1].find("span").get_text(strip=True)
            downloads = btns[2].find("span").get_text(strip=True)

            imdb_id = soup.select_one("#imdbpage")["href"].split("/")[-1]
            genre_list = [
                x.get_text(strip=True) for x in details_root[11].find_all("a")
            ]
            imgs = [
                img["href"]
                for img in soup.find("div", id="intblockslide").find_all("a")
                if img["href"].endswith((".png", ".jpg", ".jpeg"))
            ]

            my_dict["data"].append(
                {
                    "name": name,
                    "size": size,
                    "seeders": seeders,
                    "language": language,
                    "leechers": leechers,
                    "category": category,
                    "uploader": username,
                    "downloads": downloads,
                    "poster": poster,
                    "direct_download_link": direct_link,
                    "imdb_id": imdb_id,
                    "hash": hash,
                    "magnet": magnet_link,
                    "torrent": torrent_link,
                    "screenshot": imgs,
                    "genre": genre_list,
                    "date": date_up,
                }
            )
            return my_dict
        except:
            return None

    def _parser(self, htmls):
        # Parse search/listing pages. Rows come in two slightly different layouts,
        # hence the try/except fallbacks on the column indexes below.
        try:
            for html in htmls:
                soup = BeautifulSoup(html, "html.parser")
                my_dict = {"data": []}

                for idx, divs in enumerate(soup.find_all("div", class_="tgxtablerow")):
                    div = divs.find_all("div")
                    try:
                        name = div[4].find("a").get_text(strip=True)
                        imdb_url = (div[4].find_all("a"))[-1]["href"]
                    except:
                        name = (div[1].find("a", class_="txlight")).find("b").text
                        imdb_url = (div[1].find_all("a"))[-1]["href"]

                    if name != "":
                        try:
                            magnet = div[5].find_all("a")[1]["href"]
                            torrent = div[5].find_all("a")[0]["href"]
                        except:
                            magnet = div[3].find_all("a")[1]["href"]
                            torrent = div[3].find_all("a")[0]["href"]
                        size = soup.select("span.badge.badge-secondary.txlight")[
                            idx
                        ].text
                        try:
                            url = div[4].find("a")["href"]
                        except:
                            url = div[1].find("a", class_="txlight")["href"]
                        try:
                            date = div[12].get_text(strip=True)
                        except:
                            date = div[10].get_text(strip=True)
                        try:
                            seeders_leechers = div[11].find_all("b")
                            seeders = seeders_leechers[0].text
                            leechers = seeders_leechers[1].text
                        except:
                            seeders_leechers = div[11].find_all("b")
                            seeders = seeders_leechers[0].text
                            leechers = seeders_leechers[1].text
                        try:
                            uploader = (div[7].find("a")).find("span").text
                        except:
                            uploader = (div[5].find("a")).find("span").text
                        try:
                            category = (
                                div[0].find("small").text.replace(" ", "")
                            ).split(":")[0]
                        except:
                            category = None

                        my_dict["data"].append(
                            {
                                "name": name,
                                "size": size,
                                "seeders": seeders,
                                "leechers": leechers,
                                "category": category,
                                "uploader": uploader,
                                "imdb_id": imdb_url.split("=")[-1],
                                "hash": re.search(
                                    r"([a-fA-F\d]{32,40})\b", magnet
                                ).group(0),
                                "magnet": magnet,
                                "torrent": torrent,
                                "url": self.BASE_URL + url,
                                "date": date,
                            }
                        )
                    if len(my_dict["data"]) == self.LIMIT:
                        break

                # Pagination metadata, when the page exposes it.
                try:
                    ul = soup.find_all("ul", class_="pagination")[-1]
                    tpages = ul.find_all("li")[-2]
                    my_dict["current_page"] = int(
                        soup.select_one("li.page-item.active.txlight a").text.split(
                            " "
                        )[0]
                    )
                    my_dict["total_pages"] = int(tpages.find("a").text)
                except:
                    my_dict["current_page"] = None
                    my_dict["total_pages"] = None
                # ...
                return my_dict
        except:
            return None

    async def search(self, query, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            url = (
                self.BASE_URL
                + "/torrents.php?search=+{}&sort=seeders&order=desc&page={}".format(
                    query, page - 1
                )
            )
            return await self.parser_result(start_time, url, session)

    async def get_torrent_by_url(self, torrent_url):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            return await self.parser_result(
                start_time, torrent_url, session, is_individual=True
            )

    async def parser_result(self, start_time, url, session, is_individual=False):
        html = await Scraper().get_all_results(session, url)
        if is_individual:
            results = self._parser_individual(html)
        else:
            results = self._parser(html)
        if results is not None:
            results["time"] = time.time() - start_time
            results["total"] = len(results["data"])
            return results
        return results

    async def trending(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            url = self.BASE_URL
            return await self.parser_result(start_time, url, session)

    async def recent(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/latest"
            else:
                if category == "documentaries":
                    category = "Docus"
                url = (
                    self.BASE_URL
                    + "/torrents.php?parent_cat={}&sort=id&order=desc&page={}".format(
                        str(category).capitalize(), page - 1
                    )
                )
            return await self.parser_result(start_time, url, session)

    #! Maybe Implemented in Future
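

# Minimal usage sketch: assumes the project's helper.html_scraper and
# constants.base_url modules are importable and a TGX mirror is reachable;
# the query, page, and limit values are illustrative only.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        tgx = TorrentGalaxy()
        # Fetch the first page of results for an example query, capped at 5 items.
        results = await tgx.search(query="ubuntu", page=1, limit=5)
        print(results)

    asyncio.run(_demo())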