Professor / utils.py
azils3's picture
Update utils.py
517e772 verified
raw
history blame
19.9 kB
import logging, os, re, asyncio, requests, aiohttp
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
from pyrogram.types import Message, InlineKeyboardButton
from pyrogram import filters, enums
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API
from imdb import Cinemagoer
from typing import Union, List
from datetime import datetime, timedelta
import logging
# Module-level logger; records at INFO and above are emitted for this module.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Matches button markup embedded in text:
#   [label](buttonurl:target)  or  [label](buttonalert:text), optionally ending in ":same"
# group(1) = whole markup, group(2) = label, group(3) = "buttonurl" | "buttonalert",
# group(4) = payload (up to two leading slashes skipped), group(5) = ":same" flag or None.
BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))")

BANNED = {}

# Typographic double quotes. Written as escapes so the literals cannot be
# corrupted by a re-encoding of the file (the opening quote previously was).
SMART_OPEN = '\u201c'   # LEFT DOUBLE QUOTATION MARK
SMART_CLOSE = '\u201d'  # RIGHT DOUBLE QUOTATION MARK

# Characters that may open a quoted key (see split_quotes).
START_CHAR = ('\'', '"', SMART_OPEN)
# In-memory runtime state shared through class attributes (never instantiated here).
class temp:
    # Ban lists
    BANNED_USERS = list()
    BANNED_CHATS = list()

    # Progress / cancellation flags
    CURRENT = 0
    CANCEL = False

    MELCOW = dict()

    # Names, unset until assigned elsewhere
    U_NAME = None
    B_NAME = None

    # Per-group settings cache, keyed by group id (see get_settings)
    SETTINGS = dict()

    # Button and spell-check state for groups (GP_*) and private chats (PM_*)
    GP_BUTTONS = dict()
    PM_BUTTONS = dict()
    PM_SPELL = dict()
    GP_SPELL = dict()
async def is_subscribed(bot, query):
    """
    Check whether the sender of ``query`` is a non-banned member of AUTH_CHANNEL.

    Args:
        bot: client object exposing an awaitable ``get_chat_member(chat, user_id)``.
        query: update object whose ``from_user.id`` identifies the user.

    Returns:
        True when the membership lookup succeeds and the member status is not
        BANNED; False when the user is not a participant, is banned, or the
        lookup fails for any other reason (the error is logged, not raised).
    """
    user_id = query.from_user.id
    logger.info(f"Checking if user {user_id} is subscribed to channel {AUTH_CHANNEL}.")
    try:
        member = await bot.get_chat_member(AUTH_CHANNEL, user_id)
    except UserNotParticipant:
        logger.info(f"User {user_id} is not subscribed to channel {AUTH_CHANNEL}.")
    except Exception as e:
        # Best effort: any other failure is treated as "not subscribed".
        logger.error(f"Error checking subscription for user {user_id}: {e}")
    else:
        # Only report the user as subscribed once the ban status has been checked.
        if member.status != enums.ChatMemberStatus.BANNED:
            logger.info(f"User {user_id} is subscribed to channel {AUTH_CHANNEL}.")
            return True
    logger.info(f"User {user_id} is not subscribed or is banned in channel {AUTH_CHANNEL}.")
    return False
