diff --git a/osbrain/agent.py b/osbrain/agent.py index 602b9ab..b072726 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -898,6 +898,32 @@ def _connect_and_register(self, client_address, alias=None, handler=None, self.register(socket, register_as, alias, handler) return client_address + def unsubscribe_socket_from_topic(self, socket, topic: bytes): + ''' + Unsubscribe a socket from a given topic. + + Parameters + ---------- + socket + Identifier of the socket. Must be a valid entry of `self.socket` + topic + topic which we want to unsubscribe from + ''' + self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic) + + def subscribe_socket_to_topic(self, socket, topic: bytes): + ''' + Subscribe a socket to a given topic. + + Parameters + ---------- + socket + Identifier of the socket. Must be a valid entry of `self.socket` + topic + topic which we want to subscribe to + ''' + self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic) + def _handle_async_requests(self, data): address_uuid, uuid, response = data if uuid not in self._pending_requests: @@ -932,7 +958,7 @@ def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): curated_handlers = topics_to_bytes(handlers) # Subscribe to topics for topic in curated_handlers.keys(): - self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic) + self.subscribe_socket_to_topic(alias, topic) # Reset handlers self._set_handler(self.socket[alias], curated_handlers)