Spaces:
Sleeping
Sleeping
| import pynng | |
| from ditk import logging | |
| from typing import List, Optional, Tuple | |
| from pynng import Bus0 | |
| from time import sleep | |
| from ding.framework.message_queue.mq import MQ | |
| from ding.utils import MQ_REGISTRY | |
| class NNGMQ(MQ): | |
| def __init__(self, listen_to: str, attach_to: Optional[List[str]] = None, **kwargs) -> None: | |
| """ | |
| Overview: | |
| Connect distributed processes with nng | |
| Arguments: | |
| - listen_to (:obj:`Optional[List[str]]`): The node address to attach to. | |
| - attach_to (:obj:`Optional[List[str]]`): The node's addresses you want to attach to. | |
| """ | |
| self.listen_to = listen_to | |
| self.attach_to = attach_to or [] | |
| self._sock: Bus0 = None | |
| self._running = False | |
| def listen(self) -> None: | |
| self._sock = sock = Bus0() | |
| sock.listen(self.listen_to) | |
| sleep(0.1) # Wait for peers to bind | |
| for contact in self.attach_to: | |
| sock.dial(contact) | |
| logging.info("NNG listen on {}, attach to {}".format(self.listen_to, self.attach_to)) | |
| self._running = True | |
| def publish(self, topic: str, data: bytes) -> None: | |
| if self._running: | |
| topic += "::" | |
| data = topic.encode() + data | |
| self._sock.send(data) | |
| def subscribe(self, topic: str) -> None: | |
| return | |
| def unsubscribe(self, topic: str) -> None: | |
| return | |
| def recv(self) -> Tuple[str, bytes]: | |
| while True: | |
| try: | |
| if not self._running: | |
| break | |
| msg = self._sock.recv() | |
| # Use topic at the beginning of the message, so we don't need to call pickle.loads | |
| # when the current process is not subscribed to the topic. | |
| topic, payload = msg.split(b"::", maxsplit=1) | |
| return topic.decode(), payload | |
| except pynng.Timeout: | |
| logging.warning("Timeout on node {} when waiting for message from bus".format(self.listen_to)) | |
| except pynng.Closed: | |
| if self._running: | |
| logging.error("The socket was not closed under normal circumstances!") | |
| except Exception as e: | |
| logging.error("Meet exception when listening for new messages", e) | |
| def stop(self) -> None: | |
| if self._running: | |
| self._running = False | |
| self._sock.close() | |
| self._sock = None | |
| def __del__(self) -> None: | |
| self.stop() | |