Spaces:
Sleeping
Sleeping
| from collections import defaultdict | |
| from typing import Callable, Optional | |
| from concurrent.futures import ThreadPoolExecutor | |
| from copy import copy | |
| import fnmatch | |
| from ditk import logging | |
| class EventLoop: | |
| loops = {} | |
| def __init__(self, name: str = "default") -> None: | |
| self._name = name | |
| self._listeners = defaultdict(list) | |
| self._thread_pool = ThreadPoolExecutor(max_workers=2) | |
| self._exception = None | |
| self._active = True | |
| def on(self, event: str, fn: Callable) -> None: | |
| """ | |
| Overview: | |
| Subscribe to an event, execute this function every time the event is emitted. | |
| Arguments: | |
| - event (:obj:`str`): Event name. | |
| - fn (:obj:`Callable`): The function. | |
| """ | |
| self._listeners[event].append(fn) | |
| def off(self, event: str, fn: Optional[Callable] = None) -> None: | |
| """ | |
| Overview: | |
| Unsubscribe an event, or a specific function in the event. | |
| Arguments: | |
| - event (:obj:`str`): Event name. | |
| - fn (:obj:`Optional[Callable]`): The function. | |
| """ | |
| for e in fnmatch.filter(self._listeners.keys(), event): | |
| if fn: | |
| try: | |
| self._listeners[e].remove(fn) | |
| except: | |
| pass | |
| else: | |
| self._listeners[e] = [] | |
| def once(self, event: str, fn: Callable) -> None: | |
| """ | |
| Overview: | |
| Subscribe to an event, execute this function only once when the event is emitted. | |
| Arguments: | |
| - event (:obj:`str`): Event name. | |
| - fn (:obj:`Callable`): The function. | |
| """ | |
| def once_callback(*args, **kwargs): | |
| self.off(event, once_callback) | |
| fn(*args, **kwargs) | |
| self.on(event, once_callback) | |
| def emit(self, event: str, *args, **kwargs) -> None: | |
| """ | |
| Overview: | |
| Emit an event, call listeners. | |
| If there is an unhandled error in this event loop, calling emit will raise an exception, | |
| which will cause the process to exit. | |
| Arguments: | |
| - event (:obj:`str`): Event name. | |
| """ | |
| if self._exception: | |
| raise self._exception | |
| if self._active: | |
| self._thread_pool.submit(self._trigger, event, *args, **kwargs) | |
| def _trigger(self, event: str, *args, **kwargs) -> None: | |
| """ | |
| Overview: | |
| Execute the callbacks under the event. If any callback raise an exception, | |
| we will save the traceback and ignore the exception. | |
| Arguments: | |
| - event (:obj:`str`): Event name. | |
| """ | |
| if event not in self._listeners: | |
| logging.debug("Event {} is not registered in the callbacks of {}!".format(event, self._name)) | |
| return | |
| for fn in copy(self._listeners[event]): | |
| try: | |
| fn(*args, **kwargs) | |
| except Exception as e: | |
| self._exception = e | |
| def listened(self, event: str) -> bool: | |
| """ | |
| Overview: | |
| Check if the event has been listened to. | |
| Arguments: | |
| - event (:obj:`str`): Event name | |
| Returns: | |
| - listened (:obj:`bool`): Whether this event has been listened to. | |
| """ | |
| return event in self._listeners | |
| def get_event_loop(cls: type, name: str = "default") -> "EventLoop": | |
| """ | |
| Overview: | |
| Get new event loop when name not exists, or return the existed instance. | |
| Arguments: | |
| - name (:obj:`str`): Name of event loop. | |
| """ | |
| if name in cls.loops: | |
| return cls.loops[name] | |
| cls.loops[name] = loop = cls(name) | |
| return loop | |
| def stop(self) -> None: | |
| self._active = False | |
| self._listeners = defaultdict(list) | |
| self._exception = None | |
| self._thread_pool.shutdown() | |
| if self._name in EventLoop.loops: | |
| del EventLoop.loops[self._name] | |
| def __del__(self) -> None: | |
| if self._active: | |
| self.stop() | |