async def get_poster(query, bulk=False, id=False, file=None):
    """
    Fetch movie details from IMDb.

    Args:
        query: search text (a title, optionally ending in a 4-digit year), or
            the IMDb movie id when ``id`` is truthy.
        bulk: when truthy, return the list of candidate search results instead
            of the details of the first one.
        id: when truthy, ``query`` is used directly as the movie id and no
            search is performed.
        file: optional file name; searched for a 4-digit year when ``query``
            itself does not end in one.

    Returns:
        None when the search fails or finds nothing; the candidate list when
        ``bulk`` is truthy; otherwise a dict of movie fields (title, year,
        plot, poster, url, ...).
    """
    logger.info(f"Fetching poster for query: {query}, bulk: {bulk}, id: {id}, file: {file}.")
    imdb = Cinemagoer()
    # The IMDb client is synchronous; run its calls in the default executor so
    # the event loop is not blocked while they are in flight.
    loop = asyncio.get_running_loop()

    if not id:
        query = query.strip().lower()
        title = query
        year = None
        trailing_year = re.findall(r'[1-2]\d{3}$', query)
        if trailing_year:
            year = trailing_year[0]
            # Strip only the trailing year; keep the whole query as the title
            # when the query consists of nothing but that year.
            title = query[:-len(year)].strip() or query
        elif file is not None:
            years_in_file = re.findall(r'[1-2]\d{3}', file)
            if years_in_file:
                year = years_in_file[0]

        try:
            movieid = await loop.run_in_executor(
                None, lambda: imdb.search_movie(title, results=10)
            )
            logger.info(f"Found {len(movieid)} movie results for title: {title}.")
        except Exception as e:
            logger.error(f"Error searching movie: {e}")
            return None
        if not movieid:
            logger.info(f"No movie results found for title: {title}.")
            return None

        # Prefer results from the requested year, falling back to all results.
        filtered = movieid
        if year:
            same_year = [k for k in movieid if str(k.get('year')) == str(year)]
            if same_year:
                filtered = same_year

        # Prefer movies / TV series, falling back to the year-filtered results.
        movieid = [k for k in filtered if k.get('kind') in ('movie', 'tv series')]
        if not movieid:
            movieid = filtered

        if bulk:
            logger.info(f"Returning bulk movie results: {movieid}.")
            return movieid
        movieid = movieid[0].movieID
        logger.info(f"Selected movie ID: {movieid}.")
    else:
        movieid = query
        logger.info(f"Using provided movie ID: {movieid}.")

    movie = await loop.run_in_executor(None, lambda: imdb.get_movie(movieid))

    if movie.get("original air date"):
        date = movie["original air date"]
    elif movie.get("year"):
        date = movie.get("year")
    else:
        date = "N/A"

    if not LONG_IMDB_DESCRIPTION:
        plot = movie.get('plot')
        if plot and len(plot) > 0:
            plot = plot[0]
    else:
        plot = movie.get('plot outline')
    # Cap the description length.
    if plot and len(plot) > 800:
        plot = plot[0:800] + "..."

    logger.info(f"Movie details fetched: {movie.get('title')}, Year: {date}, Plot: {plot}.")
    return {
        'title': movie.get('title'),
        'votes': movie.get('votes'),
        "aka": list_to_str(movie.get("akas")),
        "seasons": movie.get("number of seasons"),
        "box_office": movie.get('box office'),
        'localized_title': movie.get('localized title'),
        'kind': movie.get("kind"),
        "imdb_id": f"tt{movie.get('imdbID')}",
        "cast": list_to_str(movie.get("cast")),
        "runtime": list_to_str(movie.get("runtimes")),
        "countries": list_to_str(movie.get("countries")),
        "certificates": list_to_str(movie.get("certificates")),
        "languages": list_to_str(movie.get("languages")),
        "director": list_to_str(movie.get("director")),
        "writer": list_to_str(movie.get("writer")),
        "producer": list_to_str(movie.get("producer")),
        "composer": list_to_str(movie.get("composer")),
        "cinematographer": list_to_str(movie.get("cinematographer")),
        "music_team": list_to_str(movie.get("music department")),
        "distributors": list_to_str(movie.get("distributors")),
        'release_date': date,
        'year': movie.get('year'),
        'genres': list_to_str(movie.get("genres")),
        'poster': movie.get('full-size cover url'),
        'plot': plot,
        'rating': str(movie.get("rating")),
        'url': f'https://www.imdb.com/title/tt{movieid}'
    }
def list_to_str(k):
    """
    Convert a list to a comma-separated string.

    Args:
        k: sequence of values (or None / empty).

    Returns:
        "N/A" for an empty or missing value, ``str(k[0])`` for a single
        element, otherwise the elements joined by ", ". When MAX_LIST_ELM is
        set, only the first MAX_LIST_ELM elements are included.
    """
    logger.debug(f"Converting list to string: {k}")
    if not k:
        logger.debug("List is empty, returning 'N/A'.")
        return "N/A"
    if len(k) == 1:
        logger.debug(f"List has one element, returning: {k[0]}")
        return str(k[0])
    if MAX_LIST_ELM:
        k = k[:int(MAX_LIST_ELM)]
        # Join with a real separator: no doubled spaces, no trailing ", ".
        result = ', '.join(str(elem) for elem in k)
        logger.debug(f"List truncated to {MAX_LIST_ELM} elements, returning: {result}")
        return result
    result = ', '.join(str(elem) for elem in k)
    logger.debug(f"Returning full list as string: {result}")
    return result
