"""Telegram voice bot: transcribe a voice message, generate a reply, speak it.

Pipeline per incoming voice message:
    1. download the voice note,
    2. transcribe it with a wav2vec2 Arabic ASR model,
    3. generate a textual reply with AraGPT2,
    4. synthesize the reply with the MMS Arabic VITS model,
    5. post-process the audio with pydub and send it back as a voice message.

Environment variables read: ``HF_TOKEN`` (optional) and ``TELEGRAM_TOKEN``
(required to start the bot).
"""

import asyncio
import functools
import logging
import os
import tempfile
import threading

import librosa
import numpy as np
import soundfile as sf
import torch
from huggingface_hub import login
from pydub import AudioSegment
from telegram import Update
from telegram.ext import ApplicationBuilder, MessageHandler, filters
from transformers import AutoTokenizer, VitsModel, pipeline

# ===== Token setup =====
_HF_TOKEN = os.getenv("HF_TOKEN")
if _HF_TOKEN:
    # Only log in when a token is configured: login(token=None) falls back to
    # an interactive prompt, which cannot be answered on an unattended host.
    login(token=_HF_TOKEN)

# ===== System settings =====
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Reply sent to the user whenever a voice message cannot be processed.
_ERROR_REPLY = "⚠️ عذرًا، حدث خطأ في المعالجة."

# ===== Model loading =====
try:
    asr_pipeline = pipeline(
        "automatic-speech-recognition",
        model="jonatasgrosman/wav2vec2-large-xlsr-53-arabic",
        token=_HF_TOKEN,
    )
    tts_tokenizer = AutoTokenizer.from_pretrained(
        "facebook/mms-tts-ara",
        token=_HF_TOKEN,
    )
    tts_model = VitsModel.from_pretrained(
        "facebook/mms-tts-ara",
        token=_HF_TOKEN,
    )
except Exception as e:
    # The exception is re-raised, so the traceback is reported by the caller.
    logger.error("فشل تحميل النماذج: %s", e)
    raise


@functools.lru_cache(maxsize=1)
def _get_chatbot():
    """Build the text-generation pipeline once and reuse it afterwards.

    A failed construction raises and is not cached, so the next call retries.
    """
    return pipeline(
        "text-generation",
        model="aubmindlab/aragpt2-base",
        token=_HF_TOKEN,
    )


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor and await its result.

    Keeps model inference and audio processing off the event loop so the bot
    stays responsive while they run.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


# ===== Audio processing helpers =====
def enhance_audio(input_path: str, output_path: str) -> bool:
    """Filter, normalize and fade a WAV file, writing the result as WAV.

    Args:
        input_path: Path of the WAV file to read.
        output_path: Path the processed WAV file is written to.

    Returns:
        True when the processed file was exported, False if any step failed
        (the failure is logged).
    """
    try:
        audio = AudioSegment.from_wav(input_path)
        audio = audio.low_pass_filter(3000)
        audio = audio.high_pass_filter(100)
        audio = audio.normalize()
        audio = audio.fade_in(150).fade_out(150)
        audio.export(output_path, format="wav")
        return True
    except Exception as e:
        logger.exception("خطأ في تحسين الصوت: %s", e)
        return False


def _transcribe(audio_path: str) -> str:
    """Blocking part of speech_to_text: decode the file and run the ASR model."""
    audio, sr = librosa.load(audio_path, sr=16000)
    # Hand the decoded samples straight to the pipeline instead of round-
    # tripping through a shared scratch file on disk.
    result = asr_pipeline({"raw": audio, "sampling_rate": sr})
    return result["text"]


async def speech_to_text(audio_path: str) -> str:
    """Transcribe the audio file at *audio_path*.

    The audio is loaded at 16 kHz and passed to the ASR pipeline.

    Returns:
        The recognized text, or an empty string if transcription failed
        (the failure is logged).
    """
    try:
        return await _run_blocking(_transcribe, audio_path)
    except Exception as e:
        logger.exception("فشل التعرف على الصوت: %s", e)
        return ""


def _generate(text: str) -> str:
    """Blocking part of generate_response: run the text-generation model."""
    response = _get_chatbot()(
        text,
        max_length=100,
        num_return_sequences=1,
        pad_token_id=50256,
    )
    return response[0]['generated_text']


