# Extracted from a Hugging Face Space file view.
# Space status at extraction time: Runtime error.
# File size: 8,837 bytes; revision 504ccf4.
import speech_recognition as sr
import threading
import time
import pygame
from response_handler import ResponseHandler
class VoiceDetector:
    """Continuously listen to the microphone and dispatch recognized speech.

    A background thread (see ``start``) repeatedly records a phrase and then:

    * while ``audio_utils.is_speaking`` is truthy, treats loud input as a
      possible interruption and stops playback on a stop command;
    * otherwise transcribes the phrase and either waits for the activation
      phrase (calling ``on_activation``) or forwards the text to
      ``on_speech``.

    Text that matches recently played output (see
    ``update_last_audio_output``) is discarded as echo.

    Args:
        on_activation: Optional zero-argument callable invoked when the
            activation phrase is recognized.
        on_speech: Optional callable receiving the lower-cased transcript of
            each utterance heard after activation.
        on_timeout: Optional callable stored on the instance; nothing in this
            class invokes it.
    """

    # --- Tuning constants (class-level; still readable/overridable per instance)

    # Stored for callers; not referenced by any method of this class.
    TIMEOUT_SECONDS = 20
    # Stored for callers; not referenced by any method of this class.
    INTERRUPT_COOLDOWN = 1.0

    # Energy thresholds applied to the recognizer in normal / playback mode.
    BASE_ENERGY_THRESHOLD = 300
    HIGH_ENERGY_THRESHOLD = 600

    # Interruption tuning. Only the multiplier is used by the visible code.
    INTERRUPT_ENERGY_MULTIPLIER = 2.0
    INTERRUPT_DURATION = 0.3
    INTERRUPT_SAMPLES = 3
    INTERRUPT_SUCCESS_THRESHOLD = 2

    # Seconds after the last registered audio output during which captured
    # audio is ignored while speaking (echo guard).
    AUDIO_OUTPUT_COOLDOWN = 0.3
    # Maximum number of recently played texts kept for echo detection.
    BUFFER_SIZE = 5

    # Language code passed to the Google recognizer.
    _LANGUAGE = "es-ES"
    # Size in bytes of each window used when measuring loudness.
    _ENERGY_WINDOW_BYTES = 500
    # Fraction of windows that must be loud to count as an interruption.
    _HIGH_ENERGY_WINDOW_RATIO = 0.7

    def __init__(self, on_activation=None, on_speech=None, on_timeout=None):
        self.recognizer = sr.Recognizer()
        self.is_active = True
        self.is_listening = True
        self.last_interaction = time.time()
        self.clock = pygame.time.Clock()
        self.waiting_for_activation = True
        self.audio_utils = None
        self.last_interrupt_time = 0

        # Threshold configuration.
        self.current_energy_threshold = self.BASE_ENERGY_THRESHOLD

        # Recognizer configuration.
        self.recognizer.energy_threshold = self.current_energy_threshold
        self.recognizer.dynamic_energy_threshold = True
        self.recognizer.dynamic_energy_adjustment_damping = 0.15
        self.recognizer.dynamic_energy_ratio = 1.5
        self.recognizer.pause_threshold = 0.8
        self.recognizer.non_speaking_duration = 0.5
        self.recognizer.phrase_threshold = 0.3

        self.on_activation = on_activation
        self.on_speech = on_speech
        self.on_timeout = on_timeout

        # Echo and self-activation control.
        self.last_audio_output_time = 0
        self.is_high_threshold_mode = False

        # Bounded list of recently played texts used for echo detection.
        self.audio_buffer = []
        self.last_played_audio = None

        # Handle of the background listener so start() never runs two loops.
        self._listener_thread = None

    def set_audio_utils(self, audio_utils):
        """Attach the playback helper.

        The object is expected to expose ``is_speaking`` and
        ``stop_speaking()``, which are the only members used by this class.
        """
        self.audio_utils = audio_utils

    def set_high_threshold_mode(self, enabled):
        """Enable or disable the high energy threshold used during playback.

        Updates ``current_energy_threshold`` and pushes the same value to the
        recognizer.
        """
        self.is_high_threshold_mode = enabled
        self.current_energy_threshold = (
            self.HIGH_ENERGY_THRESHOLD if enabled else self.BASE_ENERGY_THRESHOLD
        )
        self.recognizer.energy_threshold = self.current_energy_threshold
        print(f"Umbral de energía ajustado a: {self.current_energy_threshold}")

    def start(self):
        """Start the background listening loop in a daemon thread.

        If a previously started listener thread is still alive it is reused
        (re-enabling the flags is enough for it to keep running) instead of
        spawning a second loop that would compete for the microphone.
        """
        self.is_active = True
        self.is_listening = True

        existing = getattr(self, "_listener_thread", None)
        if existing is not None and existing.is_alive():
            return

        self._listener_thread = threading.Thread(
            target=self.listen_continuously, daemon=True
        )
        self._listener_thread.start()

    def stop(self):
        """Ask the listening loop to exit after its current iteration."""
        self.is_active = False
        self.is_listening = False

    def listen_continuously(self):
        """Run the capture/recognize loop until ``stop`` is called.

        Each iteration opens the microphone, records one phrase and hands it
        to the playback or normal handler. Listen timeouts and unintelligible
        audio are skipped silently; any other exception is printed and
        followed by a one-second pause so a persistent failure does not spin.
        """
        while self.is_active and self.is_listening:
            try:
                with sr.Microphone() as source:
                    # Calibrate for ambient noise only outside high-threshold
                    # mode.
                    if not self.is_high_threshold_mode:
                        self.recognizer.adjust_for_ambient_noise(
                            source, duration=0.2
                        )

                    try:
                        audio = self.recognizer.listen(
                            source,
                            timeout=1,
                            phrase_time_limit=5,
                        )

                        # stop() may have been called while recording.
                        if not self.is_active or not self.is_listening:
                            break

                        if self.is_speaking_check():
                            self._handle_audio_during_playback(audio)
                            continue

                        # Re-check: playback may have started since the test
                        # above, in which case this capture is dropped.
                        if not self.is_speaking_check():
                            self._handle_utterance(audio)

                    except sr.WaitTimeoutError:
                        continue
                    except sr.UnknownValueError:
                        continue

            except Exception as e:
                print(f"Error en reconocimiento continuo: {e}")
                time.sleep(1)

            self.clock.tick(30)

    def _handle_audio_during_playback(self, audio):
        """Decide whether audio captured during playback should stop it."""
        now = time.time()

        # Too close to the last registered output: ignore the capture.
        if now - self.last_audio_output_time < self.AUDIO_OUTPUT_COOLDOWN:
            return

        if not self.check_for_interruption(audio.frame_data):
            return

        try:
            text = self.recognizer.recognize_google(
                audio,
                language=self._LANGUAGE,
            ).lower()
        except sr.UnknownValueError:
            # Loud but unintelligible input still interrupts playback when
            # high-threshold mode is active.
            if self.is_high_threshold_mode:
                self.audio_utils.stop_speaking()
                self.last_interrupt_time = now
            return

        if not self.is_echo(text) and ResponseHandler.is_stop_command(text):
            print(f"Comando de interrupción detectado: {text}")
            self.audio_utils.stop_speaking()
            self.last_interrupt_time = now

    def _handle_utterance(self, audio):
        """Transcribe audio captured while idle and dispatch it.

        Raises:
            sr.UnknownValueError: propagated from the recognizer; the caller
                treats it as "nothing understood".
        """
        text = self.recognizer.recognize_google(
            audio,
            language=self._LANGUAGE,
        ).lower()

        if self.is_echo(text):
            return

        if self.waiting_for_activation:
            if ResponseHandler.is_activation_phrase(text):
                self.waiting_for_activation = False
                if self.on_activation:
                    self.on_activation()
        elif self.on_speech:
            self.on_speech(text)

    def is_echo(self, text):
        """Return True if ``text`` overlaps a recently played phrase.

        The comparison is case-insensitive and matches when either string
        contains the other.
        """
        heard = text.lower()
        for recent_audio in self.audio_buffer:
            played = recent_audio.lower()
            if heard in played or played in heard:
                print("Eco detectado y filtrado")
                return True
        return False

    def update_last_audio_output(self, text=None):
        """Record that audio was just played.

        Always refreshes ``last_audio_output_time``. When ``text`` is
        non-empty it is appended to the echo buffer, which keeps at most
        ``BUFFER_SIZE`` entries (oldest dropped first).
        """
        self.last_audio_output_time = time.time()
        if text:
            self.audio_buffer.append(text)
            if len(self.audio_buffer) > self.BUFFER_SIZE:
                self.audio_buffer.pop(0)

    def check_for_interruption(self, audio_data):
        """Return True if ``audio_data`` is loud enough to be an interruption.

        The bytes are split into fixed-size windows; a window is "loud" when
        its mean absolute sample value exceeds ``current_energy_threshold *
        INTERRUPT_ENERGY_MULTIPLIER``. At least 70% of the windows must be
        loud. Inputs that are empty or shorter than 1000 bytes return False.

        Samples are decoded as 2-byte little-endian signed integers
        (NOTE(review): assumes the capture uses 16-bit PCM - confirm against
        the microphone configuration).
        """
        if not audio_data or len(audio_data) < 1000:
            return False

        step = self._ENERGY_WINDOW_BYTES
        energies = []
        for start in range(0, len(audio_data), step):
            window = audio_data[start:start + step]
            if len(window) < 2:
                continue
            total = sum(
                abs(int.from_bytes(window[i:i + 2], 'little', signed=True))
                for i in range(0, len(window), 2)
            )
            energies.append(total / (len(window) / 2))

        if not energies:
            return False

        threshold = self.current_energy_threshold * self.INTERRUPT_ENERGY_MULTIPLIER
        high_energy_windows = sum(1 for energy in energies if energy > threshold)
        return high_energy_windows >= len(energies) * self._HIGH_ENERGY_WINDOW_RATIO

    def is_speaking_check(self):
        """Return True if an attached ``audio_utils`` reports it is speaking.

        Always returns a bool, including when no ``audio_utils`` is attached.
        """
        return bool(self.audio_utils and self.audio_utils.is_speaking)