import re
import time

import aiohttp
from bs4 import BeautifulSoup

from helper.html_scraper import Scraper
from constants.base_url import TGX


class TorrentGalaxy:
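    """Scraper for the TorrentGalaxy (TGX) index.

    Turns TGX search pages, category listings, and individual torrent pages
    into plain ``{"data": [...]}`` dictionaries.
    """
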
    def __init__(self):
        self.BASE_URL = TGX
        self.LIMIT = None

    def _parser_individual(self, html):
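        """Parse a single torrent detail page into the standard result dict.

        ``html`` is the list returned by ``Scraper().get_all_results``; only
        the first entry is used. Returns ``None`` if the page layout does not
        match what is expected.
        """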
        try:
            soup = BeautifulSoup(html[0], "html.parser")
            my_dict = {"data": []}
            root_div = soup.find("div", class_="gluewrapper")
            post_nd_torrents = root_div.find_next("div").find_all("div")
            poster = post_nd_torrents[1].find("img")["data-src"]
            # Download links: .torrent file, magnet URI, and direct download.
            torrents_and_all = post_nd_torrents[4].find_all("a")
            torrent_link = torrents_and_all[0]["href"]
            magnet_link = torrents_and_all[1]["href"]
            direct_link = self.BASE_URL + torrents_and_all[2]["href"]

            # The .tprow detail rows hold name, category, language, size,
            # hash, uploader, upload date, and the seeders/leechers buttons.
            details_root = soup.find("div", class_="gluewrapper").select(
                "div > :nth-child(2) > div > .tprow"
            )

            name = details_root[0].find_all("div")[-1].get_text(strip=True)
            category = (
                details_root[3].find_all("div")[-1].get_text(strip=True).split(">")[0]
            )
            language = details_root[4].find_all("div")[-1].get_text(strip=True)
            size = details_root[5].find_all("div")[-1].get_text(strip=True)
            torrent_hash = details_root[6].find_all("div")[-1].get_text(strip=True)
            username = (
                details_root[7]
                .find_all("div")[-1]
                .find("span", class_="username")
                .get_text(strip=True)
            )
            date_up = details_root[8].find_all("div")[-1].get_text(strip=True)

            btns = details_root[10].find_all("button")
            seeders = btns[0].find("span").get_text(strip=True)
            leechers = btns[1].find("span").get_text(strip=True)
            downloads = btns[2].find("span").get_text(strip=True)
            imdb_id = soup.select_one("#imdbpage")["href"].split("/")[-1]
            genre_list = [
                x.get_text(strip=True) for x in details_root[11].find_all("a")
            ]
            # Screenshot links from the image slider block.
            imgs = [
                img["href"]
                for img in soup.find("div", id="intblockslide").find_all("a")
                if img["href"].endswith((".png", ".jpg", ".jpeg"))
            ]
            my_dict["data"].append(
                {
                    "name": name,
                    "size": size,
                    "seeders": seeders,
                    "language": language,
                    "leechers": leechers,
                    "category": category,
                    "uploader": username,
                    "downloads": downloads,
                    "poster": poster,
                    "direct_download_link": direct_link,
                    "imdb_id": imdb_id,
                    "hash": torrent_hash,
                    "magnet": magnet_link,
                    "torrent": torrent_link,
                    "screenshot": imgs,
                    "genre": genre_list,
                    "date": date_up,
                }
            )
            return my_dict
        except Exception:
            return None

    def _parser(self, htmls):
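        """Parse one or more search/listing pages into the standard result dict.

        Stops collecting rows once ``self.LIMIT`` entries have been gathered
        and attaches ``current_page``/``total_pages`` when pagination is
        present. Returns ``None`` on unexpected markup.
        """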
        try:
            for html in htmls:
                soup = BeautifulSoup(html, "html.parser")

                my_dict = {"data": []}
                for idx, divs in enumerate(soup.find_all("div", class_="tgxtablerow")):
                    div = divs.find_all("div")
                    # Column offsets differ between the wide and compact row
                    # layouts, hence the paired try/except fallbacks below.
                    try:
                        name = div[4].find("a").get_text(strip=True)
                        imdb_url = div[4].find_all("a")[-1]["href"]
                    except Exception:
                        name = div[1].find("a", class_="txlight").find("b").text
                        imdb_url = div[1].find_all("a")[-1]["href"]

                    if name != "":
                        try:
                            magnet = div[5].find_all("a")[1]["href"]
                            torrent = div[5].find_all("a")[0]["href"]
                        except Exception:
                            magnet = div[3].find_all("a")[1]["href"]
                            torrent = div[3].find_all("a")[0]["href"]
                        size = soup.select("span.badge.badge-secondary.txlight")[
                            idx
                        ].text
                        try:
                            url = div[4].find("a")["href"]
                        except Exception:
                            url = div[1].find("a", class_="txlight")["href"]
                        try:
                            date = div[12].get_text(strip=True)
                        except Exception:
                            date = div[10].get_text(strip=True)
                        seeders_leechers = div[11].find_all("b")
                        seeders = seeders_leechers[0].text
                        leechers = seeders_leechers[1].text
                        try:
                            uploader = div[7].find("a").find("span").text
                        except Exception:
                            uploader = div[5].find("a").find("span").text
                        try:
                            category = (
                                div[0].find("small").text.replace(" ", "")
                            ).split(":")[0]
                        except Exception:
                            category = None
                        my_dict["data"].append(
                            {
                                "name": name,
                                "size": size,
                                "seeders": seeders,
                                "leechers": leechers,
                                "category": category,
                                "uploader": uploader,
                                "imdb_id": imdb_url.split("=")[-1],
                                "hash": re.search(
                                    r"([a-fA-F\d]{32,40})\b", magnet
                                ).group(0),
                                "magnet": magnet,
                                "torrent": torrent,
                                "url": self.BASE_URL + url,
                                "date": date,
                            }
                        )
                    if len(my_dict["data"]) == self.LIMIT:
                        break
                try:
                    ul = soup.find_all("ul", class_="pagination")[-1]
                    tpages = ul.find_all("li")[-2]
                    my_dict["current_page"] = int(
                        soup.select_one(
                            "li.page-item.active.txlight a"
                        ).text.split(" ")[0]
                    )
                    my_dict["total_pages"] = int(tpages.find("a").text)
                except Exception:
                    my_dict["current_page"] = None
                    my_dict["total_pages"] = None

                return my_dict
        except Exception:
            return None

    async def search(self, query, page, limit):
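        """Search TGX for ``query``, sorted by seeders (descending).

        ``page`` is 1-based; ``limit`` caps the number of returned rows.
        """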
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            url = (
                self.BASE_URL
                + "/torrents.php?search=+{}&sort=seeders&order=desc&page={}".format(
                    query, page - 1
                )
            )
            return await self.parser_result(start_time, url, session)

    async def get_torrent_by_url(self, torrent_url):
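        """Fetch and parse a single torrent page given its full URL."""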
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            return await self.parser_result(
                start_time, torrent_url, session, is_individual=True
            )

    async def parser_result(self, start_time, url, session, is_individual=False):
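        """Download ``url`` and dispatch to the appropriate parser.

        Adds ``time`` (elapsed seconds) and ``total`` (result count) to the
        parsed dict when parsing succeeds; otherwise returns ``None``.
        """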
        html = await Scraper().get_all_results(session, url)
        if is_individual:
            results = self._parser_individual(html)
        else:
            results = self._parser(html)
        if results is not None:
            results["time"] = time.time() - start_time
            results["total"] = len(results["data"])
        return results

    async def trending(self, category, page, limit):
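        """Return the TGX landing page listing as "trending" results.

        ``category`` and ``page`` are accepted but not used; the site's home
        page is parsed directly.
        """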
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            url = self.BASE_URL
            return await self.parser_result(start_time, url, session)

    async def recent(self, category, page, limit):
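        """Return recently added torrents, optionally filtered by category.

        Without a category the ``/latest`` page is used; otherwise the
        category listing is requested sorted by id (newest first).
        ``page`` is 1-based.
        """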
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/latest"
            else:
                # The site uses "Docus" as the parent category name for
                # documentaries, so map the API name before building the URL.
                if category == "documentaries":
                    category = "Docus"
                url = (
                    self.BASE_URL
                    + "/torrents.php?parent_cat={}&sort=id&order=desc&page={}".format(
                        str(category).capitalize(), page - 1
                    )
                )
            return await self.parser_result(start_time, url, session)
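

# Minimal usage sketch (an assumption, not part of the original module): it
# shows the async call pattern and assumes the module is run where the
# project's `helper` and `constants` packages are importable.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        tgx = TorrentGalaxy()
        # `page` is 1-based; `limit` caps the rows collected by _parser.
        results = await tgx.search(query="ubuntu", page=1, limit=5)
        print(results)

    asyncio.run(_demo())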