Skip to content

Commit

Permalink
Add topic handling functions to Agent
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillermo Alonso committed Jun 29, 2017
1 parent ce7149e commit dda74d5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
7 changes: 4 additions & 3 deletions osbrain/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,15 @@ class AgentChannel():
receiver : str
Second AgentAddress.
"""
def __init__(self, kind, receiver, sender):
def __init__(self, kind, receiver, sender, uuid=None):
self.kind = AgentChannelKind(kind)
self.receiver = receiver
self.sender = sender
self.transport = \
receiver.transport if receiver else sender.transport
self.serializer = \
receiver.serializer if receiver else sender.serializer
self.uuid = unique_identifier()
self.uuid = uuid or unique_identifier()
# Set up pairs
if sender:
self.sender.channel = self
Expand Down Expand Up @@ -435,4 +435,5 @@ def twin(self):
kind = self.kind.twin()
sender = self.receiver.twin() if self.receiver is not None else None
receiver = self.sender.twin() if self.sender is not None else None
return self.__class__(kind=kind, receiver=receiver, sender=sender)
return self.__class__(kind=kind, receiver=receiver, sender=sender,
uuid=self.uuid)
40 changes: 39 additions & 1 deletion osbrain/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,44 @@ def _connect_and_register(self, client_address, alias=None, handler=None,
self.register(socket, register_as, alias, handler)
return client_address

def unsubscribe_socket_from_topic(self, socket, topic: bytes):
'''
Unsubscribe a socket from a given topic.
Parameters
----------
socket
Identifier of the socket. Must be a valid entry of `self.socket`
topic
topic which we want to unsubscribe from
'''
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic)
if isinstance(self.address[socket], AgentChannel):
channel = self.address[socket]
socket = channel.receiver
treated_topic = channel.uuid + topic
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic)

def subscribe_socket_to_topic(self, socket, topic: bytes):
'''
Subscribe a socket to a given topic.
Parameters
----------
socket
Identifier of the socket. Must be a valid entry of `self.socket`
topic
topic which we want to subscribe to
'''
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic)
if isinstance(self.address[socket], AgentChannel):
channel = self.address[socket]
socket = channel.receiver
treated_topic = channel.uuid + topic
self.socket[socket].setsockopt(zmq.SUBSCRIBE, treated_topic)

def _handle_async_requests(self, data):
address_uuid, uuid, response = data
if uuid not in self._pending_requests:
Expand Down Expand Up @@ -932,7 +970,7 @@ def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]):
curated_handlers = topics_to_bytes(handlers)
# Subscribe to topics
for topic in curated_handlers.keys():
self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic)
self.subscribe_socket_to_topic(alias, topic)
# Reset handlers
self._set_handler(self.socket[alias], curated_handlers)

Expand Down

0 comments on commit dda74d5

Please sign in to comment.