Skip to content

Commit

Permalink
Add code to handle subscriptions for channels (SYNC_SUB)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillermo Alonso committed Jun 29, 2017
1 parent 92dd477 commit 5b2b12b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
7 changes: 4 additions & 3 deletions osbrain/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,15 @@ class AgentChannel():
receiver : str
Second AgentAddress.
"""
def __init__(self, kind, receiver, sender):
def __init__(self, kind, receiver, sender, uuid=None):
self.kind = AgentChannelKind(kind)
self.receiver = receiver
self.sender = sender
self.transport = \
receiver.transport if receiver else sender.transport
self.serializer = \
receiver.serializer if receiver else sender.serializer
self.uuid = unique_identifier()
self.uuid = uuid or unique_identifier()
# Set up pairs
if sender:
self.sender.channel = self
Expand Down Expand Up @@ -435,4 +435,5 @@ def twin(self):
kind = self.kind.twin()
sender = self.receiver.twin() if self.receiver is not None else None
receiver = self.sender.twin() if self.sender is not None else None
return self.__class__(kind=kind, receiver=receiver, sender=sender)
return self.__class__(kind=kind, receiver=receiver, sender=sender,
uuid=self.uuid)
16 changes: 14 additions & 2 deletions osbrain/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,13 @@ def unsubscribe_socket_from_topic(self, socket, topic: bytes):
topic
topic which we want to unsubscribe from
'''
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic)
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic)
if isinstance(self.address[socket], AgentChannel):
channel = self.address[socket]
socket = channel.receiver
treated_topic = channel.uuid + topic
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic)

def subscribe_socket_to_topic(self, socket, topic: bytes):
'''
Expand All @@ -922,7 +928,13 @@ def subscribe_socket_to_topic(self, socket, topic: bytes):
topic
topic which we want to subscribe to
'''
self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic)
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic)
if isinstance(self.address[socket], AgentChannel):
channel = self.address[socket]
socket = channel.receiver
treated_topic = channel.uuid + topic
self.socket[socket].setsockopt(zmq.SUBSCRIBE, treated_topic)

def _handle_async_requests(self, data):
address_uuid, uuid, response = data
Expand Down

0 comments on commit 5b2b12b

Please sign in to comment.