From 5b2b12b2f80b393af9535ab14b428f0191fb0846 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Mon, 19 Jun 2017 18:08:02 +0200 Subject: [PATCH] Add code to handle subscriptions for channels (SYNC_SUB) --- osbrain/address.py | 7 ++++--- osbrain/agent.py | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/osbrain/address.py b/osbrain/address.py index b1c73f9..b7b26dd 100644 --- a/osbrain/address.py +++ b/osbrain/address.py @@ -389,7 +389,7 @@ class AgentChannel(): receiver : str Second AgentAddress. """ - def __init__(self, kind, receiver, sender): + def __init__(self, kind, receiver, sender, uuid=None): self.kind = AgentChannelKind(kind) self.receiver = receiver self.sender = sender @@ -397,7 +397,7 @@ def __init__(self, kind, receiver, sender): receiver.transport if receiver else sender.transport self.serializer = \ receiver.serializer if receiver else sender.serializer - self.uuid = unique_identifier() + self.uuid = uuid or unique_identifier() # Set up pairs if sender: self.sender.channel = self @@ -435,4 +435,5 @@ def twin(self): kind = self.kind.twin() sender = self.receiver.twin() if self.receiver is not None else None receiver = self.sender.twin() if self.sender is not None else None - return self.__class__(kind=kind, receiver=receiver, sender=sender) + return self.__class__(kind=kind, receiver=receiver, sender=sender, + uuid=self.uuid) diff --git a/osbrain/agent.py b/osbrain/agent.py index b072726..5bdd117 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -909,7 +909,13 @@ def unsubscribe_socket_from_topic(self, socket, topic: bytes): topic topic which we want to unsubscribe from ''' - self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic) + if isinstance(self.address[socket], AgentAddress): + self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic) + if isinstance(self.address[socket], AgentChannel): + channel = self.address[socket] + socket = channel.receiver + treated_topic = channel.uuid + topic + self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic) def subscribe_socket_to_topic(self, socket, topic: bytes): ''' @@ -922,7 +928,13 @@ def subscribe_socket_to_topic(self, socket, topic: bytes): topic topic which we want to subscribe to ''' - self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic) + if isinstance(self.address[socket], AgentAddress): + self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic) + if isinstance(self.address[socket], AgentChannel): + channel = self.address[socket] + socket = channel.receiver + treated_topic = channel.uuid + topic + self.socket[socket].setsockopt(zmq.SUBSCRIBE, treated_topic) def _handle_async_requests(self, data): address_uuid, uuid, response = data