Tor-Search-Api / torrents /limetorrents.py
SoulofSukuna's picture
Upload 44 files
e9d67ff verified
import asyncio
import re
import time
import aiohttp
from bs4 import BeautifulSoup
from helper.asyncioPoliciesFix import decorator_asyncio_fix
from helper.html_scraper import Scraper
from constants.base_url import LIMETORRENT
from constants.headers import HEADER_AIO
class Limetorrent:
def __init__(self):
self.BASE_URL = LIMETORRENT
self.LIMIT = None
@decorator_asyncio_fix
async def _individual_scrap(self, session, url, obj):
try:
async with session.get(url, headers=HEADER_AIO) as res:
html = await res.text(encoding="ISO-8859-1")
soup = BeautifulSoup(html, "html.parser")
try:
a_tag = soup.find_all("a", class_="csprite_dltorrent")
obj["torrent"] = a_tag[0]["href"]
obj["magnet"] = a_tag[-1]["href"]
obj["hash"] = re.search(
r"([{a-f\d,A-F\d}]{32,40})\b", obj["magnet"]
).group(0)
except:
...
except:
return None
async def _get_torrent(self, result, session, urls):
tasks = []
for idx, url in enumerate(urls):
for obj in result["data"]:
if obj["url"] == url:
task = asyncio.create_task(
self._individual_scrap(session, url, result["data"][idx])
)
tasks.append(task)
await asyncio.gather(*tasks)
return result
def _parser(self, htmls, idx=0):
try:
for html in htmls:
soup = BeautifulSoup(html, "html.parser")
list_of_urls = []
my_dict = {"data": []}
for tr in soup.find_all("tr")[idx:]:
td = tr.find_all("td")
if len(td) == 0:
continue
name = td[0].get_text(strip=True)
url = self.BASE_URL + td[0].find_all("a")[-1]["href"]
list_of_urls.append(url)
added_on_and_category = td[1].get_text(strip=True)
date = (added_on_and_category.split("-")[0]).strip()
category = (added_on_and_category.split("in")[-1]).strip()
size = td[2].text
seeders = td[3].text
leechers = td[4].text
my_dict["data"].append(
{
"name": name,
"size": size,
"date": date,
"category": category if category != date else None,
"seeders": seeders,
"leechers": leechers,
"url": url,
}
)
if len(my_dict["data"]) == self.LIMIT:
break
try:
div = soup.find("div", class_="search_stat")
current_page = int(div.find("span", class_="active").text)
total_page = int((div.find_all("a"))[-2].text)
if current_page > total_page:
total_page = current_page
my_dict["current_page"] = current_page
my_dict["total_pages"] = total_page
except:
...
return my_dict, list_of_urls
except:
return None, None
async def search(self, query, page, limit):
async with aiohttp.ClientSession() as session:
start_time = time.time()
self.LIMIT = limit
url = self.BASE_URL + "/search/all/{}//{}".format(query, page)
return await self.parser_result(start_time, url, session, idx=5)
async def parser_result(self, start_time, url, session, idx=0):
htmls = await Scraper().get_all_results(session, url)
result, urls = self._parser(htmls, idx)
if result is not None:
results = await self._get_torrent(result, session, urls)
results["time"] = time.time() - start_time
results["total"] = len(results["data"])
return results
return result
async def trending(self, category, page, limit):
async with aiohttp.ClientSession() as session:
start_time = time.time()
self.LIMIT = limit
url = self.BASE_URL + "/top100"
return await self.parser_result(start_time, url, session)
async def recent(self, category, page, limit):
async with aiohttp.ClientSession() as session:
start_time = time.time()
self.LIMIT = limit
if not category:
url = self.BASE_URL + "/latest100"
else:
category = (category).capitalize()
if category == "Apps":
category = "Applications"
elif category == "Tv":
category = "TV-shows"
url = self.BASE_URL + "/browse-torrents/{}/date/{}/".format(
category, page
)
return await self.parser_result(start_time, url, session)