# Project metadata; each value is logged once when the module is imported.
__repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
logger.info(f"Repository URL set to: {__repo__}")
# "\u1d20" is LATIN LETTER SMALL CAPITAL V; written as an escape because the
# literal had been corrupted by a bad re-encoding of the file.
__version__ = "PROFESSOR-BOT \u1d20" "4.5.0"
logger.info(f"Version set to: {__version__}")
__license__ = "GNU GENERAL PUBLIC LICENSE V2"
logger.info(f"License set to: {__license__}")
__copyright__ = "Copyright (C) 2023-present MrMKN <https://github.com/MrMKN>"
logger.info(f"Copyright set to: {__copyright__}")
async def search_gagala(text):
    """
    Search Google for a given text.

    Args:
        text: the search phrase.

    Returns:
        A list with the text of every ``<h3>`` element on the result page, or
        an empty list when the HTTP request fails.
    """
    logger.info(f"Searching Google for text: {text}")
    usr_agent = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/61.0.3163.100 Safari/537.36'
    }
    loop = asyncio.get_running_loop()
    try:
        # requests is synchronous: run it in the default executor so the event
        # loop keeps running, and bound the wait with a timeout. Passing the
        # phrase through ``params`` lets requests URL-encode it.
        response = await loop.run_in_executor(
            None,
            lambda: requests.get(
                'https://www.google.com/search',
                params={'q': text},
                headers=usr_agent,
                timeout=10,
            ),
        )
        response.raise_for_status()
        logger.info(f"Google search successful for text: {text}")
    except Exception as e:
        logger.error(f"Error searching Google: {e}")
        return []
    # NOTE(review): BeautifulSoup is not imported in the visible import block of
    # this file - confirm `from bs4 import BeautifulSoup` exists, otherwise the
    # next line raises NameError.
    soup = BeautifulSoup(response.text, 'html.parser')
    result = [heading.getText() for heading in soup.find_all('h3')]
    logger.info(f"Found {len(result)} Google search results.")
    return result
async def get_settings(group_id):
    """
    Return the settings of a group, consulting the in-memory cache first.

    A falsy cache entry is treated as a miss: the settings are then loaded
    with ``db.get_settings`` and stored in ``temp.SETTINGS`` before returning.
    """
    # NOTE(review): `db` is not imported in the visible import block of this
    # file - confirm it is brought into scope elsewhere.
    logger.info(f"Getting settings for group ID: {group_id}")
    cached = temp.SETTINGS.get(group_id)
    if cached:
        logger.info(f"Settings retrieved from cache for group ID: {group_id}")
        return cached
    loaded = await db.get_settings(group_id)
    temp.SETTINGS[group_id] = loaded
    logger.info(f"Settings retrieved from database for group ID: {group_id}")
    return loaded
async def save_group_settings(group_id, key, value):
    """
    Set one setting of a group and persist the result.

    The group's settings are obtained through ``get_settings``, updated in
    place, written back to the ``temp.SETTINGS`` cache and then handed to
    ``db.update_settings``.
    """
    logger.info(f"Saving setting '{key}' with value '{value}' for group ID: {group_id}")
    group_settings = await get_settings(group_id)
    group_settings[key] = value
    # Refresh the cache entry before writing to the database.
    temp.SETTINGS.update({group_id: group_settings})
    await db.update_settings(group_id, group_settings)
    logger.info(f"Setting '{key}' saved for group ID: {group_id}")
def get_size(size):
    """
    Convert file size in bytes to a human-readable format.

    Args:
        size: number of bytes (anything ``float()`` accepts).

    Returns:
        A string such as "1.50 KB", using 1024-based steps and two decimals.
        Values beyond the largest unit stay expressed in that unit ("EB").
    """
    logger.debug(f"Converting size {size} bytes to human-readable format.")
    units = ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB")
    size = float(size)
    last_unit = len(units) - 1
    i = 0
    # Stop at the last unit so units[i] is always a valid index.
    while size >= 1024.0 and i < last_unit:
        i += 1
        size /= 1024.0
    result = "%.2f %s" % (size, units[i])
    logger.debug(f"Converted size to: {result}")
    return result
