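"""Asynchronous scraper for the 1337x torrent index.

Parses search / trending / recent / category listing pages into result
dicts, then visits each torrent's detail page concurrently to collect
the magnet link, info hash, file list, screenshots and poster.
"""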
import asyncio
import re
import time
import aiohttp
from bs4 import BeautifulSoup
from helper.asyncioPoliciesFix import decorator_asyncio_fix
from helper.html_scraper import Scraper
from constants.base_url import X1337
from constants.headers import HEADER_AIO
class x1337:
def __init__(self):
self.BASE_URL = X1337
self.LIMIT = None
@decorator_asyncio_fix
async def _individual_scrap(self, session, url, obj):
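        """Scrape one torrent's detail page and enrich ``obj`` in place
        with magnet, hash, category, file list, screenshots and poster."""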
try:
async with session.get(url, headers=HEADER_AIO) as res:
html = await res.text(encoding="ISO-8859-1")
soup = BeautifulSoup(html, "html.parser")
try:
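                    # The first anchor under .no-top-radius is the magnet link.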
magnet = soup.select_one(".no-top-radius > div > ul > li > a")[
"href"
]
                    # The second ul.list on the page holds the torrent details;
                    # its first <li> carries the category label.
                    detail_list = soup.find_all("ul", class_="list")[1]
                    first_detail = detail_list.find_all("li")[0]
imgs = [
img["data-original"]
for img in (soup.find("div", id="description")).find_all("img")
if img["data-original"].endswith((".png", ".jpg", ".jpeg"))
]
files = [
f.text for f in soup.find("div", id="files").find_all("li")
]
                    if imgs:
                        obj["screenshot"] = imgs
                    obj["category"] = first_detail.find("span").text
                    obj["files"] = files
try:
poster = soup.select_one("div.torrent-image img")["src"]
if str(poster).startswith("//"):
obj["poster"] = "https:" + poster
elif str(poster).startswith("/"):
obj["poster"] = self.BASE_URL + poster
                    except (TypeError, KeyError):
                        # No poster on the page; leave the field unset.
                        pass
obj["magnet"] = magnet
obj["hash"] = re.search(
r"([{a-f\d,A-F\d}]{32,40})\b", magnet
).group(0)
                except IndexError:
                    # Expected markup missing; keep whatever was parsed so far.
                    pass
        except Exception:
            # Network or decoding failure; give up on this torrent's details.
            return None
    async def _get_torrent(self, result, session, urls):
        # urls[idx] is index-aligned with result["data"][idx] (both lists are
        # appended in lockstep by _parser), so each detail page can be
        # scraped concurrently straight into its matching entry.
        tasks = [
            asyncio.create_task(self._individual_scrap(session, url, obj))
            for url, obj in zip(urls, result["data"])
        ]
        await asyncio.gather(*tasks)
        return result
def _parser(self, htmls):
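        """Convert listing-page HTML into a ``{"data": [...]}`` dict plus
        pagination info, returning the per-torrent detail URLs index-aligned
        with ``data``.
        """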
try:
for html in htmls:
soup = BeautifulSoup(html, "html.parser")
list_of_urls = []
my_dict = {"data": []}
trs = soup.select("tbody tr")
for tr in trs:
                    td = tr.find_all("td")
                    anchor = td[0].find_all("a")[-1]
                    name = anchor.text
                    if not name:
                        continue  # row without a torrent link; skip it
                    url = self.BASE_URL + anchor["href"]
                    list_of_urls.append(url)
                    seeders = td[1].text
                    leechers = td[2].text
                    date = td[3].text
                    # The size cell embeds the seeder count; strip it off.
                    size = td[4].text.replace(seeders, "")
                    uploader = td[5].find("a").text
my_dict["data"].append(
{
"name": name,
"size": size,
"date": date,
"seeders": seeders,
"leechers": leechers,
"url": url,
"uploader": uploader,
}
)
if len(my_dict["data"]) == self.LIMIT:
break
try:
pages = soup.select(".pagination li a")
my_dict["current_page"] = int(pages[0].text)
tpages = pages[-1].text
if tpages == ">>":
my_dict["total_pages"] = int(pages[-2].text)
else:
my_dict["total_pages"] = int(pages[-1].text)
                except (IndexError, ValueError):
                    # No pagination block on this page; leave page info unset.
                    pass
return my_dict, list_of_urls
        except Exception:
            # Any unexpected parsing failure invalidates the whole batch.
            return None, None
async def search(self, query, page, limit):
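        """Search 1337x for ``query``, accumulating up to ``limit`` results
        across consecutive pages starting at ``page``."""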
async with aiohttp.ClientSession() as session:
self.LIMIT = limit
start_time = time.time()
url = self.BASE_URL + "/search/{}/{}/".format(query, page)
return await self.parser_result(
start_time, url, session, query=query, page=page
)
async def parser_result(self, start_time, url, session, page, query=None):
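        """Parse the listing page and enrich each entry with detail-page
        data; for searches (``query`` given), keep fetching subsequent
        pages until ``LIMIT`` results are collected or the listing runs dry.
        """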
htmls = await Scraper().get_all_results(session, url)
result, urls = self._parser(htmls)
if result is not None:
results = await self._get_torrent(result, session, urls)
results["time"] = time.time() - start_time
results["total"] = len(results["data"])
if query is None:
return results
while True:
if len(results["data"]) >= self.LIMIT:
results["data"] = results["data"][0 : self.LIMIT]
results["total"] = len(results["data"])
return results
page = page + 1
url = self.BASE_URL + "/search/{}/{}/".format(query, page)
htmls = await Scraper().get_all_results(session, url)
result, urls = self._parser(htmls)
if result is not None:
if len(result["data"]) > 0:
res = await self._get_torrent(result, session, urls)
for obj in res["data"]:
results["data"].append(obj)
try:
results["current_page"] = res["current_page"]
                    except KeyError:
                        # Follow-up page had no pagination info; keep the old value.
                        pass
results["time"] = time.time() - start_time
results["total"] = len(results["data"])
else:
break
else:
break
return results
return result
async def trending(self, category, page, limit):
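        """List popular torrents, optionally restricted to ``category``."""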
async with aiohttp.ClientSession() as session:
start_time = time.time()
self.LIMIT = limit
if not category:
url = self.BASE_URL + "/home/"
else:
url = self.BASE_URL + "/popular-{}".format(category.lower())
return await self.parser_result(start_time, url, session, page)
async def recent(self, category, page, limit):
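        """List recent torrents for ``category``; without a category the
        site's trending feed is used."""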
async with aiohttp.ClientSession() as session:
start_time = time.time()
self.LIMIT = limit
if not category:
url = self.BASE_URL + "/trending"
else:
url = self.BASE_URL + "/cat/{}/{}/".format(
str(category).capitalize(), page
)
return await self.parser_result(start_time, url, session, page)
async def search_by_category(self, query, category, page, limit):
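        """Search for ``query`` within a single 1337x category."""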
async with aiohttp.ClientSession() as session:
start_time = time.time()
self.LIMIT = limit
url = self.BASE_URL + "/category-search/{}/{}/{}/".format(
query, category.capitalize(), page
)
return await self.parser_result(start_time, url, session, page, query)
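

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the scraper itself).
# It assumes the helper/ and constants/ modules imported above are on the
# import path; the query, page and limit values are arbitrary examples.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    async def _demo():
        results = await x1337().search("ubuntu", page=1, limit=5)
        for torrent in (results or {}).get("data", []):
            print(torrent["name"], torrent.get("magnet", "<no magnet>"))

    asyncio.run(_demo())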