from threading import Thread from multiprocessing import Queue from typing import Tuple, Dict, List from collections import defaultdict import logging import sys # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) logger = logging.getLogger(__name__) class TextFilterer(Thread): def __init__( self, text_queue: "Queue[Tuple[str, str]]", filtered_text_queue: "Queue[Tuple[str, str]]", ): super().__init__() self.text_queue = text_queue self.filtered_text_queue = filtered_text_queue self.daemon = True # Thread will exit when main program exits self.text_buffers: Dict[str, List[str]] = defaultdict(list) self.max_buffer_size = 5 def filter_text(self, text: str, session_id: str) -> str | None: self.text_buffers[session_id].append(text) if len(self.text_buffers[session_id]) < self.max_buffer_size: return None while len(self.text_buffers[session_id]) > self.max_buffer_size: _ = self.text_buffers[session_id].pop(0) candidate = self.text_buffers[session_id][-2] if candidate != "": print(f"Candidate: {candidate}") if ( len(self.text_buffers[session_id][-3]) < len(candidate) >= len(self.text_buffers[session_id][-1]) ): for past in self.text_buffers[session_id][:-2]: if candidate == past: return None return candidate return None def run(self) -> None: """Main processing loop.""" while True: try: # Get text from queue, blocks until text is available text, session_id = self.text_queue.get() # Process the text into an action filtered_text = self.filter_text(text, session_id) # If we got a valid action, add it to the action queue if filtered_text: self.filtered_text_queue.put((filtered_text, session_id)) except Exception as e: logger.error(f"Error processing text: {str(e)}") continue