Spaces:
Running
on
A10G
Running
on
A10G
import io | |
from typing import Tuple | |
import threading | |
from multiprocessing import Queue | |
from queue import Empty | |
from faster_whisper import WhisperModel | |
import logging | |
import sys | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
handlers=[logging.StreamHandler(sys.stdout)], | |
) | |
# Get a logger for your app | |
logger = logging.getLogger(__name__) | |
class AudioTranscriber(threading.Thread): | |
def __init__( | |
self, | |
audio_queue: "Queue[Tuple[io.BytesIO, str]]", | |
text_queue: "Queue[Tuple[str, str]]", | |
language: str = "en", | |
confidence_threshold: float = 0.5, | |
device_index: int = 0, | |
): | |
super().__init__() | |
self.audio_queue = audio_queue | |
self.action_queue = text_queue | |
self.daemon = True # Thread will exit when main program exits | |
self.language = language | |
self.confidence_threshold = confidence_threshold | |
self.transcriber = WhisperModel( | |
"large", | |
device="cuda", | |
device_index=device_index, | |
compute_type="int8", | |
) | |
def run(self): | |
while True: | |
try: | |
# Wait for 1 second before timing out and checking again | |
audio_data, session_id = self.audio_queue.get(timeout=1) | |
segments, _ = self.transcriber.transcribe( | |
audio_data, language=self.language | |
) | |
# Put the transcription results in the output queue | |
for segment in segments: | |
if segment.no_speech_prob <= self.confidence_threshold: | |
self.action_queue.put((segment.text, session_id)) | |
# Still print for debugging | |
logger.info( | |
f"[Thread {threading.get_ident()}] [{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}" | |
) | |
else: | |
self.action_queue.put(("", session_id)) | |
except Empty: | |
continue # If queue is empty, continue waiting | |
except Exception as e: | |
logger.error(f"Error processing audio chunk: {e}") | |