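"""Asynchronous scraper for the 1337x torrent index.

Builds search/trending/recent/category listings by parsing the site's HTML,
then fetches each result page concurrently to collect magnet links, info
hashes, file lists, screenshots, and poster images.
"""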
import asyncio
import re
import time

import aiohttp
from bs4 import BeautifulSoup

from constants.base_url import X1337
from constants.headers import HEADER_AIO
from helper.asyncioPoliciesFix import decorator_asyncio_fix
from helper.html_scraper import Scraper


class x1337:
    def __init__(self):
        self.BASE_URL = X1337
        self.LIMIT = None
    @decorator_asyncio_fix
    async def _individual_scrap(self, session, url, obj):
        """Fetch a single torrent page and enrich `obj` with its details."""
        try:
            async with session.get(url, headers=HEADER_AIO) as res:
                html = await res.text(encoding="ISO-8859-1")
                soup = BeautifulSoup(html, "html.parser")
                try:
                    magnet = soup.select_one(".no-top-radius > div > ul > li > a")[
                        "href"
                    ]
                    uls = soup.find_all("ul", class_="list")[1]
                    lis = uls.find_all("li")[0]
                    imgs = [
                        img["data-original"]
                        for img in soup.find("div", id="description").find_all("img")
                        if img["data-original"].endswith((".png", ".jpg", ".jpeg"))
                    ]
                    files = [
                        f.text for f in soup.find("div", id="files").find_all("li")
                    ]
                    if len(imgs) > 0:
                        obj["screenshot"] = imgs
                    obj["category"] = lis.find("span").text
                    obj["files"] = files
                    try:
                        # Poster URLs may be protocol-relative or site-relative.
                        poster = soup.select_one("div.torrent-image img")["src"]
                        if str(poster).startswith("//"):
                            obj["poster"] = "https:" + poster
                        elif str(poster).startswith("/"):
                            obj["poster"] = self.BASE_URL + poster
                    except Exception:
                        ...
                    obj["magnet"] = magnet
                    # Pull the 32-40 character info hash out of the magnet link.
                    obj["hash"] = re.search(
                        r"\b([a-fA-F0-9]{32,40})\b", magnet
                    ).group(0)
                except IndexError:
                    ...
        except Exception:
            return None
    async def _get_torrent(self, result, session, urls):
        tasks = []
        for idx, url in enumerate(urls):
            for obj in result["data"]:
                if obj["url"] == url:
                    task = asyncio.create_task(
                        self._individual_scrap(session, url, result["data"][idx])
                    )
                    tasks.append(task)
        await asyncio.gather(*tasks)
        return result
    def _parser(self, htmls):
        try:
            for html in htmls:
                soup = BeautifulSoup(html, "html.parser")
                list_of_urls = []
                my_dict = {"data": []}
                trs = soup.select("tbody tr")
                for tr in trs:
                    td = tr.find_all("td")
                    name = td[0].find_all("a")[-1].text
                    if name:
                        url = self.BASE_URL + td[0].find_all("a")[-1]["href"]
                        list_of_urls.append(url)
                        seeders = td[1].text
                        leechers = td[2].text
                        date = td[3].text
                        # The size cell also contains the seeder count; strip it.
                        size = td[4].text.replace(seeders, "")
                        uploader = td[5].find("a").text
                        my_dict["data"].append(
                            {
                                "name": name,
                                "size": size,
                                "date": date,
                                "seeders": seeders,
                                "leechers": leechers,
                                "url": url,
                                "uploader": uploader,
                            }
                        )
                    if len(my_dict["data"]) == self.LIMIT:
                        break
                try:
                    pages = soup.select(".pagination li a")
                    my_dict["current_page"] = int(pages[0].text)
                    tpages = pages[-1].text
                    if tpages == ">>":
                        my_dict["total_pages"] = int(pages[-2].text)
                    else:
                        my_dict["total_pages"] = int(pages[-1].text)
                except Exception:
                    ...
                return my_dict, list_of_urls
        except Exception:
            return None, None
    async def search(self, query, page, limit):
        async with aiohttp.ClientSession() as session:
            self.LIMIT = limit
            start_time = time.time()
            url = self.BASE_URL + "/search/{}/{}/".format(query, page)
            return await self.parser_result(
                start_time, url, session, query=query, page=page
            )
    async def parser_result(self, start_time, url, session, page, query=None):
        htmls = await Scraper().get_all_results(session, url)
        result, urls = self._parser(htmls)
        if result is not None:
            results = await self._get_torrent(result, session, urls)
            results["time"] = time.time() - start_time
            results["total"] = len(results["data"])
            if query is None:
                return results
            # For searches, keep fetching subsequent pages until LIMIT results
            # have been collected or a page comes back empty.
            while True:
                if len(results["data"]) >= self.LIMIT:
                    results["data"] = results["data"][: self.LIMIT]
                    results["total"] = len(results["data"])
                    return results
                page = page + 1
                url = self.BASE_URL + "/search/{}/{}/".format(query, page)
                htmls = await Scraper().get_all_results(session, url)
                result, urls = self._parser(htmls)
                if result is not None and len(result["data"]) > 0:
                    res = await self._get_torrent(result, session, urls)
                    for obj in res["data"]:
                        results["data"].append(obj)
                    try:
                        results["current_page"] = res["current_page"]
                    except Exception:
                        ...
                    results["time"] = time.time() - start_time
                    results["total"] = len(results["data"])
                else:
                    break
            return results
        return result
    async def trending(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/home/"
            else:
                url = self.BASE_URL + "/popular-{}".format(category.lower())
            return await self.parser_result(start_time, url, session, page)

    async def recent(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/trending"
            else:
                url = self.BASE_URL + "/cat/{}/{}/".format(
                    str(category).capitalize(), page
                )
            return await self.parser_result(start_time, url, session, page)

    async def search_by_category(self, query, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            url = self.BASE_URL + "/category-search/{}/{}/{}/".format(
                query, category.capitalize(), page
            )
            return await self.parser_result(start_time, url, session, page, query)
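

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module). Shows how
# the async API above might be driven end to end; it assumes the helper/ and
# constants/ packages imported at the top of this file are importable. The
# query, page, and limit values are arbitrary placeholders, and the printed
# keys follow the dict layout built in _parser()/parser_result().
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    async def _demo():
        results = await x1337().search("ubuntu", page=1, limit=5)
        if results:
            print("total:", results["total"], "time:", results["time"])
            for item in results["data"]:
                print(item["name"], item["size"], item["seeders"])

    asyncio.run(_demo())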