import struct
import threading
import time
from typing import Any, Callable, Optional

import pygame
import speech_recognition as sr

from response_handler import ResponseHandler


class VoiceDetector:
    """Background microphone listener with activation-phrase and barge-in support.

    A daemon thread (see ``start``) repeatedly captures a phrase from the
    microphone and routes it one of two ways:

    * While the attached ``audio_utils`` reports ``is_speaking``, the phrase is
      only examined as a possible interruption of the playback.
    * Otherwise it is transcribed and delivered either to ``on_activation``
      (when still waiting for the activation phrase) or to ``on_speech``.

    Text that matches something recently registered through
    ``update_last_audio_output`` is treated as an echo of the program's own
    output and discarded.
    """

    # check_for_interruption decodes the raw buffer as 16-bit little-endian
    # PCM, i.e. two bytes per sample.
    # NOTE(review): this matches the byte width the original code assumed;
    # confirm the microphone is really opened with 2-byte samples.
    _SAMPLE_WIDTH = 2
    # Samples per analysis window (the original used 500-byte windows).
    _INTERRUPT_WINDOW_SAMPLES = 250
    # Fraction of windows that must exceed the threshold to count as barge-in.
    _INTERRUPT_WINDOW_RATIO = 0.7

    def __init__(
        self,
        on_activation: Optional[Callable[[], Any]] = None,
        on_speech: Optional[Callable[[str], Any]] = None,
        on_timeout: Optional[Callable[..., Any]] = None,
    ) -> None:
        """Configure the recognizer and store the callbacks.

        Args:
            on_activation: Called with no arguments when the activation phrase
                is recognized.
            on_speech: Called with the lower-cased transcription of each
                phrase heard after activation.
            on_timeout: Stored but never invoked by this class; presumably
                used by the owner of the detector -- TODO confirm.
        """
        self.recognizer = sr.Recognizer()
        self.is_active = True
        self.is_listening = True
        # NOTE(review): last_interaction and TIMEOUT_SECONDS are not read
        # anywhere in this class; presumably consumed by callers.
        self.last_interaction = time.time()
        self.TIMEOUT_SECONDS = 20
        self.clock = pygame.time.Clock()
        self.waiting_for_activation = True
        self.audio_utils = None
        # NOTE(review): last_interrupt_time is written on every interruption
        # but INTERRUPT_COOLDOWN is never checked against it in this class.
        self.last_interrupt_time = 0
        self.INTERRUPT_COOLDOWN = 1.0

        # Energy threshold configuration
        self.BASE_ENERGY_THRESHOLD = 300
        self.HIGH_ENERGY_THRESHOLD = 600
        self.current_energy_threshold = self.BASE_ENERGY_THRESHOLD

        # Recognizer configuration
        self.recognizer.energy_threshold = self.current_energy_threshold
        self.recognizer.dynamic_energy_threshold = True
        self.recognizer.dynamic_energy_adjustment_damping = 0.15
        self.recognizer.dynamic_energy_ratio = 1.5
        self.recognizer.pause_threshold = 0.8
        self.recognizer.non_speaking_duration = 0.5
        self.recognizer.phrase_threshold = 0.3

        # Interruption thresholds. Only INTERRUPT_ENERGY_MULTIPLIER is read by
        # this class (in check_for_interruption).
        self.INTERRUPT_ENERGY_MULTIPLIER = 2.0
        self.INTERRUPT_DURATION = 0.3
        self.INTERRUPT_SAMPLES = 3
        self.INTERRUPT_SUCCESS_THRESHOLD = 2

        self.on_activation = on_activation
        self.on_speech = on_speech
        self.on_timeout = on_timeout

        # Echo and self-activation control
        self.last_audio_output_time = 0
        self.AUDIO_OUTPUT_COOLDOWN = 0.3  # Reduced to 0.3 seconds
        self.is_high_threshold_mode = False

        # Bounded buffer of recently spoken texts, used for echo detection
        self.audio_buffer = []
        self.BUFFER_SIZE = 5
        self.last_played_audio = None

    def set_audio_utils(self, audio_utils) -> None:
        """Attach the playback helper.

        The object must expose an ``is_speaking`` attribute and a
        ``stop_speaking()`` method; those are the only members used here.
        """
        self.audio_utils = audio_utils

    def set_high_threshold_mode(self, enabled: bool) -> None:
        """Enable or disable the high energy threshold used while audio plays.

        Switches both ``current_energy_threshold`` and the recognizer's own
        ``energy_threshold`` between the base and the high value. While the
        mode is enabled, ``listen_continuously`` also skips ambient-noise
        recalibration.
        """
        self.is_high_threshold_mode = enabled
        if enabled:
            self.current_energy_threshold = self.HIGH_ENERGY_THRESHOLD
        else:
            self.current_energy_threshold = self.BASE_ENERGY_THRESHOLD
        self.recognizer.energy_threshold = self.current_energy_threshold
        print(f"Umbral de energía ajustado a: {self.current_energy_threshold}")

    def start(self) -> None:
        """Raise the run flags and launch the listening loop on a daemon thread.

        NOTE(review): every call spawns a new thread; calling ``start`` again
        while an earlier loop is still running leaves two loops competing for
        the microphone.
        """
        self.is_active = True
        self.is_listening = True
        threading.Thread(target=self.listen_continuously, daemon=True).start()

    def stop(self) -> None:
        """Clear the run flags so the listening loop exits.

        The loop only checks the flags between captures, and the thread is
        not joined, so it may still be running when this method returns.
        """
        self.is_active = False
        self.is_listening = False

    def listen_continuously(self) -> None:
        """Capture phrases until ``stop`` is called and dispatch each one.

        Each iteration opens the microphone, optionally recalibrates for
        ambient noise, captures one phrase (waiting at most 1 s for it to
        start, recording at most 5 s) and hands it to the playback or the
        normal handler. Capture timeouts and unintelligible audio are ignored;
        any other error is printed and followed by a one second pause.
        """
        while self.is_active and self.is_listening:
            try:
                # NOTE(review): the device is reopened on every iteration, so
                # nothing is captured between two consecutive phrases.
                with sr.Microphone() as source:
                    # Recalibrate for ambient noise only outside high
                    # threshold mode.
                    if not self.is_high_threshold_mode:
                        self.recognizer.adjust_for_ambient_noise(
                            source, duration=0.2
                        )

                    try:
                        audio = self.recognizer.listen(
                            source, timeout=1, phrase_time_limit=5
                        )

                        # stop() may have been called while blocked in listen().
                        if not self.is_active or not self.is_listening:
                            break

                        # While our own audio is playing, the phrase can only
                        # act as an interruption.
                        if self.audio_utils and self.audio_utils.is_speaking:
                            self._handle_playback_audio(audio)
                            continue

                        # Re-checked on purpose: playback may have started
                        # since the test above, in which case the phrase is
                        # dropped rather than treated as user speech.
                        if not self.audio_utils or not self.audio_utils.is_speaking:
                            self._handle_user_audio(audio)

                    except sr.WaitTimeoutError:
                        continue
                    except sr.UnknownValueError:
                        continue

            except Exception as e:
                # Top-level boundary of the worker thread: report and retry.
                print(f"Error en reconocimiento continuo: {e}")
                time.sleep(1)

            self.clock.tick(30)

    def _handle_playback_audio(self, audio) -> None:
        """Decide whether a phrase captured during playback should stop it."""
        current_time = time.time()

        # Echo cooldown: ignore anything heard right after output was
        # registered through update_last_audio_output.
        if current_time - self.last_audio_output_time < self.AUDIO_OUTPUT_COOLDOWN:
            return

        # Only sufficiently loud audio is worth transcribing.
        if not self.check_for_interruption(audio.frame_data):
            return

        try:
            text = self.recognizer.recognize_google(
                audio, language="es-ES"
            ).lower()

            # Stop only on an explicit stop command that is not our own echo.
            if not self.is_echo(text) and ResponseHandler.is_stop_command(text):
                print(f"Comando de interrupción detectado: {text}")
                self.audio_utils.stop_speaking()
                self.last_interrupt_time = current_time
        except sr.UnknownValueError:
            # Loud but unintelligible audio still interrupts in high
            # threshold mode.
            if self.is_high_threshold_mode:
                self.audio_utils.stop_speaking()
                self.last_interrupt_time = current_time

    def _handle_user_audio(self, audio) -> None:
        """Transcribe a phrase heard outside playback and fire the callbacks.

        ``sr.UnknownValueError`` from the recognizer propagates to the caller.
        """
        text = self.recognizer.recognize_google(audio, language="es-ES").lower()

        if self.is_echo(text):
            return

        if self.waiting_for_activation:
            if ResponseHandler.is_activation_phrase(text):
                self.waiting_for_activation = False
                if self.on_activation:
                    self.on_activation()
        elif self.on_speech:
            self.on_speech(text)

    def is_echo(self, text: str) -> bool:
        """Return True if ``text`` looks like an echo of recently played audio.

        The comparison is a case-insensitive substring test in both
        directions against every entry of ``audio_buffer``.

        NOTE(review): because it is a substring test, a very short utterance
        that happens to occur inside a recent output is also filtered.
        """
        if not self.audio_buffer:
            return False

        heard = text.lower()
        for recent_audio in self.audio_buffer:
            spoken = recent_audio.lower()
            if heard in spoken or spoken in heard:
                print("Eco detectado y filtrado")
                return True
        return False

    def update_last_audio_output(self, text: Optional[str] = None) -> None:
        """Record that audio was just played.

        Always refreshes ``last_audio_output_time``. When ``text`` is
        non-empty it is also appended to ``audio_buffer``, whose oldest entry
        is dropped once the buffer exceeds ``BUFFER_SIZE``.
        """
        self.last_audio_output_time = time.time()
        if text:
            self.audio_buffer.append(text)
            if len(self.audio_buffer) > self.BUFFER_SIZE:
                self.audio_buffer.pop(0)

    def check_for_interruption(self, audio_data) -> bool:
        """Return True if ``audio_data`` is loud enough to count as barge-in.

        The buffer is decoded as signed 16-bit little-endian samples and cut
        into windows of ``_INTERRUPT_WINDOW_SAMPLES`` samples. A window is
        "loud" when its mean absolute amplitude exceeds
        ``current_energy_threshold * INTERRUPT_ENERGY_MULTIPLIER``; at least
        ``_INTERRUPT_WINDOW_RATIO`` of the windows must be loud.

        Buffers that are empty or shorter than 1000 bytes are rejected.
        """
        if not audio_data or len(audio_data) < 1000:
            return False

        # Ignore a dangling byte so that only whole samples are decoded.
        usable = len(audio_data) - len(audio_data) % self._SAMPLE_WIDTH
        magnitudes = [
            abs(sample)
            for (sample,) in struct.iter_unpack("<h", audio_data[:usable])
        ]

        # Mean absolute amplitude per window; the last window may be shorter.
        step = self._INTERRUPT_WINDOW_SAMPLES
        energies = []
        for offset in range(0, len(magnitudes), step):
            window = magnitudes[offset:offset + step]
            energies.append(sum(window) / len(window))

        # The threshold follows the current (base or high) mode.
        threshold = self.current_energy_threshold * self.INTERRUPT_ENERGY_MULTIPLIER
        high_energy_windows = sum(1 for energy in energies if energy > threshold)

        return high_energy_windows >= len(energies) * self._INTERRUPT_WINDOW_RATIO

    def is_speaking_check(self) -> bool:
        """Return True if attached audio utils report that audio is playing.

        Always returns a real boolean, including when no audio utils have
        been attached.
        """
        return bool(self.audio_utils and self.audio_utils.is_speaking)