From ce7149eb92ddb335e1ef75aa00f72fc09d23233b Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Mon, 19 Jun 2017 16:25:18 +0200 Subject: [PATCH 1/9] Add test for SYNC_PUB and PUB topic handling --- osbrain/tests/test_agent_sync_publications.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/osbrain/tests/test_agent_sync_publications.py b/osbrain/tests/test_agent_sync_publications.py index a528297..af8aaf3 100644 --- a/osbrain/tests/test_agent_sync_publications.py +++ b/osbrain/tests/test_agent_sync_publications.py @@ -73,6 +73,41 @@ def on_error(agent): agent.error_log.append('error') +@pytest.mark.parametrize('socket_type', ['PUB', 'SYNC_PUB']) +def test_change_subscription_topics_sync(nsproxy, socket_type): + ''' + Test for the different options of subscribing/unsubscribing to topics + in the SYNC_PUB/SYNC_SUB pattern. + ''' + server = run_agent('server') + client = run_agent('client') + + addr = server.bind(socket_type, alias='pub', handler=lambda: None) + client.set_attr(received=[]) + client.connect(addr, alias='sub', handler=append_received) + + # Stablish a connection + server.each(0.1, 'send', 'pub', 'connecting...') + assert wait_agent_attr(client, name='received', data='connecting...') + + # By default, client will be subscribed to all topics + server.send('pub', 'hello') + assert wait_agent_attr(client, name='received', data='hello') + + # Only subscribe to 'TOP' topic + client.unsubscribe_socket_from_topic('sub', b'') + client.subscribe_socket_to_topic('sub', b'TOP') + + # Message not received since 'TOP' topic not specified in the send call + server.send('pub', 'world') + assert not wait_agent_attr(client, name='received', data='world', + timeout=1) + + # Receive message with the topic we are subscribed to + server.send('pub', 'ten', topic='TOP') + assert wait_agent_attr(client, name='received', data='ten') + + @pytest.mark.parametrize('server', [Server, PubServer]) def test_simple_pub_single_handler(nsproxy, server): """ From dda74d5daf705d6715790325e078e6d251e09205 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Mon, 19 Jun 2017 16:02:00 +0200 Subject: [PATCH 2/9] Add topic handling functions to Agent --- osbrain/address.py | 7 ++++--- osbrain/agent.py | 40 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/osbrain/address.py b/osbrain/address.py index b1c73f9..b7b26dd 100644 --- a/osbrain/address.py +++ b/osbrain/address.py @@ -389,7 +389,7 @@ class AgentChannel(): receiver : str Second AgentAddress. """ - def __init__(self, kind, receiver, sender): + def __init__(self, kind, receiver, sender, uuid=None): self.kind = AgentChannelKind(kind) self.receiver = receiver self.sender = sender @@ -397,7 +397,7 @@ def __init__(self, kind, receiver, sender): receiver.transport if receiver else sender.transport self.serializer = \ receiver.serializer if receiver else sender.serializer - self.uuid = unique_identifier() + self.uuid = uuid or unique_identifier() # Set up pairs if sender: self.sender.channel = self @@ -435,4 +435,5 @@ def twin(self): kind = self.kind.twin() sender = self.receiver.twin() if self.receiver is not None else None receiver = self.sender.twin() if self.sender is not None else None - return self.__class__(kind=kind, receiver=receiver, sender=sender) + return self.__class__(kind=kind, receiver=receiver, sender=sender, + uuid=self.uuid) diff --git a/osbrain/agent.py b/osbrain/agent.py index 602b9ab..5bdd117 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -898,6 +898,44 @@ def _connect_and_register(self, client_address, alias=None, handler=None, self.register(socket, register_as, alias, handler) return client_address + def unsubscribe_socket_from_topic(self, socket, topic: bytes): + ''' + Unsubscribe a socket from a given topic. + + Parameters + ---------- + socket + Identifier of the socket. Must be a valid entry of `self.socket` + topic + topic which we want to unsubscribe from + ''' + if isinstance(self.address[socket], AgentAddress): + self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic) + if isinstance(self.address[socket], AgentChannel): + channel = self.address[socket] + socket = channel.receiver + treated_topic = channel.uuid + topic + self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic) + + def subscribe_socket_to_topic(self, socket, topic: bytes): + ''' + Subscribe a socket to a given topic. + + Parameters + ---------- + socket + Identifier of the socket. Must be a valid entry of `self.socket` + topic + topic which we want to subscribe to + ''' + if isinstance(self.address[socket], AgentAddress): + self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic) + if isinstance(self.address[socket], AgentChannel): + channel = self.address[socket] + socket = channel.receiver + treated_topic = channel.uuid + topic + self.socket[socket].setsockopt(zmq.SUBSCRIBE, treated_topic) + def _handle_async_requests(self, data): address_uuid, uuid, response = data if uuid not in self._pending_requests: @@ -932,7 +970,7 @@ def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): curated_handlers = topics_to_bytes(handlers) # Subscribe to topics for topic in curated_handlers.keys(): - self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic) + self.subscribe_socket_to_topic(alias, topic) # Reset handlers self._set_handler(self.socket[alias], curated_handlers) From 985f086a78df1073b7046ce198925af6d3d56edc Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 14:52:04 +0200 Subject: [PATCH 3/9] Rename functions agent --- osbrain/agent.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index 5bdd117..2f7761f 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -898,7 +898,7 @@ def _connect_and_register(self, client_address, alias=None, handler=None, self.register(socket, register_as, alias, handler) return client_address - def unsubscribe_socket_from_topic(self, socket, topic: bytes): + def unsubscribe_from_topic(self, socket, topic: bytes): ''' Unsubscribe a socket from a given topic. @@ -917,7 +917,7 @@ def unsubscribe_socket_from_topic(self, socket, topic: bytes): treated_topic = channel.uuid + topic self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic) - def subscribe_socket_to_topic(self, socket, topic: bytes): + def subscribe_to_topic(self, socket, topic: bytes): ''' Subscribe a socket to a given topic. @@ -970,7 +970,7 @@ def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): curated_handlers = topics_to_bytes(handlers) # Subscribe to topics for topic in curated_handlers.keys(): - self.subscribe_socket_to_topic(alias, topic) + self.subscribe_to_topic(alias, topic) # Reset handlers self._set_handler(self.socket[alias], curated_handlers) From d03ec09ac9f4b8f2a50b3dd0836af9fc80bba489 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 14:52:11 +0200 Subject: [PATCH 4/9] Rename function tests --- osbrain/tests/test_agent_sync_publications.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/osbrain/tests/test_agent_sync_publications.py b/osbrain/tests/test_agent_sync_publications.py index af8aaf3..c8c14ab 100644 --- a/osbrain/tests/test_agent_sync_publications.py +++ b/osbrain/tests/test_agent_sync_publications.py @@ -95,8 +95,8 @@ def test_change_subscription_topics_sync(nsproxy, socket_type): assert wait_agent_attr(client, name='received', data='hello') # Only subscribe to 'TOP' topic - client.unsubscribe_socket_from_topic('sub', b'') - client.subscribe_socket_to_topic('sub', b'TOP') + client.unsubscribe_from_topic('sub', b'') + client.subscribe_to_topic('sub', b'TOP') # Message not received since 'TOP' topic not specified in the send call server.send('pub', 'world') From df993723b46077615982278db501cbd0ab897122 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 15:01:32 +0200 Subject: [PATCH 5/9] Change if to elif --- osbrain/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index 2f7761f..61f6eae 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -911,7 +911,7 @@ def unsubscribe_from_topic(self, socket, topic: bytes): ''' if isinstance(self.address[socket], AgentAddress): self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic) - if isinstance(self.address[socket], AgentChannel): + elif isinstance(self.address[socket], AgentChannel): channel = self.address[socket] socket = channel.receiver treated_topic = channel.uuid + topic @@ -930,7 +930,7 @@ def subscribe_to_topic(self, socket, topic: bytes): ''' if isinstance(self.address[socket], AgentAddress): self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic) - if isinstance(self.address[socket], AgentChannel): + elif isinstance(self.address[socket], AgentChannel): channel = self.address[socket] socket = channel.receiver treated_topic = channel.uuid + topic From 75110ae97e90cdf365539c8e92c9bcf3787b6ce3 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 15:49:58 +0200 Subject: [PATCH 6/9] Improve signature types --- osbrain/agent.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index 61f6eae..672064e 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -898,7 +898,9 @@ def _connect_and_register(self, client_address, alias=None, handler=None, self.register(socket, register_as, alias, handler) return client_address - def unsubscribe_from_topic(self, socket, topic: bytes): + def unsubscribe_from_topic(self, + socket: Union[AgentAddress, str, zmq.Socket], + topic: bytes): ''' Unsubscribe a socket from a given topic. @@ -906,6 +908,8 @@ def unsubscribe_from_topic(self, socket, topic: bytes): ---------- socket Identifier of the socket. Must be a valid entry of `self.socket` + for the PUB/SUB pattern and a valid entry of `self.address` for + the SYNC_PUB/SYNC_SUB pattern topic topic which we want to unsubscribe from ''' @@ -917,7 +921,8 @@ def unsubscribe_from_topic(self, socket, topic: bytes): treated_topic = channel.uuid + topic self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic) - def subscribe_to_topic(self, socket, topic: bytes): + def subscribe_to_topic(self, socket: Union[AgentAddress, str, zmq.Socket], + topic: bytes): ''' Subscribe a socket to a given topic. @@ -925,6 +930,8 @@ def subscribe_to_topic(self, socket, topic: bytes): ---------- socket Identifier of the socket. Must be a valid entry of `self.socket` + for the PUB/SUB pattern and a valid entry of `self.address` for + the SYNC_PUB/SYNC_SUB pattern topic topic which we want to subscribe to ''' From 84155170f4957abddb645b96b7361d669b8e0338 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 17:11:46 +0200 Subject: [PATCH 7/9] Improve tests of subscriptions --- osbrain/tests/test_agent_sync_publications.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/osbrain/tests/test_agent_sync_publications.py b/osbrain/tests/test_agent_sync_publications.py index c8c14ab..d4cdf4b 100644 --- a/osbrain/tests/test_agent_sync_publications.py +++ b/osbrain/tests/test_agent_sync_publications.py @@ -74,7 +74,9 @@ def on_error(agent): @pytest.mark.parametrize('socket_type', ['PUB', 'SYNC_PUB']) -def test_change_subscription_topics_sync(nsproxy, socket_type): +@pytest.mark.parametrize('unsub, sub', [('', 'TOP'), + (b'', b'TOP')]) +def test_change_subscription_topics_sync(nsproxy, socket_type, unsub, sub): ''' Test for the different options of subscribing/unsubscribing to topics in the SYNC_PUB/SYNC_SUB pattern. @@ -95,8 +97,8 @@ def test_change_subscription_topics_sync(nsproxy, socket_type): assert wait_agent_attr(client, name='received', data='hello') # Only subscribe to 'TOP' topic - client.unsubscribe_from_topic('sub', b'') - client.subscribe_to_topic('sub', b'TOP') + client.unsubscribe_from_topic('sub', unsub) + client.subscribe_to_topic('sub', sub) # Message not received since 'TOP' topic not specified in the send call server.send('pub', 'world') From d3dc08b81347b8fc2f8dfb5606542dab06af8e3a Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 16:59:53 +0200 Subject: [PATCH 8/9] Accept topics as bytes or str for subscriptions --- osbrain/agent.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index 672064e..5d9757c 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -900,7 +900,7 @@ def _connect_and_register(self, client_address, alias=None, handler=None, def unsubscribe_from_topic(self, socket: Union[AgentAddress, str, zmq.Socket], - topic: bytes): + topic: Union[bytes, str]): ''' Unsubscribe a socket from a given topic. @@ -913,6 +913,9 @@ def unsubscribe_from_topic(self, topic topic which we want to unsubscribe from ''' + if isinstance(topic, str): + topic = topic.encode() + if isinstance(self.address[socket], AgentAddress): self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic) elif isinstance(self.address[socket], AgentChannel): @@ -922,7 +925,7 @@ def unsubscribe_from_topic(self, self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic) def subscribe_to_topic(self, socket: Union[AgentAddress, str, zmq.Socket], - topic: bytes): + topic: Union[bytes, str]): ''' Subscribe a socket to a given topic. @@ -935,6 +938,9 @@ def subscribe_to_topic(self, socket: Union[AgentAddress, str, zmq.Socket], topic topic which we want to subscribe to ''' + if isinstance(topic, str): + topic = topic.encode() + if isinstance(self.address[socket], AgentAddress): self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic) elif isinstance(self.address[socket], AgentChannel): From 938341dedfbed6132209e28d1cdb18a7faf3c115 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 17:03:32 +0200 Subject: [PATCH 9/9] Improve subscription methods signature types --- osbrain/agent.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index 5d9757c..cdc2de6 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -899,7 +899,7 @@ def _connect_and_register(self, client_address, alias=None, handler=None, return client_address def unsubscribe_from_topic(self, - socket: Union[AgentAddress, str, zmq.Socket], + socket: Union[AgentAddress, AgentChannel, str], topic: Union[bytes, str]): ''' Unsubscribe a socket from a given topic. @@ -924,7 +924,8 @@ def unsubscribe_from_topic(self, treated_topic = channel.uuid + topic self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic) - def subscribe_to_topic(self, socket: Union[AgentAddress, str, zmq.Socket], + def subscribe_to_topic(self, + socket: Union[AgentAddress, AgentChannel, str], topic: Union[bytes, str]): ''' Subscribe a socket to a given topic.