# Page-status text captured together with this file (not part of the program):
# Spaces: Runtime error / Runtime error
import threading
import time
from typing import Callable, Optional

import pygame
import speech_recognition as sr

from response_handler import ResponseHandler
class VoiceDetector:
    """Listen to the microphone on a background thread and dispatch speech.

    The detector starts out waiting for an activation phrase. Once
    ``ResponseHandler.is_activation_phrase`` accepts a transcript, every later
    transcript is forwarded to ``on_speech``. While the attached audio utils
    report ``is_speaking``, captured audio is only examined as a possible
    interruption of the playback.

    Transcripts that overlap with recently played text (see
    ``update_last_audio_output``) are treated as echo and dropped.
    """

    def __init__(
        self,
        on_activation: Optional[Callable[[], None]] = None,
        on_speech: Optional[Callable[[str], None]] = None,
        on_timeout: Optional[Callable[..., None]] = None,
    ) -> None:
        """Configure the recognizer and store the callbacks.

        Args:
            on_activation: Called with no arguments when the activation
                phrase is recognised.
            on_speech: Called with the lower-cased transcript of each
                utterance heard after activation.
            on_timeout: Stored on the instance; nothing in this class
                invokes it.
        """
        self.recognizer = sr.Recognizer()
        self.is_active = True
        self.is_listening = True
        # last_interaction / TIMEOUT_SECONDS are initialised here but are not
        # read anywhere in this class.
        self.last_interaction = time.time()
        self.TIMEOUT_SECONDS = 20
        self.clock = pygame.time.Clock()
        self.waiting_for_activation = True
        self.audio_utils = None
        # last_interrupt_time is written by the listen loop; the cooldown
        # constant is not consulted in this class.
        self.last_interrupt_time = 0
        self.INTERRUPT_COOLDOWN = 1.0

        # Energy thresholds: base for normal listening, high for listening
        # while audio is being played back.
        self.BASE_ENERGY_THRESHOLD = 300
        self.HIGH_ENERGY_THRESHOLD = 600
        self.current_energy_threshold = self.BASE_ENERGY_THRESHOLD

        # Recognizer configuration.
        self.recognizer.energy_threshold = self.current_energy_threshold
        self.recognizer.dynamic_energy_threshold = True
        self.recognizer.dynamic_energy_adjustment_damping = 0.15
        self.recognizer.dynamic_energy_ratio = 1.5
        self.recognizer.pause_threshold = 0.8
        self.recognizer.non_speaking_duration = 0.5
        self.recognizer.phrase_threshold = 0.3

        # Interruption settings. Only the multiplier is used by
        # check_for_interruption; the other three are not read in this class.
        self.INTERRUPT_ENERGY_MULTIPLIER = 2.0
        self.INTERRUPT_DURATION = 0.3
        self.INTERRUPT_SAMPLES = 3
        self.INTERRUPT_SUCCESS_THRESHOLD = 2

        self.on_activation = on_activation
        self.on_speech = on_speech
        self.on_timeout = on_timeout

        # Echo / self-activation control.
        self.last_audio_output_time = 0
        self.AUDIO_OUTPUT_COOLDOWN = 0.3  # seconds
        self.is_high_threshold_mode = False

        # Bounded history of recently played texts, used for echo detection.
        self.audio_buffer = []
        self.BUFFER_SIZE = 5
        self.last_played_audio = None

    def set_audio_utils(self, audio_utils) -> None:
        """Attach the playback helper.

        The listen loop reads ``audio_utils.is_speaking`` and calls
        ``audio_utils.stop_speaking()``.
        """
        self.audio_utils = audio_utils

    def set_high_threshold_mode(self, enabled) -> None:
        """Switch between the high (playback) and base energy thresholds.

        Args:
            enabled: Truthy selects ``HIGH_ENERGY_THRESHOLD``; falsy selects
                ``BASE_ENERGY_THRESHOLD``. The chosen value is also pushed to
                the recognizer.
        """
        self.is_high_threshold_mode = enabled
        if enabled:
            self.current_energy_threshold = self.HIGH_ENERGY_THRESHOLD
        else:
            self.current_energy_threshold = self.BASE_ENERGY_THRESHOLD
        self.recognizer.energy_threshold = self.current_energy_threshold
        print(f"Umbral de energía ajustado a: {self.current_energy_threshold}")

    def start(self) -> None:
        """Set the run flags and launch ``listen_continuously`` on a daemon thread."""
        self.is_active = True
        self.is_listening = True
        threading.Thread(target=self.listen_continuously, daemon=True).start()

    def stop(self) -> None:
        """Clear the run flags; the listen loop exits the next time it checks them."""
        self.is_active = False
        self.is_listening = False

    def listen_continuously(self) -> None:
        """Capture and process utterances until the run flags are cleared.

        Each pass opens the microphone, recalibrates for ambient noise
        (skipped in high-threshold mode), records one phrase, and routes it
        either to the playback-interruption handler or to the normal speech
        handler. Listen timeouts and unintelligible audio are skipped
        silently; any other exception is printed and followed by a one-second
        pause before retrying.

        NOTE(review): the nesting of this method was rebuilt from a copy whose
        indentation had been stripped. The ``continue`` after the playback
        branch is placed at the branch level - confirm against the original.
        """
        while self.is_active and self.is_listening:
            try:
                with sr.Microphone() as source:
                    # Calibrate against ambient noise only when not in
                    # high-threshold mode.
                    if not self.is_high_threshold_mode:
                        self.recognizer.adjust_for_ambient_noise(
                            source, duration=0.2
                        )
                    try:
                        audio = self.recognizer.listen(
                            source,
                            timeout=1,
                            phrase_time_limit=5,
                        )
                        # stop() may have been called while listen() blocked.
                        if not self.is_active or not self.is_listening:
                            break

                        if self.audio_utils and self.audio_utils.is_speaking:
                            self._handle_audio_during_playback(audio)
                            continue

                        # Re-check the flag: it is read a second time here, so
                        # audio is skipped if playback began in the meantime.
                        if not self.audio_utils or not self.audio_utils.is_speaking:
                            self._handle_user_speech(audio)
                    except (sr.WaitTimeoutError, sr.UnknownValueError):
                        continue
            except Exception as e:
                print(f"Error en reconocimiento continuo: {e}")
                time.sleep(1)
            self.clock.tick(30)

    def _handle_audio_during_playback(self, audio) -> None:
        """Decide whether audio captured during playback should stop it."""
        current_time = time.time()
        # Ignore audio captured right after output was registered (echo cooldown).
        if current_time - self.last_audio_output_time < self.AUDIO_OUTPUT_COOLDOWN:
            return
        if not self.check_for_interruption(audio.frame_data):
            return
        try:
            text = self.recognizer.recognize_google(
                audio,
                language="es-ES",
            ).lower()
            # Only act on a stop command that is not an echo of our own output.
            if not self.is_echo(text) and ResponseHandler.is_stop_command(text):
                print(f"Comando de interrupción detectado: {text}")
                self.audio_utils.stop_speaking()
                self.last_interrupt_time = current_time
        except sr.UnknownValueError:
            # Loud but unintelligible audio still interrupts playback when
            # high-threshold mode is on.
            if self.is_high_threshold_mode:
                self.audio_utils.stop_speaking()
                self.last_interrupt_time = current_time

    def _handle_user_speech(self, audio) -> None:
        """Transcribe normal (non-playback) audio and fire the matching callback.

        NOTE(review): the ``else`` is paired with the ``waiting_for_activation``
        check; the source's stripped indentation made this ambiguous - confirm.
        """
        text = self.recognizer.recognize_google(
            audio,
            language="es-ES",
        ).lower()
        if self.is_echo(text):
            return
        if self.waiting_for_activation:
            if ResponseHandler.is_activation_phrase(text):
                self.waiting_for_activation = False
                if self.on_activation:
                    self.on_activation()
        else:
            if self.on_speech:
                self.on_speech(text)

    def is_echo(self, text: str) -> bool:
        """Return True when ``text`` overlaps a recently played text.

        The comparison is a case-insensitive substring test in both
        directions, so a short transcript matches any buffered text that
        happens to contain it.
        """
        heard = text.lower()
        for recent_audio in self.audio_buffer:
            played = recent_audio.lower()
            if heard in played or played in heard:
                print("Eco detectado y filtrado")
                return True
        return False

    def update_last_audio_output(self, text: Optional[str] = None) -> None:
        """Record that audio was just played and remember its text.

        Args:
            text: The text that was played. When non-empty it is appended to
                the echo buffer, which keeps at most ``BUFFER_SIZE`` entries
                (the oldest entry is dropped first).
        """
        self.last_audio_output_time = time.time()
        if text:
            self.audio_buffer.append(text)
            if len(self.audio_buffer) > self.BUFFER_SIZE:
                self.audio_buffer.pop(0)

    def check_for_interruption(self, audio_data) -> bool:
        """Return True when the audio is loud enough to count as an interruption.

        The bytes are split into 500-byte windows, each window's mean absolute
        amplitude is computed, and the audio qualifies when at least 70% of
        the windows exceed ``current_energy_threshold *
        INTERRUPT_ENERGY_MULTIPLIER``.

        Args:
            audio_data: Raw audio bytes, decoded here as 2-byte signed
                little-endian samples (assumes the capture device uses that
                sample format - TODO confirm). Inputs shorter than 1000 bytes
                are rejected.
        """
        if not audio_data or len(audio_data) < 1000:
            return False

        window_size = 500
        energies = []
        for start in range(0, len(audio_data), window_size):
            window = audio_data[start:start + window_size]
            if len(window) < 2:
                continue
            total = sum(
                abs(int.from_bytes(window[i:i + 2], "little", signed=True))
                for i in range(0, len(window), 2)
            )
            energies.append(total / (len(window) / 2))

        if not energies:
            return False

        threshold = self.current_energy_threshold * self.INTERRUPT_ENERGY_MULTIPLIER
        high_energy_windows = sum(1 for energy in energies if energy > threshold)
        return high_energy_windows >= len(energies) * 0.7

    def is_speaking_check(self) -> bool:
        """Return True when audio utils are attached and report ``is_speaking``."""
        return bool(self.audio_utils and self.audio_utils.is_speaking)