# chatbot-web-app / vad.py
# salomonsky's picture
# Upload vad.py with huggingface_hub
# 504ccf4 verified
# raw
# history blame
# 8.84 kB
import speech_recognition as sr
import threading
import time
import pygame
from response_handler import ResponseHandler
class VoiceDetector:
    """Listen on the microphone in the background and dispatch recognized speech.

    ``start`` launches a daemon thread running ``listen_continuously``.  Each
    captured phrase is transcribed with ``recognize_google`` using ``es-ES``.

    * While ``waiting_for_activation`` is true, a transcript is only acted on
      when ``ResponseHandler.is_activation_phrase`` accepts it; that clears
      the flag and calls ``on_activation``.
    * Once activated, every transcript that is not filtered as echo is passed
      to ``on_speech``.
    * While ``audio_utils.is_speaking`` is true, captured audio is used only
      to decide whether playback should be interrupted.
    """

    def __init__(self, on_activation=None, on_speech=None, on_timeout=None):
        """Configure the recognizer and store the callbacks.

        Args:
            on_activation: Optional zero-argument callable invoked when the
                activation phrase is recognized.
            on_speech: Optional callable invoked with the lower-cased
                transcript once the detector has been activated.
            on_timeout: Optional callable.  It is stored, but nothing in this
                class invokes it.
        """
        self.recognizer = sr.Recognizer()
        self.is_active = True
        self.is_listening = True
        # last_interaction / TIMEOUT_SECONDS are not read anywhere in this
        # class.  NOTE(review): presumably consumed by the owning application.
        self.last_interaction = time.time()
        self.TIMEOUT_SECONDS = 20
        self.clock = pygame.time.Clock()
        self.waiting_for_activation = True
        self.audio_utils = None
        # last_interrupt_time is written on every interruption;
        # INTERRUPT_COOLDOWN is not enforced by this class.
        self.last_interrupt_time = 0
        self.INTERRUPT_COOLDOWN = 1.0

        # Energy thresholds: base for normal listening, high during playback.
        self.BASE_ENERGY_THRESHOLD = 300
        self.HIGH_ENERGY_THRESHOLD = 600
        self.current_energy_threshold = self.BASE_ENERGY_THRESHOLD

        # Recognizer configuration.
        self.recognizer.energy_threshold = self.current_energy_threshold
        self.recognizer.dynamic_energy_threshold = True
        self.recognizer.dynamic_energy_adjustment_damping = 0.15
        self.recognizer.dynamic_energy_ratio = 1.5
        self.recognizer.pause_threshold = 0.8
        self.recognizer.non_speaking_duration = 0.5
        self.recognizer.phrase_threshold = 0.3

        # Interruption tuning.  Only INTERRUPT_ENERGY_MULTIPLIER is read by
        # this class (in check_for_interruption).
        self.INTERRUPT_ENERGY_MULTIPLIER = 2.0
        self.INTERRUPT_DURATION = 0.3
        self.INTERRUPT_SAMPLES = 3
        self.INTERRUPT_SUCCESS_THRESHOLD = 2

        self.on_activation = on_activation
        self.on_speech = on_speech
        self.on_timeout = on_timeout

        # Echo / self-activation control.
        self.last_audio_output_time = 0
        self.AUDIO_OUTPUT_COOLDOWN = 0.3  # seconds
        self.is_high_threshold_mode = False

        # Bounded history of recently spoken texts, used for echo detection.
        self.audio_buffer = []
        self.BUFFER_SIZE = 5
        self.last_played_audio = None

    def set_audio_utils(self, audio_utils):
        """Attach the playback helper.

        The object is expected to expose ``is_speaking`` and
        ``stop_speaking()``, which are the only members used here.
        """
        self.audio_utils = audio_utils

    def set_high_threshold_mode(self, enabled):
        """Enable or disable the high energy threshold used during playback."""
        self.is_high_threshold_mode = enabled
        self.current_energy_threshold = (
            self.HIGH_ENERGY_THRESHOLD if enabled else self.BASE_ENERGY_THRESHOLD
        )
        self.recognizer.energy_threshold = self.current_energy_threshold
        print(f"Umbral de energía ajustado a: {self.current_energy_threshold}")

    def start(self):
        """Mark the detector as running and start the listener daemon thread."""
        self.is_active = True
        self.is_listening = True
        threading.Thread(target=self.listen_continuously, daemon=True).start()

    def stop(self):
        """Ask the listener loop to exit; it checks the flags between phrases."""
        self.is_active = False
        self.is_listening = False

    def listen_continuously(self):
        """Capture and process phrases until the detector is stopped.

        Listen timeouts and unintelligible audio are skipped silently.  Any
        other exception is printed and followed by a one-second pause before
        the loop retries.
        """
        while self.is_active and self.is_listening:
            try:
                # NOTE(review): the microphone is reopened on every pass, as
                # in the original code; confirm whether that is intentional.
                with sr.Microphone() as source:
                    # Calibrate for ambient noise only outside high-threshold
                    # mode.
                    if not self.is_high_threshold_mode:
                        self.recognizer.adjust_for_ambient_noise(
                            source, duration=0.2
                        )
                    try:
                        audio = self.recognizer.listen(
                            source,
                            timeout=1,
                            phrase_time_limit=5,
                        )
                        if not self.is_active or not self.is_listening:
                            break

                        # During playback the audio is only an interruption
                        # candidate.
                        if self.audio_utils and self.audio_utils.is_speaking:
                            self._handle_audio_while_speaking(audio)
                            continue

                        # Re-checked on purpose: playback may have started
                        # since the test above.
                        if not self.audio_utils or not self.audio_utils.is_speaking:
                            self._handle_user_speech(audio)
                    except sr.WaitTimeoutError:
                        continue
                    except sr.UnknownValueError:
                        continue
            except Exception as e:
                print(f"Error en reconocimiento continuo: {e}")
                time.sleep(1)
            self.clock.tick(30)

    def _handle_audio_while_speaking(self, audio):
        """Stop playback if *audio*, captured during playback, is an interruption."""
        current_time = time.time()
        # Ignore audio captured right after output started (echo cooldown).
        if current_time - self.last_audio_output_time < self.AUDIO_OUTPUT_COOLDOWN:
            return
        if not self.check_for_interruption(audio.frame_data):
            return
        try:
            text = self.recognizer.recognize_google(
                audio,
                language="es-ES",
            ).lower()
        except sr.UnknownValueError:
            # Loud but unintelligible audio still interrupts in
            # high-threshold mode.
            if self.is_high_threshold_mode:
                self.audio_utils.stop_speaking()
                self.last_interrupt_time = current_time
            return
        if not self.is_echo(text) and ResponseHandler.is_stop_command(text):
            print(f"Comando de interrupción detectado: {text}")
            self.audio_utils.stop_speaking()
            self.last_interrupt_time = current_time

    def _handle_user_speech(self, audio):
        """Transcribe *audio* and route it to the activation or speech callback."""
        text = self.recognizer.recognize_google(
            audio,
            language="es-ES",
        ).lower()
        if self.is_echo(text):
            return
        if self.waiting_for_activation:
            if ResponseHandler.is_activation_phrase(text):
                self.waiting_for_activation = False
                if self.on_activation:
                    self.on_activation()
        elif self.on_speech:
            self.on_speech(text)

    def is_echo(self, text):
        """Return True if *text* looks like an echo of recently played audio.

        A match is a case-insensitive substring relation, in either
        direction, with any entry of ``audio_buffer``.
        """
        heard = text.lower()
        for recent_audio in self.audio_buffer:
            played = recent_audio.lower()
            if heard in played or played in heard:
                print("Eco detectado y filtrado")
                return True
        return False

    def update_last_audio_output(self, text=None):
        """Record that audio was just played and optionally remember its text.

        Args:
            text: Text that was spoken.  When truthy it is appended to
                ``audio_buffer``, which is then trimmed to its newest
                ``BUFFER_SIZE`` entries.
        """
        self.last_audio_output_time = time.time()
        if text:
            self.audio_buffer.append(text)
            # Drop all excess entries at once so an over-full buffer recovers.
            excess = len(self.audio_buffer) - self.BUFFER_SIZE
            if excess > 0:
                del self.audio_buffer[:excess]

    def check_for_interruption(self, audio_data):
        """Return True if *audio_data* is loud enough to count as an interruption.

        The data is split into 500-byte windows and the mean absolute
        amplitude of each window is compared with ``current_energy_threshold
        * INTERRUPT_ENERGY_MULTIPLIER``.  At least 70% of the windows must
        exceed it.

        Args:
            audio_data: Raw sample bytes, decoded as signed 16-bit
                little-endian.  NOTE(review): assumes the capture sample
                width is 2 bytes; confirm against the microphone
                configuration.

        Returns:
            bool: False for missing data or fewer than 1000 bytes.
        """
        if not audio_data or len(audio_data) < 1000:
            return False

        window_size = 500
        sample_width = 2
        energies = []
        for start in range(0, len(audio_data), window_size):
            window = audio_data[start:start + window_size]
            # Decode complete samples only; a dangling odd byte is not a
            # sample and must not skew the window average.
            usable = len(window) - len(window) % sample_width
            if not usable:
                continue
            total = sum(
                abs(int.from_bytes(window[i:i + sample_width], 'little', signed=True))
                for i in range(0, usable, sample_width)
            )
            energies.append(total / (usable // sample_width))

        if not energies:
            return False

        # Threshold follows the current mode (base or high).
        threshold = self.current_energy_threshold * self.INTERRUPT_ENERGY_MULTIPLIER
        high_energy_windows = sum(1 for e in energies if e > threshold)
        # Require at least 70% of the windows to be above the threshold.
        return high_energy_windows >= len(energies) * 0.7

    def is_speaking_check(self):
        """Return True if a playback helper is attached and reports speaking."""
        return bool(self.audio_utils and self.audio_utils.is_speaking)