"""Join-request CAPTCHA plugin for a Pyrogram bot.

When a user asks to join the protected chat, the bot sends them an image
CAPTCHA (arithmetic or random text) in private with one inline button per
candidate answer. A correct answer approves the join request; a wrong answer,
a cancellation or a timeout declines it.
"""

import os
import time
import json
import asyncio
import io
import re
import logging
import string
import random
import uuid

import requests
from pyrogram import Client, filters
from pyrogram.types import *
from pyrogram.enums import *
from pyrogram.errors import *
from PIL import Image, ImageDraw, ImageFont, ImageFilter

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# user_id -> pending verification data (answer, chat id, invite link, ...).
captcha_texts = {}
# user_id -> preferred CAPTCHA mode ("math" or "text").
captcha_modes = {}

NOT_ALLOWED_NON_PROGRAMMER = [
    466019692,  # @myexcid,
    1423479724,  # tonic,
    883761960,  # ari
    6824458358,  # None
    1982318761,  # paman
    5575183435,  # suku
    948247711,  # akay
]

# Mode used when a user never picked one; shared by the join and refresh
# handlers so that both generate the same kind of CAPTCHA.
DEFAULT_CAPTCHA_MODE = "text"

CAPTCHA_CAPTION = (
    "āļø **Verify that you are human!**\n\n"
    "ā Please select the correct CAPTCHA text shown in the image below."
)

# Strong references to fire-and-forget tasks; the event loop itself only keeps
# weak references, so an unreferenced task may be garbage-collected mid-flight.
_background_tasks = set()


def _spawn(coro):
    """Schedule *coro* as a background task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _load_font(size):
    """Return Arial at *size*, or Pillow's built-in font if Arial is missing."""
    try:
        return ImageFont.truetype("arial.ttf", size)
    except OSError:
        return ImageFont.load_default()


def _measure_text(draw, text, font):
    """Return an integer (width, height) large enough to draw *text* at (0, 0)."""
    try:
        # The right/bottom edges include the glyph offset from the origin.
        _, _, right, bottom = draw.textbbox((0, 0), text, font=font)
    except (AttributeError, ValueError):
        # Older Pillow: no textbbox, or no bbox support for bitmap fonts.
        right, bottom = draw.textsize(text, font=font)
    return max(1, int(right)), max(1, int(bottom))


def _random_color(low, high):
    """Return a random RGB tuple with every channel in [low, high]."""
    return tuple(random.randint(low, high) for _ in range(3))


def _render_captcha_image(captcha_text, choices, size, noise_points,
                          font_size, text_y_shift, file_prefix):
    """Draw the CAPTCHA picture, save it as a PNG and return its path.

    The picture has a pastel background with noise pixels, the rotated
    challenge text, the numbered answer choices, a few distracting lines and
    a final blur.
    """
    width, height = size
    img = Image.new("RGB", (width, height), color=_random_color(200, 255))  # pastel color
    draw = ImageDraw.Draw(img)

    for _ in range(noise_points):
        # randrange keeps the pixel inside the image (valid x is 0..width-1).
        position = (random.randrange(width), random.randrange(height))
        draw.point(position, fill=_random_color(150, 200))

    font = _load_font(font_size)
    text_width, text_height = _measure_text(draw, captcha_text, font)
    text_x = (width - text_width) / 2
    text_y = (height - text_height) / 2 - text_y_shift

    # Render the text on its own transparent layer so it can be rotated.
    text_layer = Image.new("RGBA", (text_width, text_height), (255, 255, 255, 0))
    ImageDraw.Draw(text_layer).text((0, 0), captcha_text, font=font, fill=(0, 0, 0))
    rotated_text = text_layer.rotate(random.randint(-25, 25), expand=1)
    img.paste(rotated_text, (int(text_x), int(text_y)), rotated_text)

    choice_font = _load_font(30)
    for idx, choice in enumerate(choices):
        choice_text = f"{idx + 1}. {choice}"
        _, choice_height = _measure_text(draw, choice_text, choice_font)
        draw.text(
            (50 + idx * 80, height - choice_height - 20),
            choice_text,
            font=choice_font,
            fill=(0, 0, 0),
        )

    for _ in range(5):
        start = (random.randint(0, width), random.randint(0, height))
        end = (random.randint(0, width), random.randint(0, height))
        draw.line([start, end], fill=_random_color(0, 150), width=2)

    img = img.filter(ImageFilter.BLUR)

    # A random name is safe on every filesystem (the challenge text may contain
    # '*', '?' or spaces) and cannot collide between concurrent users.
    img_path = f"captcha_{file_prefix}_{uuid.uuid4().hex}.png"
    img.save(img_path)
    return img_path