def get_file_id(msg: Message):
    """
    Return the first media object carried by a message, or None.

    The media attributes are probed in a fixed order; the first truthy one is
    tagged with a ``message_type`` attribute naming its kind and returned.
    """
    logger.debug(f"Extracting file ID from message: {msg.id}")
    if msg.media:
        media_kinds = ("photo", "animation", "audio", "document", "video", "video_note", "voice", "sticker")
        for kind in media_kinds:
            media = getattr(msg, kind)
            if not media:
                continue
            # Record which attribute the object came from.
            setattr(media, "message_type", kind)
            logger.debug(f"File ID extracted: {media.file_id}, Type: {kind}")
            return media
        logger.debug("No file ID found in message.")
        return None
    logger.debug("Message does not contain media.")
    return None
def extract_user(message: Message) -> tuple:
    """
    Extract user ID and first name from a message.

    Resolution order:
      1. the author of the replied-to message, when the message is a reply;
      2. the second entity when it is a text mention, otherwise the first
         command argument (converted to int when possible, and also used as
         the "first name");
      3. the sender of the message itself.

    Returns:
        A ``(user_id, user_first_name)`` tuple.
    """
    logger.debug(f"Extracting user from message: {message.id}")
    user_id = None
    user_first_name = None
    # Treat a missing command / entity list as empty instead of calling len(None).
    command = message.command or []
    if message.reply_to_message:
        # NOTE(review): assumes the replied-to message has a from_user - confirm
        # for messages sent on behalf of a chat.
        user_id = message.reply_to_message.from_user.id
        user_first_name = message.reply_to_message.from_user.first_name
        logger.debug(f"User extracted from reply: ID: {user_id}, First Name: {user_first_name}")
    elif len(command) > 1:
        entities = message.entities or []
        if len(entities) > 1 and entities[1].type == enums.MessageEntityType.TEXT_MENTION:
            required_entity = entities[1]
            user_id = required_entity.user.id
            user_first_name = required_entity.user.first_name
            logger.debug(f"User extracted from text mention: ID: {user_id}, First Name: {user_first_name}")
        else:
            user_id = command[1]
            user_first_name = user_id
        try:
            user_id = int(user_id)
            logger.debug(f"User ID converted to integer: {user_id}")
        except ValueError:
            # Not numeric: keep the argument as a string.
            logger.debug("User ID conversion failed, keeping as string.")
        logger.debug(f"User extracted from command: ID: {user_id}, First Name: {user_first_name}")
    else:
        user_id = message.from_user.id
        user_first_name = message.from_user.first_name
        logger.debug(f"User extracted from sender: ID: {user_id}, First Name: {user_first_name}")
    return (user_id, user_first_name)
def split_quotes(text: str) -> List:
    """
    Split a text into a leading key and the remaining text.

    When the text starts with a quote character (one of START_CHAR) and a
    matching closing quote is found, the key is the unescaped content between
    the quotes. Otherwise the text is split on the first whitespace run.

    Returns:
        A list ``[key, rest]``; ``rest`` is omitted when there is nothing
        after the key, and the list is empty for blank unquoted input.
    """
    logger.debug(f"Splitting quotes from text: {text}")
    if not text.startswith(START_CHAR):
        # May yield fewer than two items, so it must not be unpacked.
        parts = text.split(None, 1)
        logger.debug(f"Text does not start with quote, split into: {parts}")
        return parts
    counter = 1  # ignore first char -> is some kind of quote
    while counter < len(text):
        if text[counter] == "\\":
            # Skip the escaped character that follows the backslash.
            counter += 1
        elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE):
            break
        counter += 1
    else:
        # No closing quote found: fall back to a plain whitespace split.
        parts = text.split(None, 1)
        logger.debug(f"Text does not end with quote, split into: {parts}")
        return parts
    # 1 to avoid starting quote, and counter is exclusive so avoids ending
    key = remove_escapes(text[1:counter].strip())
    # index will be in range, or `else` would have been executed and returned
    rest = text[counter + 1:].strip()
    if not key:
        # Empty quoted key: use the pair of quote characters as the key.
        key = text[0] + text[0]
    logger.debug(f"Text split into key: {key}, rest: {rest}")
    return list(filter(None, [key, rest]))
