Spaces:
Running
on
Zero
Running
on
Zero
| import time | |
| from threading import Thread, Lock | |
| class Listener: | |
| task_queue = [] | |
| lock = Lock() | |
| thread = None | |
| def _process_tasks(cls): | |
| while True: | |
| task = None | |
| with cls.lock: | |
| if cls.task_queue: | |
| task = cls.task_queue.pop(0) | |
| if task is None: | |
| time.sleep(0.001) | |
| continue | |
| func, args, kwargs = task | |
| try: | |
| func(*args, **kwargs) | |
| except Exception as e: | |
| print(f"Error in listener thread: {e}") | |
| def add_task(cls, func, *args, **kwargs): | |
| with cls.lock: | |
| cls.task_queue.append((func, args, kwargs)) | |
| if cls.thread is None: | |
| cls.thread = Thread(target=cls._process_tasks, daemon=True) | |
| cls.thread.start() | |
| def async_run(func, *args, **kwargs): | |
| Listener.add_task(func, *args, **kwargs) | |
| class FIFOQueue: | |
| def __init__(self): | |
| self.queue = [] | |
| self.lock = Lock() | |
| print("【调试】创建新的FIFOQueue") | |
| def push(self, item): | |
| print(f"【调试】FIFOQueue.push: 准备添加项目: {item}") | |
| with self.lock: | |
| self.queue.append(item) | |
| print(f"【调试】FIFOQueue.push: 成功添加项目: {item}, 当前队列长度: {len(self.queue)}") | |
| def pop(self): | |
| print("【调试】FIFOQueue.pop: 准备弹出队列首项") | |
| with self.lock: | |
| if self.queue: | |
| item = self.queue.pop(0) | |
| print(f"【调试】FIFOQueue.pop: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}") | |
| return item | |
| print("【调试】FIFOQueue.pop: 队列为空,返回None") | |
| return None | |
| def top(self): | |
| print("【调试】FIFOQueue.top: 准备查看队列首项") | |
| with self.lock: | |
| if self.queue: | |
| item = self.queue[0] | |
| print(f"【调试】FIFOQueue.top: 队列首项为: {item}, 当前队列长度: {len(self.queue)}") | |
| return item | |
| print("【调试】FIFOQueue.top: 队列为空,返回None") | |
| return None | |
| def next(self): | |
| print("【调试】FIFOQueue.next: 等待弹出队列首项") | |
| while True: | |
| with self.lock: | |
| if self.queue: | |
| item = self.queue.pop(0) | |
| print(f"【调试】FIFOQueue.next: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}") | |
| return item | |
| time.sleep(0.001) | |
| class AsyncStream: | |
| def __init__(self): | |
| self.input_queue = FIFOQueue() | |
| self.output_queue = FIFOQueue() | |
| class InterruptibleStreamData: | |
| def __init__(self): | |
| self.input_queue = FIFOQueue() | |
| self.output_queue = FIFOQueue() | |
| print("【调试】创建新的InterruptibleStreamData,初始化输入输出队列") | |
| # 推送数据至输出队列 | |
| def push_output(self, item): | |
| print(f"【调试】InterruptibleStreamData.push_output: 准备推送输出: {type(item)}") | |
| self.output_queue.push(item) | |
| print(f"【调试】InterruptibleStreamData.push_output: 成功推送输出") | |
| # 获取下一个输出数据 | |
| def get_output(self): | |
| print("【调试】InterruptibleStreamData.get_output: 准备获取下一个输出数据") | |
| item = self.output_queue.next() | |
| print(f"【调试】InterruptibleStreamData.get_output: 获取到输出数据: {type(item)}") | |
| return item | |
| # 推送数据至输入队列 | |
| def push_input(self, item): | |
| print(f"【调试】InterruptibleStreamData.push_input: 准备推送输入: {type(item)}") | |
| self.input_queue.push(item) | |
| print(f"【调试】InterruptibleStreamData.push_input: 成功推送输入") | |
| # 获取下一个输入数据 | |
| def get_input(self): | |
| print("【调试】InterruptibleStreamData.get_input: 准备获取下一个输入数据") | |
| item = self.input_queue.next() | |
| print(f"【调试】InterruptibleStreamData.get_input: 获取到输入数据: {type(item)}") | |
| return item | |