def generate_math_captcha():
    """Create an arithmetic CAPTCHA.

    Returns:
        A tuple ``(captcha_text, img_path, choices, correct_answer)`` where
        ``choices`` is a shuffled list of three answer strings and
        ``correct_answer`` is the integer result.
    """
    operation = random.choice(['+', '-', '*'])
    num1 = random.randint(1, 20)
    num2 = random.randint(1, 20)

    if operation == '+':
        correct_answer = num1 + num2
    elif operation == '-':
        correct_answer = num1 - num2
    else:
        correct_answer = num1 * num2

    captcha_text = f"{num1} {operation} {num2} = ?"

    choices = [str(correct_answer)]
    while len(choices) < 3:
        wrong_answer = str(correct_answer + random.choice([-5, -3, -2, 2, 3, 5]))
        if wrong_answer not in choices:
            choices.append(wrong_answer)
    random.shuffle(choices)

    img_path = _render_captcha_image(
        captcha_text,
        choices,
        size=(250, 100),
        noise_points=1000,
        font_size=40,
        text_y_shift=20,
        file_prefix="math",
    )
    return captcha_text, img_path, choices, correct_answer


def generate_text_captcha():
    """Create a five-character text CAPTCHA.

    Returns:
        A tuple ``(captcha_text, img_path, choices, correct_answer)`` where
        ``choices`` is a shuffled list of three candidate strings and
        ``correct_answer`` equals ``captcha_text``.
    """
    letters = string.ascii_uppercase + string.digits
    captcha_text = ''.join(random.choice(letters) for _ in range(5))

    choices = [captcha_text]
    while len(choices) < 3:
        wrong_choice = ''.join(random.choice(letters) for _ in range(5))
        if wrong_choice not in choices:
            choices.append(wrong_choice)
    random.shuffle(choices)

    img_path = _render_captcha_image(
        captcha_text,
        choices,
        size=(200, 80),
        noise_points=500,
        font_size=45,
        text_y_shift=0,
        file_prefix="text",
    )
    return captcha_text, img_path, choices, captcha_text


def generate_captcha(user_id, mode='math'):
    """Generate a CAPTCHA of the requested *mode*.

    ``'math'`` selects the arithmetic CAPTCHA; any other value selects the
    text CAPTCHA. *user_id* is accepted for interface compatibility and is
    not used.
    """
    if mode == 'math':
        return generate_math_captcha()
    return generate_text_captcha()


def _download_image(url, path, timeout=10):
    """Download *url* to *path* unless a non-empty copy already exists.

    Raises:
        requests.RequestException: on network failure, timeout or a
            non-success HTTP status.
    """
    if os.path.isfile(path) and os.path.getsize(path) > 0:
        return path
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    with open(path, "wb") as f:
        f.write(response.content)
    return path


def thanks_hacker_by_randydev():
    """Fetch the "success" picture and return its local file name (blocking)."""
    url = "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcR0u8UqJ2JDhfmPOCb_zAHjUQG2NYMjTwLbkq_sQhCQOxX8hn66YbaGFvLL&s=10"
    return _download_image(url, "hacker.png")


def failed_hacker_by_randydev():
    """Fetch the "failure" picture and return its local file name (blocking)."""
    url = "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTM0vb4s59H9F1-_7FyELiLU04e8bCHy7o6KQV2mG3DFRLnzP547KjckKG2&s=10"
    return _download_image(url, "failed_hacker.png")


def _build_mode_keyboard():
    """Return the inline keyboard used to pick a CAPTCHA mode."""
    return InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("š¢ Mathematics", callback_data="mode_math")],
            [InlineKeyboardButton("š¤ Text", callback_data="mode_text")],
            [InlineKeyboardButton("ā Cancel", callback_data="cancel_mode")],
        ]
    )


