Skip to content

Commit

Permalink
Add topic handling functions to Agent
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillermo Alonso committed Jun 19, 2017
1 parent 2c67891 commit 16a2bf1
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion osbrain/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,32 @@ def _connect_and_register(self, client_address, alias=None, handler=None,
self.register(socket, register_as, alias, handler)
return client_address

def unsubscribe_socket_from_topic(self, socket, topic: bytes):
'''
Unsubscribe a socket from a given topic.
Parameters
----------
socket
Identifier of the socket. Must be a valid entry of `self.socket`
topic
topic which we want to unsubscribe from
'''
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic)

def subscribe_socket_to_topic(self, socket, topic: bytes):
'''
Subscribe a socket to a given topic.
Parameters
----------
socket
Identifier of the socket. Must be a valid entry of `self.socket`
topic
topic which we want to subscribe to
'''
self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic)

def _handle_async_requests(self, data):
address_uuid, uuid, response = data
if uuid not in self._pending_requests:
Expand Down Expand Up @@ -932,7 +958,7 @@ def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]):
curated_handlers = topics_to_bytes(handlers)
# Subscribe to topics
for topic in curated_handlers.keys():
self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic)
self.subscribe_socket_to_topic(alias, topic)
# Reset handlers
self._set_handler(self.socket[alias], curated_handlers)

Expand Down

0 comments on commit 16a2bf1

Please sign in to comment.