import io
import logging
import sys
import threading
from multiprocessing import Queue
from queue import Empty
from typing import Tuple

from faster_whisper import WhisperModel

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Get a logger for your app
logger = logging.getLogger(__name__)


class AudioTranscriber(threading.Thread):
    def __init__(
        self,
        audio_queue: "Queue[Tuple[io.BytesIO, str]]",
        text_queue: "Queue[Tuple[str, str]]",
        language: str = "en",
        confidence_threshold: float = 0.5,
        device_index: int = 0,
    ):
        super().__init__()
        self.audio_queue = audio_queue
        self.action_queue = text_queue
        self.daemon = True  # Thread will exit when main program exits
        self.language = language
        self.confidence_threshold = confidence_threshold
        self.transcriber = WhisperModel(
            "large",
            device="cuda",
            device_index=device_index,
            compute_type="int8",
        )

    def run(self):
        while True:
            try:
                # Wait for 1 second before timing out and checking again
                audio_data, session_id = self.audio_queue.get(timeout=1)
                segments, _ = self.transcriber.transcribe(
                    audio_data, language=self.language
                )
                # Put the transcription results in the output queue
                for segment in segments:
                    if segment.no_speech_prob <= self.confidence_threshold:
                        self.action_queue.put((segment.text, session_id))
                        # Still print for debugging
                        logger.info(
                            f"[Thread {threading.get_ident()}] "
                            f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}"
                        )
                    else:
                        self.action_queue.put(("", session_id))
            except Empty:
                continue  # If queue is empty, continue waiting
            except Exception as e:
                logger.error(f"Error processing audio chunk: {e}")
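

# A minimal usage sketch, assuming audio arrives as io.BytesIO WAV chunks tagged
# with a session id and that transcriptions are read back off the output queue.
# The file path "sample.wav" and the session id "session-1" are illustrative
# placeholders, not part of the original module.
if __name__ == "__main__":
    audio_queue: "Queue[Tuple[io.BytesIO, str]]" = Queue()
    text_queue: "Queue[Tuple[str, str]]" = Queue()

    # Start the daemon transcription thread (loads the Whisper model on GPU 0).
    transcriber = AudioTranscriber(audio_queue, text_queue, language="en")
    transcriber.start()

    # Enqueue one audio chunk for a hypothetical session.
    with open("sample.wav", "rb") as f:
        audio_queue.put((io.BytesIO(f.read()), "session-1"))

    # Block until the transcribed text (or an empty string for non-speech) arrives.
    text, session_id = text_queue.get()
    logger.info(f"Got transcription for {session_id}: {text!r}")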