def _build_captcha_keyboard(user_id, choices):
    """Return one answer button per choice plus the refresh/cancel row."""
    buttons = [
        [InlineKeyboardButton(choice, callback_data=f"verify_{user_id}_{choice}")]
        for choice in choices
    ]
    buttons.append([
        InlineKeyboardButton("š Refresh CAPTCHA", callback_data="refresh_captcha"),
        InlineKeyboardButton("ā Cancel", callback_data="cancel_captcha"),
    ])
    return InlineKeyboardMarkup(buttons)


def _remove_file(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


async def remove_captcha_after_timeout(client, user_id, delay=300, token=None):
    """Expire a pending CAPTCHA after *delay* seconds.

    Args:
        client: The Pyrogram client used to decline the request and notify.
        user_id: The user whose verification should expire.
        delay: Seconds to wait before expiring.
        token: Optional identifier of the verification attempt this timer
            belongs to. When given, the timer does nothing if the stored entry
            carries a different token, so a stale timer cannot expire a newer
            attempt by the same user.
    """
    await asyncio.sleep(delay)

    captcha_data = captcha_texts.get(user_id)
    if captcha_data is None:
        return
    if token is not None and captcha_data.get("token") != token:
        return
    del captcha_texts[user_id]

    # This runs as a background task, so failures are logged, not propagated.
    try:
        await client.decline_chat_join_request(
            chat_id=captcha_data['chat_id'],
            user_id=user_id
        )
    except Exception:
        logger.exception("Could not decline expired join request of user %s", user_id)
    try:
        await client.send_message(user_id, "ā° Your CAPTCHA verification has expired. Please try again.")
    except Exception:
        logger.exception("Could not notify user %s about the expired CAPTCHA", user_id)
    logger.info("CAPTCHA for user %s has expired and been removed.", user_id)


@Client.on_message(filters.command("settingmode") & filters.private)
async def setting_mode(client: Client, message: Message):
    """Show the CAPTCHA mode picker (ignored for users on the block list)."""
    if message.from_user.id in NOT_ALLOWED_NON_PROGRAMMER:
        return
    await message.reply_text(
        "š Select the CAPTCHA mode you want:",
        reply_markup=_build_mode_keyboard()
    )


@Client.on_callback_query(filters.regex("^mode_"))
async def mode_callback(client: Client, cb: CallbackQuery):
    """Store the CAPTCHA mode chosen through the mode picker."""
    user_id = cb.from_user.id
    data = cb.data.split("_")
    if len(data) != 2:
        await cb.answer("ā Invalid selection.", show_alert=True)
        return
    mode = data[1]
    if mode not in ['math', 'text']:
        await cb.answer("ā Invalid mode.", show_alert=True)
        return

    captcha_modes[user_id] = mode
    try:
        await cb.edit_message_text(
            f"ā CAPTCHA mode has been set to **{'Mathematics' if mode == 'math' else 'Text'}**.",
            reply_markup=_build_mode_keyboard()
        )
    except MessageNotModified:
        # Picking the already-active mode leaves the message identical.
        pass
    await cb.answer("The mode has been changed.", show_alert=False)
    logger.info("User %s set CAPTCHA mode to %s.", user_id, mode)


@Client.on_callback_query(filters.regex("^cancel_mode$"))
async def cancel_mode_callback(client: Client, cb: CallbackQuery):
    """Close the mode picker without changing the stored mode."""
    await cb.edit_message_text("ā CAPTCHA mode setting has been canceled.")
    await cb.answer("Cancellation successful.", show_alert=False)
    logger.info("User %s canceled CAPTCHA mode setting.", cb.from_user.id)


@Client.on_chat_join_request(filters.chat("KillerXSupport"))
async def join_request(client: Client, event: ChatJoinRequest):
    """Start a CAPTCHA verification for a new join request."""
    member = await client.get_chat_member(event.chat.id, "me")
    if member.status != ChatMemberStatus.ADMINISTRATOR:
        return await client.send_message(event.chat.id, text="I am not an administrator in this group.")
    try:
        chat_link = await client.export_chat_invite_link(event.chat.id)
    except ChatAdminRequired:
        await client.send_message(event.chat.id, text="I need to be an administrator to perform this action.")
        return

    # Only supergroups are handled; bail out before creating any state or file.
    if event.chat.type != ChatType.SUPERGROUP:
        return

    user_id = event.from_user.id
    mode = captcha_modes.get(user_id, DEFAULT_CAPTCHA_MODE)
    captcha_text, img_path, choices, correct_answer = generate_captcha(user_id, mode)
    token = uuid.uuid4().hex
    captcha_texts[user_id] = {
        "captcha_text": captcha_text,
        "correct_answer": correct_answer,
        "chat_id": event.chat.id,
        "chat_link": chat_link,
        "first_name": event.from_user.first_name,
        "token": token,
    }
    keyboard = _build_captcha_keyboard(user_id, choices)

    try:
        await client.send_message(
            event.chat.id,
            text=f" š¦ Verify that you {event.from_user.first_name} are human!",
            reply_markup=create_button_userinfo(user_id, client.me.username)
        )
        await client.send_photo(
            user_id,
            photo=img_path,
            caption=CAPTCHA_CAPTION,
            reply_markup=keyboard
        )
        _spawn(remove_captcha_after_timeout(client, user_id, token=token))
    except Exception as e:
        # Nothing was delivered and no timer is running: drop the entry so it
        # does not linger forever.
        captcha_texts.pop(user_id, None)
        logger.error(str(e))
        await client.send_message(
            event.chat.id,
            text=str(e)
        )
    finally:
        _remove_file(img_path)


@Client.on_callback_query(filters.regex("^verify_"))
async def verify_captcha_callback(client: Client, cb: CallbackQuery):
    """Check the selected answer and approve or decline the join request."""
    data = cb.data.split("_")
    if len(data) != 3:
        await cb.answer("ā Format data tidak valid.", show_alert=True)
        return
    _, user_id_str, user_choice = data
    try:
        user_id = int(user_id_str)
    except ValueError:
        await cb.answer("ā Invalid user ID.", show_alert=True)
        return
    if cb.from_user.id != user_id:
        await cb.answer("ā You have no right to do this.", show_alert=True)
        logger.warning("User %s mencoba memverifikasi CAPTCHA milik user %s.", cb.from_user.id, user_id)
        return

    captcha_data = captcha_texts.get(user_id)
    if captcha_data is None:
        await cb.answer("āļø No active CAPTCHA verification.", show_alert=True)
        logger.warning("User %s mencoba memverifikasi CAPTCHA tanpa aktif.", user_id)
        return

    correct_answer = captcha_data["correct_answer"]
    chat_id = captcha_data["chat_id"]
    chat_link = captcha_data["chat_link"]
    first_name = captcha_data["first_name"]

    # The image helpers use blocking HTTP; run them off the event loop and
    # fetch only the picture that is actually needed.
    loop = asyncio.get_running_loop()
    try:
        if str(user_choice) == str(correct_answer):
            hacker_image = await loop.run_in_executor(None, thanks_hacker_by_randydev)
            await cb.edit_message_media(
                media=InputMediaPhoto(
                    hacker_image,
                    caption="ā CAPTCHA verification successful!"
                ),
                reply_markup=create_button_join_group(chat_link)
            )
            logger.info("User %s berhasil memverifikasi CAPTCHA.", user_id)
            if user_id in NOT_ALLOWED_NON_PROGRAMMER:
                captcha_texts.pop(user_id, None)
                await client.ban_chat_member(chat_id=chat_id, user_id=user_id)
                return
            await client.approve_chat_join_request(
                chat_id=chat_id,
                user_id=user_id
            )
            captcha_texts.pop(user_id, None)
            await client.send_message(chat_id, f"Thank you for joining {first_name}")
        else:
            failed_image = await loop.run_in_executor(None, failed_hacker_by_randydev)
            await cb.edit_message_media(
                media=InputMediaPhoto(
                    failed_image,
                    caption="ā **CAPTCHA verification failed.**\n\nPlease try again."
                )
            )
            await client.decline_chat_join_request(
                chat_id=chat_id,
                user_id=user_id
            )
            captcha_texts.pop(user_id, None)
            await client.send_message(chat_id, f"Failed to join {first_name}")
            logger.info("User %s gagal memverifikasi CAPTCHA.", user_id)
    except Exception as e:
        logger.exception("CAPTCHA verification for user %s failed", user_id)
        # Callback answers are length-limited; keep the alert short.
        await cb.answer(f"Error CAPTCHA: {e}"[:200], show_alert=True)


@Client.on_callback_query(filters.regex("^refresh_captcha$"))
async def refresh_captcha_callback(client: Client, cb: CallbackQuery):
    """Replace the user's pending CAPTCHA with a freshly generated one."""
    user_id = cb.from_user.id

    # Read the old entry BEFORE replacing it: chat id, invite link and name
    # must be carried over to the new entry.
    previous = captcha_texts.get(user_id)
    if previous is None:
        await cb.answer("āļø No active CAPTCHA verification.", show_alert=True)
        return

    mode = captcha_modes.get(user_id, DEFAULT_CAPTCHA_MODE)
    captcha_text, img_path, choices, correct_answer = generate_captcha(user_id, mode)
    captcha_texts[user_id] = {
        **previous,
        "captcha_text": captcha_text,
        "correct_answer": correct_answer,
    }
    logger.info("Old CAPTCHA for user %s has been removed.", user_id)
    logger.info("Generated new %s CAPTCHA for user %s: %s", mode, user_id, captcha_text)

    keyboard = _build_captcha_keyboard(user_id, choices)
    try:
        await cb.edit_message_media(
            # Editing media replaces the caption too, so pass it again.
            media=InputMediaPhoto(img_path, caption=CAPTCHA_CAPTION),
            reply_markup=keyboard
        )
    except Exception as e:
        # The user still sees the old picture; keep the old answer valid.
        captcha_texts[user_id] = previous
        logger.error("Error refreshing CAPTCHA for user %s: %s", user_id, e)
        await cb.answer("ā Failed to update CAPTCHA.", show_alert=True)
    else:
        logger.info("Updated CAPTCHA image for user %s.", user_id)
        await cb.answer("š CAPTCHA has been updated!", show_alert=False)
    finally:
        _remove_file(img_path)


@Client.on_callback_query(filters.regex("^cancel_captcha$"))
async def cancel_captcha_callback(client: Client, cb: CallbackQuery):
    """Abort the user's pending verification."""
    user_id = cb.from_user.id
    captcha_data = captcha_texts.pop(user_id, None)
    if captcha_data is None:
        await cb.answer("āļø No active CAPTCHA verification found.", show_alert=True)
        logger.warning("User %s attempted to cancel CAPTCHA without active verification.", user_id)
        return

    logger.info("User %s has canceled CAPTCHA verification.", user_id)
    try:
        # Best effort: without this the join request stays pending and the
        # user cannot submit a new one to try again.
        await client.decline_chat_join_request(
            chat_id=captcha_data["chat_id"],
            user_id=user_id
        )
    except Exception:
        logger.exception("Could not decline canceled join request of user %s", user_id)
    await cb.edit_message_caption(
        caption="ā **CAPTCHA verification has been canceled.**\n\nIf you want to try again.",
    )
    await cb.answer("CAPTCHA verification canceled.", show_alert=False)


@Client.on_callback_query(filters.regex("^close$"))
async def close_final(client: Client, cb: CallbackQuery):
    """Delete the message that carries the pressed "close" button."""
    await cb.message.delete()
    await cb.answer()


def create_button_join_group(chat_link):
    """Return a keyboard with a join link for *chat_link* and a close button."""
    return InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("šļø Join chat", url=chat_link)],
            [InlineKeyboardButton("š Close", callback_data="close")],
        ]
    )


def create_button_userinfo(user_id, username):
    """Return a keyboard linking to the user's profile and to the bot's chat."""
    return InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("š¤ Chmod +W $USER", user_id=user_id)],
            [InlineKeyboardButton("š Check human Bot", url=f"https://t.me/{username}")],
        ]
    )