import logging from pyrogram import Client, filters, idle from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton from pyrogram.types import * from RyuzakiLib import Tiktok from config import TIKTOK_WEB as tt, API_ID, API_HASH, BOT_TOKEN, HUGGING_TOKEN import hashlib try: from aiohttp import ClientSession as aiohttp_client except ImportError: aiohttp_client = None logging.getLogger("pyrogram").setLevel(logging.WARNING) logging.basicConfig(level=logging.INFO) WELCOME_TEXT = """ Halo {} Saya adalah bot untuk mengunduh video tiktok di telegram. Saya dapat mengunduh video dengan tanda air atau tanpa tanda air dan mengunduh audio dari url. Kirimkan saja saya url tiktok. """ client = Client( "TTK-BOT", api_id=API_ID, api_hash=API_HASH, bot_token=BOT_TOKEN ) link_storage = {} def generate_callback_data(user_id, query): identifier = hashlib.md5(query.encode()).hexdigest() callback_data = f"audiodownload_{user_id}_{identifier}" link_storage[callback_data] = query return callback_data async def async_searcher( url: str, post: bool = False, head: bool = False, headers: dict = None, evaluate=None, object: bool = False, re_json: bool = False, re_content: bool = False, *args, **kwargs, ): if aiohttp_client: async with aiohttp_client(headers=headers) as client: method = client.head if head else (client.post if post else client.get) data = await method(url, *args, **kwargs) if evaluate: return await evaluate(data) if re_json: return await data.json() if re_content: return await data.read() if head or object: return data return await data.text() else: raise DependencyMissingError("install 'aiohttp' to use this.") @client.on_message(filters.command("start") & filters.private) async def welcome_start(client: Client, message: Message): keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton( text="📢 Saluran Bot", url="https://t.me/RendyProjects" ) ] ] ) await message.reply_text( WELCOME_TEXT.format(message.from_user.first_name), reply_markup=keyboard ) @client.on_callback_query(filters.regex("^audiodownload_")) async def callback_button(client: Client, cb: CallbackQuery): try: data = cb.data user_id = cb.from_user.id query = link_storage.get(data) if query: response = Tiktok.download(tt, query) await client.send_audio(user_id, response[1]) await cb.answer("Audio sent successfully!") else: await cb.answer("Invalid or expired link.", show_alert=True) except Exception as e: await cb.answer(f"Error: {str(e)}", show_alert=True) @client.on_message(filters.text & filters.private) async def tiktok_downloader(client: Client, message: Message): if message.text: query = message.text callback_data = generate_callback_data(message.from_user.id, query) keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton( text="Audio Download", callback_data=callback_data ) ] ] ) try: dll = await message.reply_text("Processing....") await message.delete() response = Tiktok.download(tt, query) await message.reply_video(response[0], reply_markup=keyboard) await dll.delete() except Exception as e: await dll.delete() await message.reply_text(f"Error: {str(e)}") async def setup(): try: await client.start() logger.info("Bot started successfully") await start_periodic_task() # Run the client await idle() except Exception as e: logger.error(f"Error in setup: {e}") client.run()