async def generate_response(text: str) -> str:
    """Generate a textual reply to *text* with the text-generation pipeline.

    Returns:
        The pipeline's ``generated_text`` for the first returned sequence, or
        a fixed fallback sentence if generation failed (the failure is logged).
    """
    try:
        return await _run_blocking(_generate, text)
    except Exception as e:
        logger.exception("فشل توليد الرد: %s", e)
        return "حدث خطأ في توليد الرد."


def _synthesize(text: str, output_path: str) -> None:
    """Blocking part of text_to_speech: run the TTS model and write the WAV."""
    inputs = tts_tokenizer(text, return_tensors="pt")
    with torch.no_grad():
        output = tts_model(**inputs)
    waveform = output.waveform[0].numpy()
    sf.write(output_path, waveform, tts_model.config.sampling_rate)


async def text_to_speech(text: str, output_path: str = "bot_response.wav") -> None:
    """Synthesize *text* to a WAV file.

    Args:
        text: Text to speak.
        output_path: Destination file; defaults to ``bot_response.wav`` in the
            current working directory.

    Failures are logged and swallowed. Callers that need to know whether
    synthesis succeeded should check that *output_path* exists afterwards.
    """
    try:
        await _run_blocking(_synthesize, text, output_path)
    except Exception as e:
        logger.exception("فشل تحويل النص إلى صوت: %s", e)


# ===== Main handler =====
async def process_voice(update: Update, context):
    """Handle one voice message: transcribe, generate a reply, answer by voice.

    All intermediate files live in a per-request temporary directory, so a
    failed step can never cause a file left over from another request to be
    sent. Any failure results in a short error reply to the user.
    """
    # effective_message also covers channel posts and edited messages, for
    # which update.message is None.
    message = update.effective_message
    try:
        with tempfile.TemporaryDirectory(prefix="voice_bot_") as workdir:
            voice_path = os.path.join(workdir, "user_voice.ogg")
            raw_path = os.path.join(workdir, "bot_response.wav")
            enhanced_path = os.path.join(workdir, "bot_response_enhanced.wav")

            voice_file = await message.voice.get_file()
            await voice_file.download_to_drive(voice_path)

            user_text = await speech_to_text(voice_path)
            if not user_text.strip():
                # Nothing was recognized (or transcription failed); there is
                # no prompt to generate a reply from.
                logger.warning("Empty transcription; skipping reply generation")
                await message.reply_text(_ERROR_REPLY)
                return

            bot_response = await generate_response(user_text)

            await text_to_speech(bot_response, raw_path)
            if not os.path.exists(raw_path):
                # text_to_speech swallows its errors; a missing file is the
                # only signal that synthesis failed.
                raise RuntimeError("speech synthesis produced no audio file")

            enhanced = await _run_blocking(enhance_audio, raw_path, enhanced_path)
            reply_path = enhanced_path if enhanced else raw_path

            # Upload from an explicitly managed handle so the file is closed
            # before the temporary directory is removed.
            with open(reply_path, "rb") as audio_file:
                await message.reply_voice(audio_file)
    except Exception as e:
        logger.exception("خطأ غير متوقع: %s", e)
        await message.reply_text(_ERROR_REPLY)


def run_bot():
    """Build the Telegram application and poll for updates until stopped.

    Creates and installs a fresh event loop for the calling thread. Signal
    handlers are disabled (``stop_signals=[]``) and the loop is left open
    (``close_loop=False``), matching the call made from a worker thread below.

    Raises:
        RuntimeError: if the ``TELEGRAM_TOKEN`` environment variable is unset.
    """
    telegram_token = os.getenv("TELEGRAM_TOKEN")
    if not telegram_token:
        raise RuntimeError("TELEGRAM_TOKEN environment variable is not set")

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    application = ApplicationBuilder().token(telegram_token).build()
    application.add_handler(MessageHandler(filters.VOICE, process_voice))

    application.run_polling(
        close_loop=False,
        stop_signals=[],
    )


if __name__ == "__main__":
    # Run the bot in a separate thread and wait for it to finish.
    bot_thread = threading.Thread(target=run_bot, daemon=True)
    bot_thread.start()
    bot_thread.join()