ParentalControl / server /TextFilterer.py
GitLab CI
Update game build from GitLab CI
f6ff669
from threading import Thread
from multiprocessing import Queue
from typing import Tuple, Dict, List
from collections import defaultdict
import logging
import sys
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
class TextFilterer(Thread):
def __init__(
self,
text_queue: "Queue[Tuple[str, str]]",
filtered_text_queue: "Queue[Tuple[str, str]]",
):
super().__init__()
self.text_queue = text_queue
self.filtered_text_queue = filtered_text_queue
self.daemon = True # Thread will exit when main program exits
self.text_buffers: Dict[str, List[str]] = defaultdict(list)
self.max_buffer_size = 5
def filter_text(self, text: str, session_id: str) -> str | None:
self.text_buffers[session_id].append(text)
if len(self.text_buffers[session_id]) < self.max_buffer_size:
return None
while len(self.text_buffers[session_id]) > self.max_buffer_size:
_ = self.text_buffers[session_id].pop(0)
candidate = self.text_buffers[session_id][-2]
if candidate != "":
print(f"Candidate: {candidate}")
if (
len(self.text_buffers[session_id][-3])
< len(candidate)
>= len(self.text_buffers[session_id][-1])
):
for past in self.text_buffers[session_id][:-2]:
if candidate == past:
return None
return candidate
return None
def run(self) -> None:
"""Main processing loop."""
while True:
try:
# Get text from queue, blocks until text is available
text, session_id = self.text_queue.get()
# Process the text into an action
filtered_text = self.filter_text(text, session_id)
# If we got a valid action, add it to the action queue
if filtered_text:
self.filtered_text_queue.put((filtered_text, session_id))
except Exception as e:
logger.error(f"Error processing text: {str(e)}")
continue