def parser(text, keyword, cb_data):
    """
    Parse button URLs and alerts from text.

    Every unescaped ``[label](buttonurl:...)`` / ``[label](buttonalert:...)``
    markup is removed from the text and turned into an InlineKeyboardButton.
    A markup ending in ``:same`` is appended to the previous button row when
    one exists; otherwise it starts a new row.

    Args:
        text: the raw text containing button markup.
        keyword: suffix placed in the callback data of alert buttons.
        cb_data: prefix placed in the callback data of alert buttons.

    Returns:
        ``(note_data, buttons, alerts)``: the text without button markup, the
        button rows (list of lists), and the alert payloads in the order
        their buttons were created.
    """
    logger.debug(f"Parsing text for buttons and alerts: {text}")
    if "buttonalert" in text:
        # Turn real newlines/tabs into their two-character escaped form.
        text = text.replace("\n", "\\n").replace("\t", "\\t")
    buttons = []
    note_data = ""
    prev = 0
    i = 0  # index of the next alert; also embedded in its callback data
    alerts = []
    for match in BTN_URL_REGEX.finditer(text):
        # Count the backslashes immediately preceding the markup.
        n_escapes = 0
        to_check = match.start(1) - 1
        while to_check > 0 and text[to_check] == "\\":
            n_escapes += 1
            to_check -= 1

        if n_escapes % 2:
            # odd number of backslashes -> escaped, keep the markup as text
            note_data += text[prev:to_check]
            prev = match.start(1) - 1
            continue

        # even -> not escaped: cut the markup out and create a button
        note_data += text[prev:match.start(1)]
        prev = match.end(1)
        label = match.group(2)
        same_row = bool(match.group(5)) and bool(buttons)

        if match.group(3) == "buttonalert":
            # Build the callback data before advancing the alert index so the
            # button and the log line agree.
            callback_data = f"{cb_data}:{i}:{keyword}"
            button = InlineKeyboardButton(label, callback_data=callback_data)
            i += 1
            alerts.append(match.group(4))
            log_line = f"Button alert added: Label: {label}, Callback Data: {callback_data}"
        else:
            button = InlineKeyboardButton(label, url=match.group(4).replace(" ", ""))
            log_line = f"URL button added: Label: {label}, URL: {match.group(4)}"

        if same_row:
            buttons[-1].append(button)
        else:
            buttons.append([button])
        logger.debug(log_line)

    # Text after the last markup (or the whole text when nothing matched).
    note_data += text[prev:]
    logger.debug(f"Parsed note data: {note_data}, buttons: {buttons}, alerts: {alerts}")
    return note_data, buttons, alerts
def remove_escapes(text: str) -> str:
    """
    Remove escape characters from text.

    A backslash is dropped and the character after it is kept verbatim (so
    ``\\\\`` becomes a single backslash). A trailing lone backslash is dropped.
    """
    logger.debug(f"Removing escapes from text: {text}")
    kept = []
    is_escaped = False
    for char in text:
        if is_escaped or char != "\\":
            # Either an escaped character or an ordinary one: keep it.
            kept.append(char)
            is_escaped = False
        else:
            # Unescaped backslash: drop it and keep whatever follows.
            is_escaped = True
    res = "".join(kept)
    logger.debug(f"Escapes removed, resulting text: {res}")
    return res
def humanbytes(size):
    """
    Convert size in bytes to a human-readable format.

    Args:
        size: number of bytes.

    Returns:
        "" for a falsy size; otherwise the value scaled by powers of 1024 and
        rounded to two decimals, followed by a binary unit ("KiB" ... "TiB").
        Unscaled values use a blank prefix, e.g. "512  B". Values beyond the
        largest unit stay expressed in "TiB".
    """
    logger.debug(f"Converting size {size} bytes to human-readable format.")
    if not size:
        logger.debug("Size is zero, returning empty string.")
        return ""
    power = 2**10
    prefixes = (' ', 'Ki', 'Mi', 'Gi', 'Ti')
    largest = len(prefixes) - 1
    n = 0
    # ">=" so exact powers are scaled (1024 -> 1.0 KiB); the bound keeps the
    # prefix lookup valid for very large sizes.
    while size >= power and n < largest:
        size /= power
        n += 1
    result = str(round(size, 2)) + " " + prefixes[n] + 'B'
    logger.debug(f"Converted size to: {result}")
    return result
def get_time(seconds):
    """
    Convert seconds to a human-readable time format.

    Args:
        seconds: a non-negative number of seconds.

    Returns:
        The duration as concatenated "<value><unit>" parts for days, hours,
        minutes and seconds, skipping units that do not fit; "" when
        ``seconds`` is below 1.
    """
    logger.debug(f"Converting {seconds} seconds to human-readable time format.")
    # Unit suffixes are small-capital letters D, H, M, S. They are written as
    # escapes because the "D" literal had been corrupted by a bad re-encoding.
    periods = [
        ('\u1d05', 86400),  # days
        ('\u029c', 3600),   # hours
        ('\u1d0d', 60),     # minutes
        ('\ua731', 1),      # seconds
    ]
    result = ''
    for period_name, period_seconds in periods:
        if seconds >= period_seconds:
            period_value, seconds = divmod(seconds, period_seconds)
            result += f'{int(period_value)}{period_name}'
    logger.debug(f"Converted time to: {result}")
    return result
