|
import logging, os, re, asyncio, requests, aiohttp |
|
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid |
|
from pyrogram.types import Message, InlineKeyboardButton |
|
from pyrogram import filters, enums |
|
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API |
|
from imdb import Cinemagoer |
|
from typing import Union, List |
|
from datetime import datetime, timedelta |
|
import logging |
|
logger = logging.getLogger(__name__) |
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))") |
|
BANNED = {} |
|
SMART_OPEN = 'β' |
|
SMART_CLOSE = 'β' |
|
START_CHAR = ('\'', '"', SMART_OPEN) |
|
|
|
|
|
class temp(object): |
|
BANNED_USERS = [] |
|
BANNED_CHATS = [] |
|
CURRENT = 0 |
|
CANCEL = False |
|
MELCOW = {} |
|
U_NAME = None |
|
B_NAME = None |
|
SETTINGS = {} |
|
GP_BUTTONS = {} |
|
PM_BUTTONS = {} |
|
PM_SPELL = {} |
|
GP_SPELL = {} |
|
|
|
async def is_subscribed(bot, query): |
|
""" |
|
Check if a user is subscribed to a specific channel. |
|
""" |
|
logger.info(f"Checking if user {query.from_user.id} is subscribed to channel {AUTH_CHANNEL}.") |
|
try: |
|
user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id) |
|
logger.info(f"User {query.from_user.id} is subscribed to channel {AUTH_CHANNEL}.") |
|
except UserNotParticipant: |
|
logger.info(f"User {query.from_user.id} is not subscribed to channel {AUTH_CHANNEL}.") |
|
pass |
|
except Exception as e: |
|
logger.error(f"Error checking subscription for user {query.from_user.id}: {e}") |
|
print(e) |
|
else: |
|
if user.status != enums.ChatMemberStatus.BANNED: |
|
logger.info(f"User {query.from_user.id} is not banned in channel {AUTH_CHANNEL}.") |
|
return True |
|
logger.info(f"User {query.from_user.id} is not subscribed or is banned in channel {AUTH_CHANNEL}.") |
|
return False |
|
|
|
async def get_poster(query, bulk=False, id=False, file=None): |
|
""" |
|
Fetch movie details from IMDb. |
|
""" |
|
logger.info(f"Fetching poster for query: {query}, bulk: {bulk}, id: {id}, file: {file}.") |
|
imdb = Cinemagoer() |
|
if not id: |
|
query = (query.strip()).lower() |
|
title = query |
|
year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE) |
|
if year: |
|
year = list_to_str(year[:1]) |
|
title = (query.replace(year, "")).strip() |
|
elif file is not None: |
|
year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE) |
|
if year: |
|
year = list_to_str(year[:1]) |
|
else: |
|
year = None |
|
try: |
|
movieid = imdb.search_movie(title.lower(), results=10) |
|
logger.info(f"Found {len(movieid)} movie results for title: {title}.") |
|
except Exception as e: |
|
logger.error(f"Error searching movie: {e}") |
|
return None |
|
if not movieid: |
|
logger.info(f"No movie results found for title: {title}.") |
|
return None |
|
if year: |
|
filtered = list(filter(lambda k: str(k.get('year')) == str(year), movieid)) |
|
if not filtered: |
|
filtered = movieid |
|
else: |
|
filtered = movieid |
|
movieid = list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered)) |
|
if not movieid: |
|
movieid = filtered |
|
if bulk: |
|
logger.info(f"Returning bulk movie results: {filtered}.") |
|
return filtered |
|
movieid = movieid[0].movieID |
|
logger.info(f"Selected movie ID: {movieid}.") |
|
else: |
|
movieid = query |
|
logger.info(f"Using provided movie ID: {movieid}.") |
|
movie = imdb.get_movie(movieid) |
|
if movie.get("original air date"): |
|
date = movie["original air date"] |
|
elif movie.get("year"): |
|
date = movie.get("year") |
|
else: |
|
date = "N/A" |
|
plot = "" |
|
if not LONG_IMDB_DESCRIPTION: |
|
plot = movie.get('plot') |
|
if plot and len(plot) > 0: |
|
plot = plot[0] |
|
else: |
|
plot = movie.get('plot outline') |
|
if plot and len(plot) > 800: |
|
plot = plot[0:800] + "..." |
|
|
|
logger.info(f"Movie details fetched: {movie.get('title')}, Year: {date}, Plot: {plot}.") |
|
return { |
|
'title': movie.get('title'), |
|
'votes': movie.get('votes'), |
|
"aka": list_to_str(movie.get("akas")), |
|
"seasons": movie.get("number of seasons"), |
|
"box_office": movie.get('box office'), |
|
'localized_title': movie.get('localized title'), |
|
'kind': movie.get("kind"), |
|
"imdb_id": f"tt{movie.get('imdbID')}", |
|
"cast": list_to_str(movie.get("cast")), |
|
"runtime": list_to_str(movie.get("runtimes")), |
|
"countries": list_to_str(movie.get("countries")), |
|
"certificates": list_to_str(movie.get("certificates")), |
|
"languages": list_to_str(movie.get("languages")), |
|
"director": list_to_str(movie.get("director")), |
|
"writer": list_to_str(movie.get("writer")), |
|
"producer": list_to_str(movie.get("producer")), |
|
"composer": list_to_str(movie.get("composer")) , |
|
"cinematographer": list_to_str(movie.get("cinematographer")), |
|
"music_team": list_to_str(movie.get("music department")), |
|
"distributors": list_to_str(movie.get("distributors")), |
|
'release_date': date, |
|
'year': movie.get('year'), |
|
'genres': list_to_str(movie.get("genres")), |
|
'poster': movie.get('full-size cover url'), |
|
'plot': plot, |
|
'rating': str(movie.get("rating")), |
|
'url': f'https://www.imdb.com/title/tt{movieid}' |
|
} |
|
|
|
def list_to_str(k): |
|
""" |
|
Convert a list to a comma-separated string. |
|
""" |
|
logger.debug(f"Converting list to string: {k}") |
|
if not k: |
|
logger.debug("List is empty, returning 'N/A'.") |
|
return "N/A" |
|
elif len(k) == 1: |
|
logger.debug(f"List has one element, returning: {k[0]}") |
|
return str(k[0]) |
|
elif MAX_LIST_ELM: |
|
k = k[:int(MAX_LIST_ELM)] |
|
result = ' '.join(f'{elem}, ' for elem in k) |
|
logger.debug(f"List truncated to {MAX_LIST_ELM} elements, returning: {result}") |
|
return result |
|
else: |
|
result = ' '.join(f'{elem}, ' for elem in k) |
|
logger.debug(f"Returning full list as string: {result}") |
|
return result |
|
|
|
__repo__ = "https://github.com/MrMKN/PROFESSOR-BOT" |
|
logger.info(f"Repository URL set to: {__repo__}") |
|
|
|
__version__ = "PROFESSOR-BOT α΄ 4.5.0" |
|
logger.info(f"Version set to: {__version__}") |
|
|
|
__license__ = "GNU GENERAL PUBLIC LICENSE V2" |
|
logger.info(f"License set to: {__license__}") |
|
|
|
__copyright__ = "Copyright (C) 2023-present MrMKN <https://github.com/MrMKN>" |
|
logger.info(f"Copyright set to: {__copyright__}") |
|
|
|
async def search_gagala(text): |
|
""" |
|
Search Google for a given text. |
|
""" |
|
logger.info(f"Searching Google for text: {text}") |
|
usr_agent = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' |
|
'Chrome/61.0.3163.100 Safari/537.36' |
|
} |
|
text = text.replace(" ", '+') |
|
url = f'https://www.google.com/search?q={text}' |
|
try: |
|
response = requests.get(url, headers=usr_agent) |
|
response.raise_for_status() |
|
logger.info(f"Google search successful for text: {text}") |
|
except Exception as e: |
|
logger.error(f"Error searching Google: {e}") |
|
return [] |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
titles = soup.find_all('h3') |
|
result = [title.getText() for title in titles] |
|
logger.info(f"Found {len(result)} Google search results.") |
|
return result |
|
|
|
async def get_settings(group_id): |
|
""" |
|
Get settings for a specific group. |
|
""" |
|
logger.info(f"Getting settings for group ID: {group_id}") |
|
settings = temp.SETTINGS.get(group_id) |
|
if not settings: |
|
settings = await db.get_settings(group_id) |
|
temp.SETTINGS[group_id] = settings |
|
logger.info(f"Settings retrieved from database for group ID: {group_id}") |
|
else: |
|
logger.info(f"Settings retrieved from cache for group ID: {group_id}") |
|
return settings |
|
|
|
async def save_group_settings(group_id, key, value): |
|
""" |
|
Save settings for a specific group. |
|
""" |
|
logger.info(f"Saving setting '{key}' with value '{value}' for group ID: {group_id}") |
|
current = await get_settings(group_id) |
|
current[key] = value |
|
temp.SETTINGS[group_id] = current |
|
await db.update_settings(group_id, current) |
|
logger.info(f"Setting '{key}' saved for group ID: {group_id}") |
|
|
|
def get_size(size): |
|
""" |
|
Convert file size in bytes to a human-readable format. |
|
""" |
|
logger.debug(f"Converting size {size} bytes to human-readable format.") |
|
units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"] |
|
size = float(size) |
|
i = 0 |
|
while size >= 1024.0 and i < len(units): |
|
i += 1 |
|
size /= 1024.0 |
|
result = "%.2f %s" % (size, units[i]) |
|
logger.debug(f"Converted size to: {result}") |
|
return result |
|
|
|
def get_file_id(msg: Message): |
|
""" |
|
Extract file ID from a message. |
|
""" |
|
logger.debug(f"Extracting file ID from message: {msg.id}") |
|
if not msg.media: |
|
logger.debug("Message does not contain media.") |
|
return None |
|
for message_type in ("photo", "animation", "audio", "document", "video", "video_note", "voice", "sticker"): |
|
obj = getattr(msg, message_type) |
|
if obj: |
|
setattr(obj, "message_type", message_type) |
|
logger.debug(f"File ID extracted: {obj.file_id}, Type: {message_type}") |
|
return obj |
|
logger.debug("No file ID found in message.") |
|
return None |
|
|
|
def extract_user(message: Message) -> Union[int, str]: |
|
""" |
|
Extract user ID and first name from a message. |
|
""" |
|
logger.debug(f"Extracting user from message: {message.id}") |
|
user_id = None |
|
user_first_name = None |
|
if message.reply_to_message: |
|
user_id = message.reply_to_message.from_user.id |
|
user_first_name = message.reply_to_message.from_user.first_name |
|
logger.debug(f"User extracted from reply: ID: {user_id}, First Name: {user_first_name}") |
|
elif len(message.command) > 1: |
|
if (len(message.entities) > 1 and message.entities[1].type == enums.MessageEntityType.TEXT_MENTION): |
|
required_entity = message.entities[1] |
|
user_id = required_entity.user.id |
|
user_first_name = required_entity.user.first_name |
|
logger.debug(f"User extracted from text mention: ID: {user_id}, First Name: {user_first_name}") |
|
else: |
|
user_id = message.command[1] |
|
user_first_name = user_id |
|
try: |
|
user_id = int(user_id) |
|
logger.debug(f"User ID converted to integer: {user_id}") |
|
except ValueError: |
|
logger.debug("User ID conversion failed, keeping as string.") |
|
pass |
|
logger.debug(f"User extracted from command: ID: {user_id}, First Name: {user_first_name}") |
|
else: |
|
user_id = message.from_user.id |
|
user_first_name = message.from_user.first_name |
|
logger.debug(f"User extracted from sender: ID: {user_id}, First Name: {user_first_name}") |
|
return (user_id, user_first_name) |
|
|
|
def split_quotes(text: str) -> List: |
|
""" |
|
Split a quoted text into key and rest. |
|
""" |
|
logger.debug(f"Splitting quotes from text: {text}") |
|
if not any(text.startswith(char) for char in START_CHAR): |
|
key, rest = text.split(None, 1) |
|
logger.debug(f"Text does not start with quote, split into key: {key}, rest: {rest}") |
|
return [key, rest] |
|
counter = 1 |
|
while counter < len(text): |
|
if text[counter] == "\\": |
|
counter += 1 |
|
elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE): |
|
break |
|
counter += 1 |
|
else: |
|
key, rest = text.split(None, 1) |
|
logger.debug(f"Text does not end with quote, split into key: {key}, rest: {rest}") |
|
return [key, rest] |
|
|
|
|
|
key = remove_escapes(text[1:counter].strip()) |
|
|
|
rest = text[counter + 1:].strip() |
|
if not key: |
|
key = text[0] + text[0] |
|
logger.debug(f"Text split into key: {key}, rest: {rest}") |
|
return list(filter(None, [key, rest])) |
|
|
|
def parser(text, keyword, cb_data): |
|
""" |
|
Parse button URLs and alerts from text. |
|
""" |
|
logger.debug(f"Parsing text for buttons and alerts: {text}") |
|
if "buttonalert" in text: |
|
text = (text.replace("\n", "\\n").replace("\t", "\\t")) |
|
buttons = [] |
|
note_data = "" |
|
prev = 0 |
|
i = 0 |
|
alerts = [] |
|
for match in BTN_URL_REGEX.finditer(text): |
|
n_escapes = 0 |
|
to_check = match.start(1) - 1 |
|
while to_check > 0 and text[to_check] == "\\": |
|
n_escapes += 1 |
|
to_check -= 1 |
|
|
|
if n_escapes % 2 == 0: |
|
note_data += text[prev:match.start(1)] |
|
prev = match.end(1) |
|
if match.group(3) == "buttonalert": |
|
|
|
if bool(match.group(5)) and buttons: |
|
buttons[-1].append(InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}")) |
|
else: |
|
buttons.append([InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}")]) |
|
i += 1 |
|
alerts.append(match.group(4)) |
|
logger.debug(f"Button alert added: Label: {match.group(2)}, Callback Data: {cb_data}:{i}:{keyword}") |
|
elif bool(match.group(5)) and buttons: |
|
buttons[-1].append(InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", ""))) |
|
logger.debug(f"URL button added: Label: {match.group(2)}, URL: {match.group(4)}") |
|
else: |
|
buttons.append([InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", ""))]) |
|
logger.debug(f"URL button added: Label: {match.group(2)}, URL: {match.group(4)}") |
|
else: |
|
note_data += text[prev:to_check] |
|
prev = match.start(1) - 1 |
|
else: |
|
note_data += text[prev:] |
|
try: |
|
logger.debug(f"Parsed note data: {note_data}, buttons: {buttons}, alerts: {alerts}") |
|
return note_data, buttons, alerts |
|
except Exception as e: |
|
logger.error(f"Error parsing text: {e}") |
|
return note_data, buttons, None |
|
|
|
def remove_escapes(text: str) -> str: |
|
""" |
|
Remove escape characters from text. |
|
""" |
|
logger.debug(f"Removing escapes from text: {text}") |
|
res = "" |
|
is_escaped = False |
|
for counter in range(len(text)): |
|
if is_escaped: |
|
res += text[counter] |
|
is_escaped = False |
|
elif text[counter] == "\\": |
|
is_escaped = True |
|
else: |
|
res += text[counter] |
|
logger.debug(f"Escapes removed, resulting text: {res}") |
|
return res |
|
|
|
def humanbytes(size): |
|
""" |
|
Convert size in bytes to a human-readable format. |
|
""" |
|
logger.debug(f"Converting size {size} bytes to human-readable format.") |
|
if not size: |
|
logger.debug("Size is zero, returning empty string.") |
|
return "" |
|
power = 2**10 |
|
n = 0 |
|
Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'} |
|
while size > power: |
|
size /= power |
|
n += 1 |
|
result = str(round(size, 2)) + " " + Dic_powerN[n] + 'B' |
|
logger.debug(f"Converted size to: {result}") |
|
return result |
|
|
|
def get_time(seconds): |
|
""" |
|
Convert seconds to a human-readable time format. |
|
""" |
|
logger.debug(f"Converting {seconds} seconds to human-readable time format.") |
|
periods = [('α΄
', 86400), ('Κ', 3600), ('α΄', 60), ('κ±', 1)] |
|
result = '' |
|
for period_name, period_seconds in periods: |
|
if seconds >= period_seconds: |
|
period_value, seconds = divmod(seconds, period_seconds) |
|
result += f'{int(period_value)}{period_name}' |
|
logger.debug(f"Converted time to: {result}") |
|
return result |
|
|
|
async def get_shortlink(link): |
|
""" |
|
Generate a short link using a URL shortener service. |
|
""" |
|
logger.info(f"Generating short link for: {link}") |
|
url = f'{SHORT_URL}/api' |
|
params = {'api': SHORT_API, 'url': link} |
|
try: |
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: |
|
data = await response.json() |
|
if data["status"] == "success": |
|
logger.info(f"Short link generated: {data['shortenedUrl']}") |
|
return data['shortenedUrl'] |
|
else: |
|
logger.error(f"Error generating short link: {data['message']}") |
|
return link |
|
except Exception as e: |
|
logger.error(f"Exception occurred while generating short link: {e}") |
|
return link |
|
|
|
|
|
def extract_time(time_val): |
|
""" |
|
Extract time from a time value string. |
|
""" |
|
logger.debug(f"Extracting time from value: {time_val}") |
|
if any(time_val.endswith(unit) for unit in ("s", "m", "h", "d")): |
|
unit = time_val[-1] |
|
time_num = time_val[:-1] |
|
if not time_num.isdigit(): |
|
logger.debug("Time value is not a digit, returning None.") |
|
return None |
|
|
|
if unit == "s": |
|
bantime = datetime.now() + timedelta(seconds=int(time_num)) |
|
elif unit == "m": |
|
bantime = datetime.now() + timedelta(minutes=int(time_num)) |
|
elif unit == "h": |
|
bantime = datetime.now() + timedelta(hours=int(time_num)) |
|
elif unit == "d": |
|
bantime = datetime.now() + timedelta(days=int(time_num)) |
|
else: |
|
logger.debug("Unknown time unit, returning None.") |
|
return None |
|
logger.debug(f"Extracted time: {bantime}") |
|
return bantime |
|
else: |
|
logger.debug("Time value does not end with valid unit, returning None.") |
|
return None |
|
|
|
async def admin_check(message: Message) -> bool: |
|
""" |
|
Check if a user is an admin in the chat. |
|
""" |
|
logger.debug(f"Checking if user {message.from_user.id} is an admin in chat {message.chat.id}.") |
|
if not message.from_user: |
|
logger.debug("Message does not contain a user, returning False.") |
|
return False |
|
if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]: |
|
logger.debug("Chat type is not GROUP or SUPERGROUP, returning False.") |
|
return False |
|
if message.from_user.id in [777000, 1087968824]: |
|
logger.debug("User is a Telegram service account, returning True.") |
|
return True |
|
client = message._client |
|
chat_id = message.chat.id |
|
user_id = message.from_user.id |
|
try: |
|
check_status = await client.get_chat_member(chat_id=chat_id, user_id=user_id) |
|
logger.debug(f"User {user_id} status in chat {chat_id}: {check_status.status}") |
|
except Exception as e: |
|
logger.error(f"Error checking user status: {e}") |
|
return False |
|
admin_strings = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR] |
|
if check_status.status not in admin_strings: |
|
logger.debug(f"User {user_id} is not an admin, returning False.") |
|
return False |
|
else: |
|
logger.debug(f"User {user_id} is an admin, returning True.") |
|
return True |
|
|
|
async def admin_filter(filt, client, message): |
|
""" |
|
Filter for admin checks. |
|
""" |
|
logger.debug(f"Applying admin filter for message {message.id}.") |
|
return await admin_check(message) |