|
from fastapi import FastAPI, HTTPException, Depends, Request |
|
from fastapi.staticfiles import StaticFiles |
|
from fastapi.templating import Jinja2Templates |
|
from pydantic import BaseModel, Field |
|
from bot import Bot, temp |
|
from database.users_chats_db import db |
|
from database.ia_filterdb import Media, get_file_details, get_search_results |
|
from database.connections_mdb import add_connection, active_connection, all_connections, if_active, make_active, make_inactive, delete_connection |
|
from database.filters_mdb import add_filter, find_filter, get_filters, delete_filter, del_all, count_filters, filter_stats |
|
from database.gfilters_mdb import add_gfilter, find_gfilter, get_gfilters, delete_gfilter, del_allg, count_gfilters, gfilter_stats |
|
from utils import get_settings, save_group_settings, humanbytes, get_size, extract_user, get_file_id, get_poster, parser, get_shortlink, extract_time, admin_check |
|
from info import API_ID, API_HASH, BOT_TOKEN, LOG_CHANNEL, UPTIME, WEB_SUPPORT, LOG_MSG, ADMINS, CHANNELS |
|
from pyrogram import Client, filters, enums, types |
|
from pyrogram.errors import ChatAdminRequired, UserIsBlocked, MessageNotModified, PeerIdInvalid, FloodWait |
|
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message, CallbackQuery |
|
from telegraph import upload_file |
|
from io import BytesIO |
|
import aiohttp |
|
import asyncio |
|
import requests |
|
import os |
|
import logging |
|
import re |
|
import traceback |
|
import json |
|
from googletrans import Translator |
|
from gtts import gTTS |
|
from youtube_search import YoutubeSearch |
|
from youtubesearchpython import SearchVideos |
|
from yt_dlp import YoutubeDL |
|
import shutil |
|
import psutil |
|
import wget |
|
from datetime import datetime, timedelta |
|
from urllib.parse import quote |
|
from telegraph import upload_file |
|
from PIL import Image, ImageDraw, ImageFont |
|
import textwrap |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
app.mount("/static", StaticFiles(directory="static"), name="static") |
|
templates = Jinja2Templates(directory="templates") |
|
|
|
|
|
bot_instance = Bot() |
|
|
|
|
|
@app.on_event("startup") |
|
async def startup_event(): |
|
asyncio.create_task(bot_instance.start()) |
|
|
|
@app.on_event("shutdown") |
|
async def shutdown_event(): |
|
await bot_instance.stop() |
|
|
|
|
|
@app.get("/", include_in_schema=False) |
|
async def dashboard(request: Request): |
|
try: |
|
|
|
total_users = await db.total_users_count() |
|
total_chats = await db.total_chat_count() |
|
uptime = datetime.now() - datetime.fromtimestamp(temp.UPTIME) |
|
|
|
|
|
cpu = psutil.cpu_percent() |
|
mem = psutil.virtual_memory().percent |
|
disk = psutil.disk_usage('/').percent |
|
|
|
return templates.TemplateResponse("dashboard.html", { |
|
"request": request, |
|
"total_users": total_users, |
|
"total_chats": total_chats, |
|
"uptime": str(uptime).split('.')[0], |
|
"cpu_usage": f"{cpu}%", |
|
"mem_usage": f"{mem}%", |
|
"disk_usage": f"{disk}%", |
|
"bot_username": (await bot_instance.get_me()).username, |
|
"version": "PROFESSOR-BOT v4.5.0" |
|
}) |
|
except Exception as e: |
|
logging.error(f"Dashboard Error: {str(e)}") |
|
return templates.TemplateResponse("error.html", {"request": request}) |
|
|
|
|
|
|
|
async def get_chat_member(client, chat_id, user_id): |
|
try: |
|
return await client.get_chat_member(chat_id, user_id) |
|
except Exception as e: |
|
logger.error(f"Error getting chat member: {e}") |
|
return None |
|
|
|
async def send_message(client, chat_id, text, reply_markup=None, disable_web_page_preview=False): |
|
try: |
|
return await client.send_message(chat_id, text, reply_markup=reply_markup, disable_web_page_preview=disable_web_page_preview) |
|
except Exception as e: |
|
logger.error(f"Error sending message: {e}") |
|
return None |
|
|
|
async def edit_message(client, message, text, reply_markup=None): |
|
try: |
|
return await message.edit_text(text, reply_markup=reply_markup) |
|
except Exception as e: |
|
logger.error(f"Error editing message: {e}") |
|
return None |
|
|
|
async def delete_message(client, message): |
|
try: |
|
return await message.delete() |
|
except Exception as e: |
|
logger.error(f"Error deleting message: {e}") |
|
return None |
|
|
|
|
|
class User(BaseModel): |
|
user_id: int |
|
|
|
class Group(BaseModel): |
|
group_id: int |
|
|
|
class Filter(BaseModel): |
|
group_id: int |
|
text: str |
|
reply_text: str |
|
btn: str = Field(default=None) |
|
file: str = Field(default=None) |
|
alert: str = Field(default=None) |
|
|
|
class GlobalFilter(BaseModel): |
|
text: str |
|
reply_text: str |
|
btn: str = Field(default=None) |
|
file: str = Field(default=None) |
|
alert: str = Field(default=None) |
|
|
|
class BroadcastMessage(BaseModel): |
|
chat_id: int |
|
message_id: int |
|
|
|
class ShortenURL(BaseModel): |
|
url: str |
|
|
|
class SettingsUpdate(BaseModel): |
|
group_id: int |
|
setting_key: str |
|
setting_value: bool |
|
|
|
class SearchQuery(BaseModel): |
|
query: str |
|
file_type: str = Field(default=None) |
|
offset: int = Field(default=0) |
|
max_results: int = Field(default=10) |
|
|
|
class ConnectionRequest(BaseModel): |
|
user_id: int |
|
group_id: int |
|
|
|
class TextToSpeechRequest(BaseModel): |
|
text: str |
|
|
|
class YouTubeDownloadRequest(BaseModel): |
|
url: str |
|
|
|
class TelegraphUploadRequest(BaseModel): |
|
file_path: str |
|
|
|
class CarbonRequest(BaseModel): |
|
text: str |
|
|
|
class FontRequest(BaseModel): |
|
text: str |
|
style: str |
|
|
|
class PasteRequest(BaseModel): |
|
text: str |
|
|
|
class ShareTextRequest(BaseModel): |
|
text: str |
|
|
|
class GroupManagerAction(BaseModel): |
|
action: str |
|
user_id: int = Field(default=None) |
|
time: str = Field(default=None) |
|
message_id: int = Field(default=None) |
|
|
|
|
|
@app.get("/uptime") |
|
async def get_uptime(): |
|
uptime = datetime.now() - datetime.fromtimestamp(temp.UPTIME) |
|
return {"uptime": str(uptime)} |
|
|
|
@app.get("/stats") |
|
async def get_stats(): |
|
total_users = await db.total_users_count() |
|
total_chats = await db.total_chat_count() |
|
return {"total_users": total_users, "total_chats": total_chats} |
|
|
|
@app.post("/ban_user") |
|
async def ban_user_endpoint(user: User): |
|
await db.ban_user(user.user_id, "No Reason") |
|
temp.BANNED_USERS.append(user.user_id) |
|
return {"status": "User banned successfully"} |
|
|
|
@app.post("/unban_user") |
|
async def unban_user_endpoint(user: User): |
|
await db.remove_ban(user.user_id) |
|
temp.BANNED_USERS.remove(user.user_id) |
|
return {"status": "User unbanned successfully"} |
|
|
|
@app.post("/add_filter") |
|
async def add_filter_endpoint(filter_data: Filter): |
|
await add_filter(filter_data.group_id, filter_data.text, filter_data.reply_text, filter_data.btn, filter_data.file, filter_data.alert) |
|
return {"status": "Filter added successfully"} |
|
|
|
@app.post("/find_filter") |
|
async def find_filter_endpoint(filter_data: Filter): |
|
reply_text, btn, alert, fileid = await find_filter(filter_data.group_id, filter_data.text) |
|
return {"reply_text": reply_text, "btn": btn, "alert": alert, "fileid": fileid} |
|
|
|
@app.post("/get_filters") |
|
async def get_filters_endpoint(group_id: int): |
|
texts = await get_filters(group_id) |
|
return {"filters": texts} |
|
|
|
@app.post("/delete_filter") |
|
async def delete_filter_endpoint(filter_data: Filter): |
|
await delete_filter(None, filter_data.text, filter_data.group_id) |
|
return {"status": "Filter deleted successfully"} |
|
|
|
@app.post("/del_all_filters") |
|
async def del_all_filters_endpoint(group_id: int): |
|
await del_all(None, group_id, "Group") |
|
return {"status": "All filters deleted successfully"} |
|
|
|
@app.post("/add_gfilter") |
|
async def add_gfilter_endpoint(gfilter_data: GlobalFilter): |
|
await add_gfilter('gfilters', gfilter_data.text, gfilter_data.reply_text, gfilter_data.btn, gfilter_data.file, gfilter_data.alert) |
|
return {"status": "Global filter added successfully"} |
|
|
|
@app.post("/find_gfilter") |
|
async def find_gfilter_endpoint(gfilter_data: GlobalFilter): |
|
reply_text, btn, alert, fileid = await find_gfilter('gfilters', gfilter_data.text) |
|
return {"reply_text": reply_text, "btn": btn, "alert": alert, "fileid": fileid} |
|
|
|
@app.post("/get_gfilters") |
|
async def get_gfilters_endpoint(): |
|
texts = await get_gfilters('gfilters') |
|
return {"global_filters": texts} |
|
|
|
@app.post("/delete_gfilter") |
|
async def delete_gfilter_endpoint(gfilter_data: GlobalFilter): |
|
await delete_gfilter(None, gfilter_data.text, 'gfilters') |
|
return {"status": "Global filter deleted successfully"} |
|
|
|
@app.post("/del_all_gfilters") |
|
async def del_all_gfilters_endpoint(): |
|
await del_allg(None, 'gfilters') |
|
return {"status": "All global filters deleted successfully"} |
|
|
|
@app.post("/broadcast") |
|
async def broadcast_endpoint(broadcast_data: BroadcastMessage): |
|
users = await db.get_all_users() |
|
b_msg = await bot_instance.get_messages(broadcast_data.chat_id, broadcast_data.message_id) |
|
sts = await send_message(bot_instance, broadcast_data.chat_id, "Broadcasting your messages...") |
|
start_time = time.time() |
|
total_users = await db.total_users_count() |
|
done = 0 |
|
blocked = 0 |
|
deleted = 0 |
|
failed = 0 |
|
success = 0 |
|
async for user in users: |
|
pti, sh = await broadcast_messages(int(user['id']), b_msg) |
|
if pti: |
|
success += 1 |
|
elif pti == False: |
|
if sh == "Blocked": |
|
blocked += 1 |
|
elif sh == "Deleted": |
|
deleted += 1 |
|
elif sh == "Error": |
|
failed += 1 |
|
done += 1 |
|
if not done % 20: |
|
await edit_message(bot_instance, sts, f"Broadcast in progress:\n\nTotal Users {total_users}\nCompleted: {done} / {total_users}\nSuccess: {success}\nBlocked: {blocked}\nDeleted: {deleted}") |
|
time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) |
|
await delete_message(bot_instance, sts) |
|
await send_message(bot_instance, broadcast_data.chat_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nSuccess: {success}\nBlocked: {blocked}\nDeleted: {deleted}") |
|
return {"status": "Broadcast completed"} |
|
|
|
@app.post("/clear_junk") |
|
async def clear_junk_endpoint(user_id: int): |
|
sts = await send_message(bot_instance, user_id, "Clearing junk users...") |
|
start_time = time.time() |
|
total_users = await db.total_users_count() |
|
blocked = 0 |
|
deleted = 0 |
|
failed = 0 |
|
done = 0 |
|
async for user in db.get_all_users(): |
|
pti, sh = await clear_junk(int(user['id']), None) |
|
if pti == False: |
|
if sh == "Blocked": |
|
blocked += 1 |
|
elif sh == "Deleted": |
|
deleted += 1 |
|
elif sh == "Error": |
|
failed += 1 |
|
done += 1 |
|
if not done % 20: |
|
await edit_message(bot_instance, sts, f"Clearing junk in progress:\n\nTotal Users {total_users}\nCompleted: {done} / {total_users}\nBlocked: {blocked}\nDeleted: {deleted}") |
|
time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) |
|
await delete_message(bot_instance, sts) |
|
await send_message(bot_instance, user_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nBlocked: {blocked}\nDeleted: {deleted}") |
|
return {"status": "Junk clearing completed"} |
|
|
|
@app.post("/group_broadcast") |
|
async def group_broadcast_endpoint(broadcast_data: BroadcastMessage): |
|
groups = await db.get_all_chats() |
|
b_msg = await bot_instance.get_messages(broadcast_data.chat_id, broadcast_data.message_id) |
|
sts = await send_message(bot_instance, broadcast_data.chat_id, "Broadcasting to groups...") |
|
start_time = time.time() |
|
total_groups = await db.total_chat_count() |
|
done = 0 |
|
failed = "" |
|
success = 0 |
|
deleted = 0 |
|
async for group in groups: |
|
pti, sh, ex = await broadcast_messages_group(int(group['id']), b_msg) |
|
if pti == True: |
|
if sh == "Success": |
|
success += 1 |
|
elif pti == False: |
|
if sh == "deleted": |
|
deleted += 1 |
|
failed += ex |
|
try: |
|
await bot_instance.leave_chat(int(group['id'])) |
|
except Exception as e: |
|
logger.error(f"{e} > {group['id']}") |
|
done += 1 |
|
if not done % 20: |
|
await edit_message(bot_instance, sts, f"Broadcast in progress:\n\nTotal Groups {total_groups}\nCompleted: {done} / {total_groups}\nSuccess: {success}\nDeleted: {deleted}") |
|
time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) |
|
await delete_message(bot_instance, sts) |
|
await send_message(bot_instance, broadcast_data.chat_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nSuccess: {success}\nDeleted: {deleted}\n\nFailed reasons: {failed}") |
|
return {"status": "Group broadcast completed"} |
|
|
|
@app.post("/junk_group") |
|
async def junk_group_endpoint(user_id: int): |
|
groups = await db.get_all_chats() |
|
sts = await send_message(bot_instance, user_id, "Clearing junk groups...") |
|
start_time = time.time() |
|
total_groups = await db.total_chat_count() |
|
done = 0 |
|
failed = "" |
|
deleted = 0 |
|
async for group in groups: |
|
pti, sh, ex = await junk_group(int(group['id']), None) |
|
if pti == False: |
|
if sh == "deleted": |
|
deleted += 1 |
|
failed += ex |
|
try: |
|
await bot_instance.leave_chat(int(group['id'])) |
|
except Exception as e: |
|
logger.error(f"{e} > {group['id']}") |
|
done += 1 |
|
if not done % 20: |
|
await edit_message(bot_instance, sts, f"Clearing junk in progress:\n\nTotal Groups {total_groups}\nCompleted: {done} / {total_groups}\nDeleted: {deleted}") |
|
time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) |
|
await delete_message(bot_instance, sts) |
|
await send_message(bot_instance, user_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nDeleted: {deleted}\n\nFailed reasons: {failed}") |
|
return {"status": "Junk group clearing completed"} |
|
|
|
@app.post("/add_connection") |
|
async def add_connection_endpoint(connection_request: ConnectionRequest): |
|
result = await add_connection(connection_request.group_id, connection_request.user_id) |
|
return {"status": "Connection added successfully" if result else "Connection already exists"} |
|
|
|
@app.post("/active_connection") |
|
async def active_connection_endpoint(user_id: int): |
|
group_id = await active_connection(user_id) |
|
return {"active_group_id": group_id} |
|
|
|
@app.post("/all_connections") |
|
async def all_connections_endpoint(user_id: int): |
|
group_ids = await all_connections(user_id) |
|
return {"group_ids": group_ids} |
|
|
|
@app.post("/if_active") |
|
async def if_active_endpoint(connection_request: ConnectionRequest): |
|
result = await if_active(connection_request.user_id, connection_request.group_id) |
|
return {"is_active": result} |
|
|
|
@app.post("/make_active") |
|
async def make_active_endpoint(connection_request: ConnectionRequest): |
|
result = await make_active(connection_request.user_id, connection_request.group_id) |
|
return {"status": "Made active successfully" if result else "Failed to make active"} |
|
|
|
@app.post("/make_inactive") |
|
async def make_inactive_endpoint(user_id: int): |
|
result = await make_inactive(user_id) |
|
return {"status": "Made inactive successfully" if result else "Failed to make inactive"} |
|
|
|
@app.post("/delete_connection") |
|
async def delete_connection_endpoint(connection_request: ConnectionRequest): |
|
result = await delete_connection(connection_request.user_id, connection_request.group_id) |
|
return {"status": "Connection deleted successfully" if result else "Failed to delete connection"} |
|
|
|
@app.post("/search") |
|
async def search_endpoint(search_query: SearchQuery): |
|
files, next_offset, total_results = await get_search_results(search_query.query, file_type=search_query.file_type, offset=search_query.offset, max_results=search_query.max_results) |
|
return {"files": [file.dict() for file in files], "next_offset": next_offset, "total_results": total_results} |
|
|
|
@app.post("/get_file_details") |
|
async def get_file_details_endpoint(file_id: str): |
|
file_details = await get_file_details(file_id) |
|
if file_details: |
|
return {"file_details": [file.dict() for file in file_details]} |
|
return {"file_details": None} |
|
|
|
@app.post("/get_settings") |
|
async def get_settings_endpoint(group_id: int): |
|
settings = await get_settings(group_id) |
|
return {"settings": settings} |
|
|
|
@app.post("/save_group_settings") |
|
async def save_group_settings_endpoint(settings_update: SettingsUpdate): |
|
await save_group_settings(settings_update.group_id, settings_update.setting_key, settings_update.setting_value) |
|
return {"status": "Settings updated successfully"} |
|
|
|
@app.post("/send_cached_media") |
|
async def send_cached_media_endpoint(chat_id: int, file_id: str, caption: str = None, protect_content: bool = False): |
|
await bot_instance.send_cached_media(chat_id=chat_id, file_id=file_id, caption=caption, protect_content=protect_content) |
|
return {"status": "Media sent successfully"} |
|
|
|
@app.post("/get_poster") |
|
async def get_poster_endpoint(query: str, id: bool = False, file: str = None): |
|
poster = await get_poster(query, bulk=False, id=id, file=file) |
|
return {"poster": poster} |
|
|
|
@app.post("/parse_text") |
|
async def parse_text_endpoint(text: str, keyword: str, cb_data: str): |
|
note_data, buttons, alerts = parser(text, keyword, cb_data) |
|
return {"note_data": note_data, "buttons": buttons, "alerts": alerts} |
|
|
|
@app.post("/get_shortlink") |
|
async def get_shortlink_endpoint(shorten_url: ShortenURL): |
|
short_url = await get_shortlink(shorten_url.url) |
|
return {"short_url": short_url} |
|
|
|
@app.post("/extract_time") |
|
async def extract_time_endpoint(time_val: str): |
|
extracted_time = extract_time(time_val) |
|
return {"extracted_time": extracted_time} |
|
|
|
@app.post("/admin_check") |
|
async def admin_check_endpoint(chat_id: int, user_id: int): |
|
is_admin = await admin_check(Message(chat_id=chat_id, from_user=types.User(id=user_id))) |
|
return {"is_admin": is_admin} |
|
|
|
@app.post("/auto_filter") |
|
async def auto_filter_endpoint(query: str, chat_id: int, user_id: int, offset: int = 0, max_results: int = 10): |
|
files, next_offset, total_results = await get_search_results(query, offset=offset, max_results=max_results) |
|
if not files: |
|
return {"status": "No files found"} |
|
pre = 'filep' if await get_settings(chat_id)['file_secure'] else 'file' |
|
btn = [] |
|
for file in files: |
|
btn.append([ |
|
InlineKeyboardButton(text=f"[{humanbytes(file.file_size)}] {file.file_name}", callback_data=f'{pre}#{user_id}#{file.file_id}') |
|
]) |
|
btn.append([InlineKeyboardButton(text="π How to download π", callback_data="howdl")]) |
|
reply_markup = InlineKeyboardMarkup(btn) |
|
return {"files": [file.dict() for file in files], "next_offset": next_offset, "total_results": total_results, "reply_markup": reply_markup.dict()} |
|
|
|
@app.post("/send_telegram_message") |
|
async def send_telegram_message_endpoint(chat_id: int, text: str): |
|
message = await send_message(bot_instance, chat_id, text) |
|
return {"message_id": message.id} |
|
|
|
@app.post("/edit_telegram_message") |
|
async def edit_telegram_message_endpoint(chat_id: int, message_id: int, text: str): |
|
message = await bot_instance.get_messages(chat_id, message_id) |
|
edited_message = await edit_message(bot_instance, message, text) |
|
return {"edited_message_id": edited_message.id} |
|
|
|
@app.post("/delete_telegram_message") |
|
async def delete_telegram_message_endpoint(chat_id: int, message_id: int): |
|
message = await bot_instance.get_messages(chat_id, message_id) |
|
await delete_message(bot_instance, message) |
|
return {"status": "Message deleted successfully"} |
|
|
|
@app.post("/carbon") |
|
async def carbon_endpoint(carbon_request: CarbonRequest): |
|
carbon_image = await make_carbon(carbon_request.text, True) |
|
return {"carbon_url": carbon_image} |
|
|
|
async def make_carbon(text, tele=False): |
|
url = "https://carbonara.solopov.dev/api/cook" |
|
async with aiohttp.ClientSession() as session: |
|
async with session.post(url, json={"code": text}) as resp: |
|
image = BytesIO(await resp.read()) |
|
image.name = "carbon.png" |
|
if tele: |
|
uf = upload_file(image) |
|
image.close() |
|
return f"https://graph.org{uf[0]}" |
|
return image |
|
|
|
@app.post("/font") |
|
async def font_endpoint(font_request: FontRequest): |
|
fonts = { |
|
"typewriter": Fonts.typewriter, |
|
"outline": Fonts.outline, |
|
"serif": Fonts.serief, |
|
"bold_cool": Fonts.bold_cool, |
|
"cool": Fonts.cool, |
|
"small_cap": Fonts.smallcap, |
|
"script": Fonts.script, |
|
"bold_script": Fonts.bold_script, |
|
"tiny": Fonts.tiny, |
|
"comic": Fonts.comic, |
|
"san": Fonts.san, |
|
"slant_san": Fonts.slant_san, |
|
"slant": Fonts.slant, |
|
"sim": Fonts.sim, |
|
"circles": Fonts.circles, |
|
"dark_circle": Fonts.dark_circle, |
|
"gothic": Fonts.gothic, |
|
"bold_gothic": Fonts.bold_gothic, |
|
"cloud": Fonts.cloud, |
|
"happy": Fonts.happy, |
|
"sad": Fonts.sad, |
|
"special": Fonts.special, |
|
"square": Fonts.square, |
|
"dark_square": Fonts.dark_square, |
|
"andalucia": Fonts.andalucia, |
|
"manga": Fonts.manga, |
|
"stinky": Fonts.stinky, |
|
"bubbles": Fonts.bubbles, |
|
"underline": Fonts.underline, |
|
"ladybug": Fonts.ladybug, |
|
"rays": Fonts.rays, |
|
"birds": Fonts.birds, |
|
"slash": Fonts.slash, |
|
"stop": Fonts.stop, |
|
"skyline": Fonts.skyline, |
|
"arrows": Fonts.arrows, |
|
"rvnes": Fonts.rvnes, |
|
"strike": Fonts.strike, |
|
"frozen": Fonts.frozen |
|
} |
|
cls = fonts.get(font_request.style) |
|
if not cls: |
|
return {"error": "Invalid style"} |
|
new_text = cls(font_request.text) |
|
return {"new_text": new_text} |
|
|
|
@app.post("/paste") |
|
async def paste_endpoint(paste_request: PasteRequest): |
|
paste_response = await p_paste(paste_request.text) |
|
return paste_response |
|
|
|
async def p_paste(code, extension=None): |
|
siteurl = "https://pasty.lus.pm/api/v1/pastes" |
|
data = {"content": code} |
|
try: |
|
response = requests.post(url=siteurl, data=json.dumps(data), headers={"User-Agent": "Mozilla/5.0", "content-type": "application/json"}) |
|
except Exception as e: |
|
return {"error": str(e)} |
|
if response.ok: |
|
response = response.json() |
|
purl = ( |
|
f"https://pasty.lus.pm/{response['id']}.{extension}" |
|
if extension |
|
else f"https://pasty.lus.pm/{response['id']}.txt" |
|
) |
|
return { |
|
"url": purl, |
|
"raw": f"https://pasty.lus.pm/{response['id']}/raw", |
|
"bin": "Pasty", |
|
} |
|
return {"error": "Unable to reach pasty.lus.pm"} |
|
|
|
@app.post("/share_text") |
|
async def share_text_endpoint(share_request: ShareTextRequest): |
|
share_url = f"https://t.me/share/url?url={quote(share_request.text)}" |
|
return {"share_url": share_url} |
|
|
|
@app.post("/telegraph_upload") |
|
async def telegraph_upload_endpoint(upload_request: TelegraphUploadRequest): |
|
try: |
|
response = upload_file(upload_request.file_path) |
|
except Exception as e: |
|
logger.error(f"Error uploading to Telegraph: {e}") |
|
return {"error": str(e)} |
|
telegraph_url = f"https://graph.org{response[0]}" |
|
return {"telegraph_url": telegraph_url} |
|
|
|
@app.post("/text_to_speech") |
|
async def text_to_speech_endpoint(tts_request: TextToSpeechRequest): |
|
audio = await convert(tts_request.text) |
|
return {"audio_url": audio.name} |
|
|
|
async def convert(text): |
|
audio = BytesIO() |
|
i = Translator().translate(text, dest="en") |
|
lang = i.src |
|
tts = gTTS(text, lang=lang) |
|
audio.name = lang + ".mp3" |
|
tts.write_to_fp(audio) |
|
return audio |
|
|
|
@app.post("/download_youtube_song") |
|
async def download_youtube_song_endpoint(yt_request: YouTubeDownloadRequest): |
|
try: |
|
results = YoutubeSearch(yt_request.url, max_results=1).to_dict() |
|
link = f"https://youtube.com{results[0]['url_suffix']}" |
|
title = results[0]["title"][:40] |
|
thumbnail = results[0]["thumbnails"][0] |
|
thumb_name = f'thumb{title}.jpg' |
|
thumb = requests.get(thumbnail, allow_redirects=True) |
|
open(thumb_name, 'wb').write(thumb.content) |
|
performer = f"[Mα΄Ι΄ Bα΄α΄α΄’β’]" |
|
duration = results[0]["duration"] |
|
url_suffix = results[0]["url_suffix"] |
|
views = results[0]["views"] |
|
except Exception as e: |
|
logger.error(f"Error downloading YouTube song: {e}") |
|
return {"error": str(e)} |
|
|
|
ydl_opts = {"format": "bestaudio[ext=m4a]"} |
|
try: |
|
with YoutubeDL(ydl_opts) as ydl: |
|
info_dict = ydl.extract_info(link, download=False) |
|
audio_file = ydl.prepare_filename(info_dict) |
|
ydl.process_info(info_dict) |
|
except Exception as e: |
|
logger.error(f"Error processing YouTube song: {e}") |
|
return {"error": str(e)} |
|
|
|
cap = "**BYβΊβΊ [Mα΄Ι΄ Bα΄α΄α΄’β’](https://t.me/mkn_bots_updates)**" |
|
secmul, dur, dur_arr = 1, 0, duration.split(':') |
|
for i in range(len(dur_arr) - 1, -1, -1): |
|
dur += (int(dur_arr[i]) * secmul) |
|
secmul *= 60 |
|
|
|
return {"audio_file": audio_file, "caption": cap, "title": title, "duration": dur, "performer": performer, "thumb": thumb_name} |
|
|
|
@app.post("/download_youtube_video") |
|
async def download_youtube_video_endpoint(yt_request: YouTubeDownloadRequest): |
|
url = yt_request.url |
|
try: |
|
search = SearchVideos(f"{url}", offset=1, mode="dict", max_results=1) |
|
mi = search.result() |
|
mio = mi["search_result"] |
|
mo = mio[0]["link"] |
|
thum = mio[0]["title"] |
|
fridayz = mio[0]["id"] |
|
mio[0]["channel"] |
|
kekme = f"https://img.youtube.com/vi/{fridayz}/hqdefault.jpg" |
|
await asyncio.sleep(0.6) |
|
url = mo |
|
sedlyf = wget.download(kekme) |
|
opts = { |
|
"format": "best", |
|
"addmetadata": True, |
|
"key": "FFmpegMetadata", |
|
"prefer_ffmpeg": True, |
|
"geo_bypass": True, |
|
"nocheckcertificate": True, |
|
"postprocessors": [{"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}], |
|
"outtmpl": "%(id)s.mp4", |
|
"logtostderr": False, |
|
"quiet": True, |
|
} |
|
with YoutubeDL(opts) as ytdl: |
|
ytdl_data = ytdl.extract_info(url, download=True) |
|
except Exception as e: |
|
logger.error(f"Error downloading YouTube video: {e}") |
|
return {"error": str(e)} |
|
|
|
file_stark = f"{ytdl_data['id']}.mp4" |
|
capy = f"""**ππΈππ»π΄ :** [{thum}]({mo})\n**ππ΄πππ΄πππ΄π³ π±π :** {await bot_instance.get_me().mention}""" |
|
|
|
return {"video_file": file_stark, "caption": capy, "title": ytdl_data["title"], "duration": int(ytdl_data["duration"]), "thumb": sedlyf} |
|
|
|
@app.post("/group_manager_action") |
|
async def group_manager_action_endpoint(action_request: GroupManagerAction): |
|
chat_id = action_request.group_id |
|
action = action_request.action |
|
user_id = action_request.user_id |
|
time_val = action_request.time |
|
message_id = action_request.message_id |
|
|
|
if action == "ban": |
|
await bot_instance.ban_chat_member(chat_id, user_id) |
|
return {"status": "User banned successfully"} |
|
elif action == "unban": |
|
await bot_instance.unban_chat_member(chat_id, user_id) |
|
return {"status": "User unbanned successfully"} |
|
elif action == "tban": |
|
until_date_val = extract_time(time_val) |
|
if until_date_val is None: |
|
return {"error": "Invalid time type specified. Expected m, h, or d, Got it: {time_val[-1]}"} |
|
await bot_instance.ban_chat_member(chat_id, user_id, until_date=until_date_val) |
|
return {"status": "User temporarily banned successfully"} |
|
elif action == "mute": |
|
await bot_instance.restrict_chat_member(chat_id, user_id, permissions=enums.ChatPermissions()) |
|
return {"status": "User muted successfully"} |
|
elif action == "unmute": |
|
await bot_instance.restrict_chat_member(chat_id, user_id, permissions=enums.ChatPermissions(can_send_messages=True)) |
|
return {"status": "User unmuted successfully"} |
|
elif action == "tmute": |
|
until_date_val = extract_time(time_val) |
|
if until_date_val is None: |
|
return {"error": "Invalid time type specified. Expected m, h, or d, Got it: {time_val[-1]}"} |
|
await bot_instance.restrict_chat_member(chat_id, user_id, permissions=enums.ChatPermissions(), until_date=until_date_val) |
|
return {"status": "User temporarily muted successfully"} |
|
elif action == "pin": |
|
if not message_id: |
|
return {"error": "Message ID is required for pin action"} |
|
await bot_instance.pin_chat_message(chat_id, message_id) |
|
return {"status": "Message pinned successfully"} |
|
elif action == "unpin": |
|
if not message_id: |
|
return {"error": "Message ID is required for unpin action"} |
|
await bot_instance.unpin_chat_message(chat_id, message_id) |
|
return {"status": "Message unpinned successfully"} |
|
elif action == "purge": |
|
if not message_id: |
|
return {"error": "Message ID is required for purge action"} |
|
await purge_messages(bot_instance, chat_id, message_id) |
|
return {"status": "Messages purged successfully"} |
|
else: |
|
return {"error": "Invalid action"} |
|
|
|
async def purge_messages(client, chat_id, message_id): |
|
try: |
|
message_ids = [] |
|
count_del_etion_s = 0 |
|
if message_id: |
|
for a_s_message_id in range(message_id, message_id + 100): |
|
message_ids.append(a_s_message_id) |
|
if len(message_ids) == 100: |
|
await client.delete_messages(chat_id=chat_id, message_ids=message_ids, revoke=True) |
|
count_del_etion_s += len(message_ids) |
|
message_ids = [] |
|
if len(message_ids) > 0: |
|
await client.delete_messages(chat_id=chat_id, message_ids=message_ids, revoke=True) |
|
count_del_etion_s += len(message_ids) |
|
except Exception as e: |
|
logger.error(f"Error purging messages: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/get_chat") |
|
async def get_chat_endpoint(chat_id: int): |
|
chat = await bot_instance.get_chat(chat_id) |
|
return {"chat": chat.dict()} |
|
|
|
@app.post("/get_chat_member") |
|
async def get_chat_member_endpoint(chat_id: int, user_id: int): |
|
chat_member = await bot_instance.get_chat_member(chat_id, user_id) |
|
return {"chat_member": chat_member.dict()} |
|
|
|
@app.post("/get_chat_members_count") |
|
async def get_chat_members_count_endpoint(chat_id: int): |
|
count = await bot_instance.get_chat_members_count(chat_id) |
|
return {"members_count": count} |
|
|
|
@app.post("/kick_chat_member") |
|
async def kick_chat_member_endpoint(chat_id: int, user_id: int): |
|
await bot_instance.kick_chat_member(chat_id, user_id) |
|
return {"status": "Chat member kicked successfully"} |
|
|
|
@app.post("/promote_chat_member") |
|
async def promote_chat_member_endpoint(chat_id: int, user_id: int, can_change_info: bool = False, can_post_messages: bool = False, can_edit_messages: bool = False, can_delete_messages: bool = False, can_manage_video_chats: bool = False, can_restrict_members: bool = False, can_promote_members: bool = False, can_invite_users: bool = False, can_pin_messages: bool = False, can_manage_chat: bool = False): |
|
await bot_instance.promote_chat_member( |
|
chat_id, |
|
user_id, |
|
can_change_info=can_change_info, |
|
can_post_messages=can_post_messages, |
|
can_edit_messages=can_edit_messages, |
|
can_delete_messages=can_delete_messages, |
|
can_manage_video_chats=can_manage_video_chats, |
|
can_restrict_members=can_restrict_members, |
|
can_promote_members=can_promote_members, |
|
can_invite_users=can_invite_users, |
|
can_pin_messages=can_pin_messages, |
|
can_manage_chat=can_manage_chat |
|
) |
|
return {"status": "Chat member promoted successfully"} |
|
|
|
@app.post("/set_chat_permissions") |
|
async def set_chat_permissions_endpoint(chat_id: int, permissions: dict): |
|
chat_permissions = enums.ChatPermissions(**permissions) |
|
await bot_instance.set_chat_permissions(chat_id, chat_permissions) |
|
return {"status": "Chat permissions set successfully"} |
|
|
|
@app.post("/set_chat_photo") |
|
async def set_chat_photo_endpoint(chat_id: int, photo_url: str): |
|
response = requests.get(photo_url) |
|
photo = BytesIO(response.content) |
|
photo.name = "photo.jpg" |
|
await bot_instance.set_chat_photo(chat_id, photo) |
|
return {"status": "Chat photo set successfully"} |
|
|
|
@app.post("/delete_chat_photo") |
|
async def delete_chat_photo_endpoint(chat_id: int): |
|
await bot_instance.delete_chat_photo(chat_id) |
|
return {"status": "Chat photo deleted successfully"} |
|
|
|
@app.post("/set_chat_title") |
|
async def set_chat_title_endpoint(chat_id: int, title: str): |
|
await bot_instance.set_chat_title(chat_id, title) |
|
return {"status": "Chat title set successfully"} |
|
|
|
@app.post("/set_chat_description") |
|
async def set_chat_description_endpoint(chat_id: int, description: str): |
|
await bot_instance.set_chat_description(chat_id, description) |
|
return {"status": "Chat description set successfully"} |
|
|
|
@app.post("/pin_chat_message") |
|
async def pin_chat_message_endpoint(chat_id: int, message_id: int, disable_notification: bool = False): |
|
await bot_instance.pin_chat_message(chat_id, message_id, disable_notification=disable_notification) |
|
return {"status": "Message pinned successfully"} |
|
|
|
@app.post("/unpin_chat_message") |
|
async def unpin_chat_message_endpoint(chat_id: int, message_id: int): |
|
await bot_instance.unpin_chat_message(chat_id, message_id) |
|
return {"status": "Message unpinned successfully"} |
|
|
|
@app.post("/unpin_all_chat_messages") |
|
async def unpin_all_chat_messages_endpoint(chat_id: int): |
|
await bot_instance.unpin_all_chat_messages(chat_id) |
|
return {"status": "All messages unpinned successfully"} |
|
|
|
@app.post("/answer_callback_query") |
|
async def answer_callback_query_endpoint(callback_query_id: str, text: str = None, show_alert: bool = False, url: str = None, cache_time: int = 0): |
|
await bot_instance.answer_callback_query(callback_query_id, text=text, show_alert=show_alert, url=url, cache_time=cache_time) |
|
return {"status": "Callback query answered successfully"} |
|
|
|
@app.post("/copy_message") |
|
async def copy_message_endpoint(chat_id: int, from_chat_id: int, message_id: int, caption: str = None, parse_mode: str = None, caption_entities: list = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.copy_message( |
|
chat_id, |
|
from_chat_id, |
|
message_id, |
|
caption=caption, |
|
parse_mode=parse_mode, |
|
caption_entities=caption_entities, |
|
reply_to_message_id=reply_to_message_id, |
|
allow_sending_without_reply=allow_sending_without_reply, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
|
|
@app.post("/forward_messages") |
|
async def forward_messages_endpoint(chat_id: int, from_chat_id: int, message_ids: list, disable_notification: bool = False): |
|
messages = await bot_instance.forward_messages( |
|
chat_id, |
|
from_chat_id, |
|
message_ids, |
|
disable_notification=disable_notification |
|
) |
|
return {"messages": [msg.id for msg in messages]} |
|
|
|
@app.post("/send_poll") |
|
async def send_poll_endpoint(chat_id: int, question: str, options: list, is_anonymous: bool = False, type: str = "regular", allows_multiple_answers: bool = False, correct_option_id: int = None, explanation: str = None, explanation_parse_mode: None = None, open_period: int = None, close_date: int = None, is_closed: bool = False): |
|
try: |
|
message = await bot_instance.send_poll( |
|
chat_id, |
|
question, |
|
options, |
|
is_anonymous=is_anonymous, |
|
type=type, |
|
allows_multiple_answers=allows_multiple_answers, |
|
correct_option_id=correct_option_id, |
|
explanation=explanation, |
|
explanation_parse_mode=explanation_parse_mode, |
|
open_period=open_period, |
|
close_date=close_date, |
|
is_closed=is_closed |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending poll: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_dice") |
|
async def send_dice_endpoint(chat_id: int, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_dice( |
|
chat_id, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending dice: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_game") |
|
async def send_game_endpoint(chat_id: int, game_short_name: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_game( |
|
chat_id, |
|
game_short_name, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending game: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_media_group") |
|
async def send_media_group_endpoint(chat_id: int, media: list, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
media_group = [] |
|
for media_item in media: |
|
if media_item["type"] == "photo": |
|
response = requests.get(media_item["media"]) |
|
photo = BytesIO(response.content) |
|
photo.name = "photo.jpg" |
|
media_group.append(types.InputMediaPhoto(media=photo)) |
|
elif media_item["type"] == "video": |
|
response = requests.get(media_item["media"]) |
|
video = BytesIO(response.content) |
|
video.name = "video.mp4" |
|
media_group.append(types.InputMediaVideo(media=video)) |
|
else: |
|
return {"error": "Unsupported media type"} |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
messages = await bot_instance.send_media_group( |
|
chat_id, |
|
media_group, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"messages": [msg.id for msg in messages]} |
|
except Exception as e: |
|
logger.error(f"Error sending media group: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_invoice") |
|
async def send_invoice_endpoint(chat_id: int, title: str, description: str, payload: str, provider_token: str, currency: str, prices: list, start_parameter: str = None, provider_data: str = None, photo_url: str = None, photo_size: int = None, photo_width: int = None, photo_height: int = None, need_name: bool = False, need_phone_number: bool = False, need_email: bool = False, need_shipping_address: bool = False, send_phone_number_to_provider: bool = False, send_email_to_provider: bool = False, is_flexible: bool = False, disable_notification: bool = False, protect_content: bool = False, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): |
|
try: |
|
prices_obj = [types.LabeledPrice(label=price["label"], amount=price["amount"]) for price in prices] |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_invoice( |
|
chat_id, |
|
title, |
|
description, |
|
payload, |
|
provider_token, |
|
currency, |
|
prices_obj, |
|
start_parameter=start_parameter, |
|
provider_data=provider_data, |
|
photo_url=photo_url, |
|
photo_size=photo_size, |
|
photo_width=photo_width, |
|
photo_height=photo_height, |
|
need_name=need_name, |
|
need_phone_number=need_phone_number, |
|
need_email=need_email, |
|
need_shipping_address=need_shipping_address, |
|
send_phone_number_to_provider=send_phone_number_to_provider, |
|
send_email_to_provider=send_email_to_provider, |
|
is_flexible=is_flexible, |
|
disable_notification=disable_notification, |
|
protect_content=protect_content, |
|
reply_to_message_id=reply_to_message_id, |
|
allow_sending_without_reply=allow_sending_without_reply, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending invoice: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_location") |
|
async def send_location_endpoint(chat_id: int, latitude: float, longitude: float, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_location( |
|
chat_id, |
|
latitude, |
|
longitude, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending location: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_contact") |
|
async def send_contact_endpoint(chat_id: int, phone_number: str, first_name: str, last_name: str = None, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_contact( |
|
chat_id, |
|
phone_number, |
|
first_name, |
|
last_name=last_name, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending contact: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_animation") |
|
async def send_animation_endpoint(chat_id: int, animation_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(animation_url) |
|
animation = BytesIO(response.content) |
|
animation.name = "animation.gif" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_animation( |
|
chat_id, |
|
animation, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending animation: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_video_note") |
|
async def send_video_note_endpoint(chat_id: int, video_note_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(video_note_url) |
|
video_note = BytesIO(response.content) |
|
video_note.name = "video_note.mp4" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_video_note( |
|
chat_id, |
|
video_note, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending video note: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_voice") |
|
async def send_voice_endpoint(chat_id: int, voice_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(voice_url) |
|
voice = BytesIO(response.content) |
|
voice.name = "voice.ogg" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_voice( |
|
chat_id, |
|
voice, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending voice: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_video") |
|
async def send_video_endpoint(chat_id: int, video_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(video_url) |
|
video = BytesIO(response.content) |
|
video.name = "video.mp4" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_video( |
|
chat_id, |
|
video, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending video: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_document") |
|
async def send_document_endpoint(chat_id: int, document_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(document_url) |
|
document = BytesIO(response.content) |
|
document.name = "document.pdf" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_document( |
|
chat_id, |
|
document, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending document: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_photo") |
|
async def send_photo_endpoint(chat_id: int, photo_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(photo_url) |
|
photo = BytesIO(response.content) |
|
photo.name = "photo.jpg" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_photo( |
|
chat_id, |
|
photo, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending photo: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_audio") |
|
async def send_audio_endpoint(chat_id: int, audio_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(audio_url) |
|
audio = BytesIO(response.content) |
|
audio.name = "audio.mp3" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_audio( |
|
chat_id, |
|
audio, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending audio: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_sticker") |
|
async def send_sticker_endpoint(chat_id: int, sticker_url: str, reply_to_message_id: int = None, reply_markup: dict = None): |
|
try: |
|
response = requests.get(sticker_url) |
|
sticker = BytesIO(response.content) |
|
sticker.name = "sticker.webp" |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.send_sticker( |
|
chat_id, |
|
sticker, |
|
reply_to_message_id=reply_to_message_id, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending sticker: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_chat_action") |
|
async def send_chat_action_endpoint(chat_id: int, action: str): |
|
actions = { |
|
"typing": enums.ChatAction.TYPING, |
|
"upload_photo": enums.ChatAction.UPLOAD_PHOTO, |
|
"record_video": enums.ChatAction.RECORD_VIDEO, |
|
"upload_video": enums.ChatAction.UPLOAD_VIDEO, |
|
"record_voice": enums.ChatAction.RECORD_VOICE, |
|
"upload_voice": enums.ChatAction.UPLOAD_VOICE, |
|
"upload_document": enums.ChatAction.UPLOAD_DOCUMENT, |
|
"find_location": enums.ChatAction.FIND_LOCATION, |
|
"record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, |
|
"upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, |
|
"playing": enums.ChatAction.PLAYING |
|
} |
|
action_enum = actions.get(action) |
|
if not action_enum: |
|
return {"error": "Invalid action"} |
|
try: |
|
await bot_instance.send_chat_action(chat_id, action_enum) |
|
return {"status": "Chat action sent successfully"} |
|
except Exception as e: |
|
logger.error(f"Error sending chat action: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/create_chat_invite_link") |
|
async def create_chat_invite_link_endpoint(chat_id: int, name: str = None, expire_date: int = None, member_limit: int = None, creates_join_request: bool = False): |
|
try: |
|
invite_link = await bot_instance.create_chat_invite_link( |
|
chat_id, |
|
name=name, |
|
expire_date=expire_date, |
|
member_limit=member_limit, |
|
creates_join_request=creates_join_request |
|
) |
|
return {"invite_link": invite_link.invite_link} |
|
except Exception as e: |
|
logger.error(f"Error creating chat invite link: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_chat_photo") |
|
async def set_chat_photo_endpoint(chat_id: int, photo_url: str): |
|
try: |
|
response = requests.get(photo_url) |
|
photo = BytesIO(response.content) |
|
photo.name = "photo.jpg" |
|
await bot_instance.set_chat_photo(chat_id, photo) |
|
return {"status": "Chat photo set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting chat photo: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/delete_chat_photo") |
|
async def delete_chat_photo_endpoint(chat_id: int): |
|
try: |
|
await bot_instance.delete_chat_photo(chat_id) |
|
return {"status": "Chat photo deleted successfully"} |
|
except Exception as e: |
|
logger.error(f"Error deleting chat photo: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_chat_title") |
|
async def set_chat_title_endpoint(chat_id: int, title: str): |
|
try: |
|
await bot_instance.set_chat_title(chat_id, title) |
|
return {"status": "Chat title set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting chat title: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_chat_description") |
|
async def set_chat_description_endpoint(chat_id: int, description: str): |
|
try: |
|
await bot_instance.set_chat_description(chat_id, description) |
|
return {"status": "Chat description set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting chat description: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/pin_chat_message") |
|
async def pin_chat_message_endpoint(chat_id: int, message_id: int, disable_notification: bool = False): |
|
try: |
|
await bot_instance.pin_chat_message(chat_id, message_id, disable_notification=disable_notification) |
|
return {"status": "Message pinned successfully"} |
|
except Exception as e: |
|
logger.error(f"Error pinning chat message: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/unpin_chat_message") |
|
async def unpin_chat_message_endpoint(chat_id: int, message_id: int): |
|
try: |
|
await bot_instance.unpin_chat_message(chat_id, message_id) |
|
return {"status": "Message unpinned successfully"} |
|
except Exception as e: |
|
logger.error(f"Error unpinning chat message: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/unpin_all_chat_messages") |
|
async def unpin_all_chat_messages_endpoint(chat_id: int): |
|
try: |
|
await bot_instance.unpin_all_chat_messages(chat_id) |
|
return {"status": "All messages unpinned successfully"} |
|
except Exception as e: |
|
logger.error(f"Error unpinning all chat messages: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/restrict_chat_member") |
|
async def restrict_chat_member_endpoint(chat_id: int, user_id: int, permissions: dict, until_date: int = None): |
|
try: |
|
chat_permissions = enums.ChatPermissions(**permissions) |
|
await bot_instance.restrict_chat_member(chat_id, user_id, chat_permissions, until_date=until_date) |
|
return {"status": "Chat member restricted successfully"} |
|
except Exception as e: |
|
logger.error(f"Error restricting chat member: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/promote_chat_member") |
|
async def promote_chat_member_endpoint(chat_id: int, user_id: int, can_change_info: bool = False, can_post_messages: bool = False, can_edit_messages: bool = False, can_delete_messages: bool = False, can_manage_video_chats: bool = False, can_restrict_members: bool = False, can_promote_members: bool = False, can_invite_users: bool = False, can_pin_messages: bool = False, can_manage_chat: bool = False): |
|
try: |
|
await bot_instance.promote_chat_member( |
|
chat_id, |
|
user_id, |
|
can_change_info=can_change_info, |
|
can_post_messages=can_post_messages, |
|
can_edit_messages=can_edit_messages, |
|
can_delete_messages=can_delete_messages, |
|
can_manage_video_chats=can_manage_video_chats, |
|
can_restrict_members=can_restrict_members, |
|
can_promote_members=can_promote_members, |
|
can_invite_users=can_invite_users, |
|
can_pin_messages=can_pin_messages, |
|
can_manage_chat=can_manage_chat |
|
) |
|
return {"status": "Chat member promoted successfully"} |
|
except Exception as e: |
|
logger.error(f"Error promoting chat member: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/kick_chat_member") |
|
async def kick_chat_member_endpoint(chat_id: int, user_id: int): |
|
try: |
|
await bot_instance.kick_chat_member(chat_id, user_id) |
|
return {"status": "Chat member kicked successfully"} |
|
except Exception as e: |
|
logger.error(f"Error kicking chat member: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/unban_chat_member") |
|
async def unban_chat_member_endpoint(chat_id: int, user_id: int): |
|
try: |
|
await bot_instance.unban_chat_member(chat_id, user_id) |
|
return {"status": "Chat member unbanned successfully"} |
|
except Exception as e: |
|
logger.error(f"Error unbanning chat member: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_chat_administrator_custom_title") |
|
async def set_chat_administrator_custom_title_endpoint(chat_id: int, user_id: int, custom_title: str): |
|
try: |
|
await bot_instance.set_chat_administrator_custom_title(chat_id, user_id, custom_title) |
|
return {"status": "Custom title set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting custom title: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_chat_permissions") |
|
async def set_chat_permissions_endpoint(chat_id: int, permissions: dict): |
|
try: |
|
chat_permissions = enums.ChatPermissions(**permissions) |
|
await bot_instance.set_chat_permissions(chat_id, chat_permissions) |
|
return {"status": "Chat permissions set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting chat permissions: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_chat_sticker_set") |
|
async def set_chat_sticker_set_endpoint(chat_id: int, sticker_set_name: str): |
|
try: |
|
await bot_instance.set_chat_sticker_set(chat_id, sticker_set_name) |
|
return {"status": "Chat sticker set set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting chat sticker set: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/delete_chat_sticker_set") |
|
async def delete_chat_sticker_set_endpoint(chat_id: int): |
|
try: |
|
await bot_instance.delete_chat_sticker_set(chat_id) |
|
return {"status": "Chat sticker set deleted successfully"} |
|
except Exception as e: |
|
logger.error(f"Error deleting chat sticker set: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/approve_chat_join_request") |
|
async def approve_chat_join_request_endpoint(chat_id: int, user_id: int): |
|
try: |
|
await bot_instance.approve_chat_join_request(chat_id, user_id) |
|
return {"status": "Join request approved successfully"} |
|
except Exception as e: |
|
logger.error(f"Error approving chat join request: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/decline_chat_join_request") |
|
async def decline_chat_join_request_endpoint(chat_id: int, user_id: int): |
|
try: |
|
await bot_instance.decline_chat_join_request(chat_id, user_id) |
|
return {"status": "Join request declined successfully"} |
|
except Exception as e: |
|
logger.error(f"Error declining chat join request: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/ban_chat_sender_chat") |
|
async def ban_chat_sender_chat_endpoint(chat_id: int, sender_chat_id: int): |
|
try: |
|
await bot_instance.ban_chat_sender_chat(chat_id, sender_chat_id) |
|
return {"status": "Sender chat banned successfully"} |
|
except Exception as e: |
|
logger.error(f"Error banning chat sender chat: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/unban_chat_sender_chat") |
|
async def unban_chat_sender_chat_endpoint(chat_id: int, sender_chat_id: int): |
|
try: |
|
await bot_instance.unban_chat_sender_chat(chat_id, sender_chat_id) |
|
return {"status": "Sender chat unbanned successfully"} |
|
except Exception as e: |
|
logger.error(f"Error unbanning chat sender chat: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_chat_menu_button") |
|
async def set_chat_menu_button_endpoint(chat_id: int, menu_button: dict = None): |
|
try: |
|
if menu_button: |
|
menu_button_obj = types.MenuButton(**menu_button) |
|
else: |
|
menu_button_obj = None |
|
await bot_instance.set_chat_menu_button(chat_id, menu_button_obj) |
|
return {"status": "Chat menu button set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting chat menu button: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/get_chat_menu_button") |
|
async def get_chat_menu_button_endpoint(chat_id: int): |
|
try: |
|
menu_button = await bot_instance.get_chat_menu_button(chat_id) |
|
return {"menu_button": menu_button.dict() if menu_button else None} |
|
except Exception as e: |
|
logger.error(f"Error getting chat menu button: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/set_my_commands") |
|
async def set_my_commands_endpoint(commands: list): |
|
try: |
|
bot_commands = [types.BotCommand(command=cmd["command"], description=cmd["description"]) for cmd in commands] |
|
await bot_instance.set_my_commands(bot_commands) |
|
return {"status": "Commands set successfully"} |
|
except Exception as e: |
|
logger.error(f"Error setting my commands: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/get_my_commands") |
|
async def get_my_commands_endpoint(): |
|
try: |
|
commands = await bot_instance.get_my_commands() |
|
return {"commands": [cmd.dict() for cmd in commands]} |
|
except Exception as e: |
|
logger.error(f"Error getting my commands: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/send_copy") |
|
async def send_copy_endpoint(chat_id: int, from_chat_id: int, message_id: int, caption: str = None, parse_mode: str = None, caption_entities: list = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): |
|
try: |
|
reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None |
|
message = await bot_instance.copy_message( |
|
chat_id, |
|
from_chat_id, |
|
message_id, |
|
caption=caption, |
|
parse_mode=parse_mode, |
|
caption_entities=caption_entities, |
|
reply_to_message_id=reply_to_message_id, |
|
allow_sending_without_reply=allow_sending_without_reply, |
|
reply_markup=reply_markup_obj |
|
) |
|
return {"message_id": message.id} |
|
except Exception as e: |
|
logger.error(f"Error sending copy: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/forward_messages") |
|
async def forward_messages_endpoint(chat_id: int, from_chat_id: int, message_ids: list, disable_notification: bool = False): |
|
try: |
|
messages = await bot_instance.forward_messages( |
|
chat_id, |
|
from_chat_id, |
|
message_ids, |
|
disable_notification=disable_notification |
|
) |
|
return {"messages": [msg.id for msg in messages]} |
|
except Exception as e: |
|
logger.error(f"Error forwarding messages: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/text_to_speech") |
|
async def text_to_speech_endpoint(tts_request: TextToSpeechRequest): |
|
try: |
|
audio = await convert(tts_request.text) |
|
return {"audio_url": audio.name} |
|
except Exception as e: |
|
logger.error(f"Error converting text to speech: {e}") |
|
return {"error": str(e)} |
|
|
|
async def convert(text): |
|
audio = BytesIO() |
|
i = Translator().translate(text, dest="en") |
|
lang = i.src |
|
tts = gTTS(text, lang=lang) |
|
audio.name = lang + ".mp3" |
|
tts.write_to_fp(audio) |
|
return audio |
|
|
|
@app.post("/download_youtube_song") |
|
async def download_youtube_song_endpoint(yt_request: YouTubeDownloadRequest): |
|
try: |
|
results = YoutubeSearch(yt_request.url, max_results=1).to_dict() |
|
link = f"https://youtube.com{results[0]['url_suffix']}" |
|
title = results[0]["title"][:40] |
|
thumbnail = results[0]["thumbnails"][0] |
|
thumb_name = f'thumb{title}.jpg' |
|
thumb = requests.get(thumbnail, allow_redirects=True) |
|
open(thumb_name, 'wb').write(thumb.content) |
|
performer = f"[Mα΄Ι΄ Bα΄α΄α΄’β’]" |
|
duration = results[0]["duration"] |
|
url_suffix = results[0]["url_suffix"] |
|
views = results[0]["views"] |
|
except Exception as e: |
|
logger.error(f"Error downloading YouTube song: {e}") |
|
return {"error": str(e)} |
|
|
|
ydl_opts = {"format": "bestaudio[ext=m4a]"} |
|
try: |
|
with YoutubeDL(ydl_opts) as ydl: |
|
info_dict = ydl.extract_info(link, download=False) |
|
audio_file = ydl.prepare_filename(info_dict) |
|
ydl.process_info(info_dict) |
|
except Exception as e: |
|
logger.error(f"Error processing YouTube song: {e}") |
|
return {"error": str(e)} |
|
|
|
cap = "**BYβΊβΊ [Mα΄Ι΄ Bα΄α΄α΄’β’](https://t.me/mkn_bots_updates)**" |
|
secmul, dur, dur_arr = 1, 0, duration.split(':') |
|
for i in range(len(dur_arr) - 1, -1, -1): |
|
dur += (int(dur_arr[i]) * secmul) |
|
secmul *= 60 |
|
|
|
return {"audio_file": audio_file, "caption": cap, "title": title, "duration": dur, "performer": performer, "thumb": thumb_name} |
|
|
|
@app.post("/download_youtube_video") |
|
async def download_youtube_video_endpoint(yt_request: YouTubeDownloadRequest): |
|
try: |
|
url = yt_request.url |
|
search = SearchVideos(f"{url}", offset=1, mode="dict", max_results=1) |
|
mi = search.result() |
|
mio = mi["search_result"] |
|
mo = mio[0]["link"] |
|
thum = mio[0]["title"] |
|
fridayz = mio[0]["id"] |
|
mio[0]["channel"] |
|
kekme = f"https://img.youtube.com/vi/{fridayz}/hqdefault.jpg" |
|
await asyncio.sleep(0.6) |
|
url = mo |
|
sedlyf = wget.download(kekme) |
|
opts = { |
|
"format": "best", |
|
"addmetadata": True, |
|
"key": "FFmpegMetadata", |
|
"prefer_ffmpeg": True, |
|
"geo_bypass": True, |
|
"nocheckcertificate": True, |
|
"postprocessors": [{"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}], |
|
"outtmpl": "%(id)s.mp4", |
|
"logtostderr": False, |
|
"quiet": True, |
|
} |
|
with YoutubeDL(opts) as ytdl: |
|
ytdl_data = ytdl.extract_info(url, download=True) |
|
except Exception as e: |
|
logger.error(f"Error downloading YouTube video: {e}") |
|
return {"error": str(e)} |
|
|
|
file_stark = f"{ytdl_data['id']}.mp4" |
|
capy = f"""**ππΈππ»π΄ :** [{thum}]({mo})\n**ππ΄πππ΄πππ΄π³ π±π :** {await bot_instance.get_me().mention}""" |
|
|
|
return {"video_file": file_stark, "caption": capy, "title": ytdl_data["title"], "duration": int(ytdl_data["duration"]), "thumb": sedlyf} |
|
|
|
@app.post("/carbon") |
|
async def carbon_endpoint(carbon_request: CarbonRequest): |
|
try: |
|
carbon_image = await make_carbon(carbon_request.text, True) |
|
return {"carbon_url": carbon_image} |
|
except Exception as e: |
|
logger.error(f"Error generating carbon image: {e}") |
|
return {"error": str(e)} |
|
|
|
async def make_carbon(text, tele=False): |
|
url = "https://carbonara.solopov.dev/api/cook" |
|
async with aiohttp.ClientSession() as session: |
|
async with session.post(url, json={"code": text}) as resp: |
|
image = BytesIO(await resp.read()) |
|
image.name = "carbon.png" |
|
if tele: |
|
uf = upload_file(image) |
|
image.close() |
|
return f"https://graph.org{uf[0]}" |
|
return image |
|
|
|
@app.post("/font") |
|
async def font_endpoint(font_request: FontRequest): |
|
fonts = { |
|
"typewriter": Fonts.typewriter, |
|
"outline": Fonts.outline, |
|
"serif": Fonts.serief, |
|
"bold_cool": Fonts.bold_cool, |
|
"cool": Fonts.cool, |
|
"small_cap": Fonts.smallcap, |
|
"script": Fonts.script, |
|
"bold_script": Fonts.bold_script, |
|
"tiny": Fonts.tiny, |
|
"comic": Fonts.comic, |
|
"san": Fonts.san, |
|
"slant_san": Fonts.slant_san, |
|
"slant": Fonts.slant, |
|
"sim": Fonts.sim, |
|
"circles": Fonts.circles, |
|
"dark_circle": Fonts.dark_circle, |
|
"gothic": Fonts.gothic, |
|
"bold_gothic": Fonts.bold_gothic, |
|
"cloud": Fonts.cloud, |
|
"happy": Fonts.happy, |
|
"sad": Fonts.sad, |
|
"special": Fonts.special, |
|
"square": Fonts.square, |
|
"dark_square": Fonts.dark_square, |
|
"andalucia": Fonts.andalucia, |
|
"manga": Fonts.manga, |
|
"stinky": Fonts.stinky, |
|
"bubbles": Fonts.bubbles, |
|
"underline": Fonts.underline, |
|
"ladybug": Fonts.ladybug, |
|
"rays": Fonts.rays, |
|
"birds": Fonts.birds, |
|
"slash": Fonts.slash, |
|
"stop": Fonts.stop, |
|
"skyline": Fonts.skyline, |
|
"arrows": Fonts.arrows, |
|
"rvnes": Fonts.rvnes, |
|
"strike": Fonts.strike, |
|
"frozen": Fonts.frozen |
|
} |
|
cls = fonts.get(font_request.style) |
|
if not cls: |
|
return {"error": "Invalid style"} |
|
try: |
|
new_text = cls(font_request.text) |
|
return {"new_text": new_text} |
|
except Exception as e: |
|
logger.error(f"Error applying font style: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/paste") |
|
async def paste_endpoint(paste_request: PasteRequest): |
|
try: |
|
paste_response = await p_paste(paste_request.text) |
|
return paste_response |
|
except Exception as e: |
|
logger.error(f"Error pasting text: {e}") |
|
return {"error": str(e)} |
|
|
|
async def p_paste(code, extension=None): |
|
siteurl = "https://pasty.lus.pm/api/v1/pastes" |
|
data = {"content": code} |
|
try: |
|
response = requests.post(url=siteurl, data=json.dumps(data), headers={"User-Agent": "Mozilla/5.0", "content-type": "application/json"}) |
|
except Exception as e: |
|
return {"error": str(e)} |
|
if response.ok: |
|
response = response.json() |
|
purl = ( |
|
f"https://pasty.lus.pm/{response['id']}.{extension}" |
|
if extension |
|
else f"https://pasty.lus.pm/{response['id']}.txt" |
|
) |
|
return { |
|
"url": purl, |
|
"raw": f"https://pasty.lus.pm/{response['id']}/raw", |
|
"bin": "Pasty", |
|
} |
|
return {"error": "Unable to reach pasty.lus.pm"} |
|
|
|
@app.post("/share_text") |
|
async def share_text_endpoint(share_request: ShareTextRequest): |
|
try: |
|
share_url = f"https://t.me/share/url?url={quote(share_request.text)}" |
|
return {"share_url": share_url} |
|
except Exception as e: |
|
logger.error(f"Error sharing text: {e}") |
|
return {"error": str(e)} |
|
|
|
@app.post("/telegraph_upload") |
|
async def telegraph_upload_endpoint(upload_request: TelegraphUploadRequest): |
|
try: |
|
response = upload_file(upload_request.file_path) |
|
telegraph_url = f"https://graph.org{response[0]}" |
|
return {"telegraph_url": telegraph_url} |
|
except Exception as e: |
|
logger.error(f"Error uploading to Telegraph: {e}") |
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=8000) |