async def get_shortlink(link):
    """
    Shorten a link through the configured shortener API.

    Sends a GET request to ``{SHORT_URL}/api`` with the API key and the link.
    Returns the shortened URL when the JSON reply reports success; in every
    other case (error reply, HTTP failure, malformed reply) the original link
    is returned unchanged and the problem is logged.
    """
    logger.info(f"Generating short link for: {link}")
    endpoint = f'{SHORT_URL}/api'
    query = {'api': SHORT_API, 'url': link}
    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(review): ssl=False disables TLS certificate verification -
            # confirm this is intentional for the shortener in use.
            async with session.get(endpoint, params=query, raise_for_status=True, ssl=False) as response:
                data = await response.json()
                if data["status"] != "success":
                    logger.error(f"Error generating short link: {data['message']}")
                    return link
                logger.info(f"Short link generated: {data['shortenedUrl']}")
                return data['shortenedUrl']
    except Exception as e:
        logger.error(f"Exception occurred while generating short link: {e}")
        return link
# from Midukki-RoBoT
def extract_time(time_val):
    """
    Extract time from a time value string.

    Args:
        time_val: a non-negative integer followed by a unit letter:
            "s" (seconds), "m" (minutes), "h" (hours) or "d" (days),
            e.g. "30m".

    Returns:
        ``datetime.now()`` plus the given duration, or None when the value
        has no valid unit, no valid number, or is too large to represent.
    """
    logger.debug(f"Extracting time from value: {time_val}")
    unit_to_kwarg = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
    unit = time_val[-1:]
    if unit not in unit_to_kwarg:
        logger.debug("Time value does not end with valid unit, returning None.")
        return None
    time_num = time_val[:-1]  # type: str
    # isdecimal() only accepts characters that int() can parse; isdigit()
    # also accepts e.g. superscript digits, which make int() raise.
    if not time_num.isdecimal():
        logger.debug("Time value is not a digit, returning None.")
        return None
    try:
        bantime = datetime.now() + timedelta(**{unit_to_kwarg[unit]: int(time_num)})
    except OverflowError:
        # Duration or resulting date is outside the representable range.
        logger.debug("Time value is out of range, returning None.")
        return None
    logger.debug(f"Extracted time: {bantime}")
    return bantime
async def admin_check(message: Message) -> bool:
    """
    Check if a user is an admin in the chat.

    Returns:
        False when the message has no sender, when the chat is not a group or
        supergroup, when the membership lookup fails, or when the sender is
        neither owner nor administrator. True for the two hard-coded service
        account ids and for owners / administrators.
    """
    # Guard first: the sender must exist before its id can be logged or used.
    if not message.from_user:
        logger.debug("Message does not contain a user, returning False.")
        return False
    user_id = message.from_user.id
    chat_id = message.chat.id
    logger.debug(f"Checking if user {user_id} is an admin in chat {chat_id}.")
    if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        logger.debug("Chat type is not GROUP or SUPERGROUP, returning False.")
        return False
    if user_id in [777000, 1087968824]:
        logger.debug("User is a Telegram service account, returning True.")
        return True
    client = message._client
    try:
        check_status = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
        logger.debug(f"User {user_id} status in chat {chat_id}: {check_status.status}")
    except Exception as e:
        # Best effort: a failed lookup is treated as "not an admin".
        logger.error(f"Error checking user status: {e}")
        return False
    admin_strings = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR]
    if check_status.status not in admin_strings:
        logger.debug(f"User {user_id} is not an admin, returning False.")
        return False
    logger.debug(f"User {user_id} is an admin, returning True.")
    return True
async def admin_filter(filt, client, message):
    """
    Filter callback that passes only messages sent by chat admins.

    ``filt`` and ``client`` are accepted but unused; the decision is delegated
    to ``admin_check``.
    """
    logger.debug(f"Applying admin filter for message {message.id}.")
    is_admin = await admin_check(message)
    return is_admin