import asyncio
import re
import time
import aiohttp
from bs4 import BeautifulSoup
from helper.asyncioPoliciesFix import decorator_asyncio_fix
from helper.html_scraper import Scraper
from constants.base_url import KICKASS
from constants.headers import HEADER_AIO


class Kickass:
    def __init__(self):
        self.BASE_URL = KICKASS
        self.LIMIT = None

    @decorator_asyncio_fix
    async def _individual_scrap(self, session, url, obj):
        # Fetch a single torrent detail page and enrich `obj` in place with
        # poster, screenshots, info hash and magnet link.
        try:
            async with session.get(url, headers=HEADER_AIO) as res:
                html = await res.text(encoding="ISO-8859-1")
                soup = BeautifulSoup(html, "html.parser")
                try:
                    poster = soup.find("a", class_="movieCover")
                    if poster:
                        poster = poster.find("img")["src"]
                        obj["poster"] = self.BASE_URL + poster
                    imgs = (soup.find("div", class_="data")).find_all("img")
                    if imgs:
                        obj["screenshot"] = [img["src"] for img in imgs]
                    magnet_and_torrent = soup.find_all("a", class_="kaGiantButton")
                    magnet = magnet_and_torrent[0]["href"]
                    # The info hash is the 32-40 character hex string inside the magnet link.
                    obj["hash"] = re.search(
                        r"([a-fA-F0-9]{32,40})\b", magnet
                    ).group(0)
                    obj["magnet"] = magnet
                except:
                    ...
        except:
            return None

    async def _get_torrent(self, result, session, urls):
        # Scrape every detail page concurrently and merge the extra fields
        # into the matching entry in result["data"].
        tasks = []
        for url in urls:
            for obj in result["data"]:
                if obj["url"] == url:
                    task = asyncio.create_task(
                        self._individual_scrap(session, url, obj)
                    )
                    tasks.append(task)
        await asyncio.gather(*tasks)
        return result

    def _parser(self, htmls):
        # Parse the listing page(s) into a dict of results plus the list of
        # detail-page URLs that still need to be scraped individually.
        try:
            for html in htmls:
                soup = BeautifulSoup(html, "html.parser")
                list_of_urls = []
                my_dict = {"data": []}
                for tr in soup.select("tr.odd,tr.even"):
                    td = tr.find_all("td")
                    name = tr.find("a", class_="cellMainLink").text.strip()
                    url = self.BASE_URL + tr.find("a", class_="cellMainLink")["href"]
                    list_of_urls.append(url)
                    if name:
                        size = td[1].text.strip()
                        seeders = td[4].text.strip()
                        leechers = td[5].text.strip()
                        uploader = td[2].text.strip()
                        date = td[3].text.strip()
                        my_dict["data"].append(
                            {
                                "name": name,
                                "size": size,
                                "date": date,
                                "seeders": seeders,
                                "leechers": leechers,
                                "url": url,
                                "uploader": uploader,
                            }
                        )
                    if len(my_dict["data"]) == self.LIMIT:
                        break
                try:
                    # Pagination: the last link is ">>" when more pages follow,
                    # so the page count then sits one link before it.
                    pages = soup.find("div", class_="pages")
                    current_page = int(pages.find("a", class_="active").text)
                    pages = pages.find_all("a")
                    total_page = pages[-1].text
                    if total_page == ">>":
                        total_page = pages[-2].text
                    my_dict["current_page"] = current_page
                    my_dict["total_pages"] = int(total_page)
                except:
                    ...
                return my_dict, list_of_urls
        except:
            return None, None

    async def search(self, query, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            url = self.BASE_URL + "/usearch/{}/{}/".format(query, page)
            return await self.parser_result(start_time, url, session)

    async def parser_result(self, start_time, url, session):
        htmls = await Scraper().get_all_results(session, url)
        result, urls = self._parser(htmls)
        if result is not None:
            results = await self._get_torrent(result, session, urls)
            results["time"] = time.time() - start_time
            results["total"] = len(results["data"])
            return results
        return result

    async def trending(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/top-100"
            else:
                # Map API category names to the site's URL slugs.
                if category == "tv":
                    category = "television"
                elif category == "apps":
                    category = "applications"
                url = self.BASE_URL + "/top-100-{}/".format(category)
            return await self.parser_result(start_time, url, session)

    async def recent(self, category, page, limit):
        async with aiohttp.ClientSession() as session:
            start_time = time.time()
            self.LIMIT = limit
            if not category:
                url = self.BASE_URL + "/new/"
            else:
                url = self.BASE_URL + "/{}/".format(category)
            return await self.parser_result(start_time, url, session)
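

# --- Hedged usage sketch (not part of the original module) ---
# Shows one way to drive the scraper end to end; assumes the `helper` and
# `constants` packages imported above are importable and that the site behind
# KICKASS is reachable. The query, page and limit values are illustrative only.
if __name__ == "__main__":

    async def _demo():
        kickass = Kickass()
        results = await kickass.search("ubuntu", page=1, limit=5)
        if results:
            print(results["total"], "results in", round(results["time"], 2), "s")

    asyncio.run(_demo())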