Spaces:
Running
on
A10G
Running
on
A10G
from threading import Thread | |
from multiprocessing import Queue | |
from typing import Tuple, Dict, List | |
from collections import defaultdict | |
import logging | |
import sys | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
handlers=[logging.StreamHandler(sys.stdout)], | |
) | |
logger = logging.getLogger(__name__) | |
class TextFilterer(Thread): | |
def __init__( | |
self, | |
text_queue: "Queue[Tuple[str, str]]", | |
filtered_text_queue: "Queue[Tuple[str, str]]", | |
): | |
super().__init__() | |
self.text_queue = text_queue | |
self.filtered_text_queue = filtered_text_queue | |
self.daemon = True # Thread will exit when main program exits | |
self.text_buffers: Dict[str, List[str]] = defaultdict(list) | |
self.max_buffer_size = 5 | |
def filter_text(self, text: str, session_id: str) -> str | None: | |
self.text_buffers[session_id].append(text) | |
if len(self.text_buffers[session_id]) < self.max_buffer_size: | |
return None | |
while len(self.text_buffers[session_id]) > self.max_buffer_size: | |
_ = self.text_buffers[session_id].pop(0) | |
candidate = self.text_buffers[session_id][-2] | |
if candidate != "": | |
print(f"Candidate: {candidate}") | |
if ( | |
len(self.text_buffers[session_id][-3]) | |
< len(candidate) | |
>= len(self.text_buffers[session_id][-1]) | |
): | |
for past in self.text_buffers[session_id][:-2]: | |
if candidate == past: | |
return None | |
return candidate | |
return None | |
def run(self) -> None: | |
"""Main processing loop.""" | |
while True: | |
try: | |
# Get text from queue, blocks until text is available | |
text, session_id = self.text_queue.get() | |
# Process the text into an action | |
filtered_text = self.filter_text(text, session_id) | |
# If we got a valid action, add it to the action queue | |
if filtered_text: | |
self.filtered_text_queue.put((filtered_text, session_id)) | |
except Exception as e: | |
logger.error(f"Error processing text: {str(e)}") | |
continue | |