From 5a092be3fb43ee84afe45f1b6ff74566817b391b Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Tue, 27 Jun 2017 11:18:00 +0200 Subject: [PATCH 1/9] Function to guarantee PUB/SUB channel in SYNC_PUB This function will guarantee the PUB/SUB channel is already up and running after returning, ready for sending data right away. --- osbrain/agent.py | 6 +++++ osbrain/helper.py | 43 ++++++++++++++++++++++++++++++++++++ osbrain/tests/test_helper.py | 33 +++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/osbrain/agent.py b/osbrain/agent.py index 602b9ab..ee7216e 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -912,6 +912,9 @@ def _handle_async_requests(self, data): else: handler(self, response) + def ugly(self, alias, handlers): + self._subscribe(alias, handlers) + def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): """ Subscribe the agent to another agent. @@ -965,6 +968,9 @@ def set_attr(self, **kwargs): def get_attr(self, name): return getattr(self, name) + def del_attr(self, name): + delattr(self, name) + def set_method(self, *args, **kwargs): """ Set object methods. diff --git a/osbrain/helper.py b/osbrain/helper.py index 7b0a289..d112ac8 100644 --- a/osbrain/helper.py +++ b/osbrain/helper.py @@ -192,3 +192,46 @@ def wait_agent_attr(agent, name='received', length=None, data=None, value=None, break time.sleep(0.01) return False + + +def synchronize_sync_pub(server, server_alias, server_handler, client, + client_alias, client_handler): + ''' + Create a SYNC_PUB/SYNC_SUB channel and connect both agents. + + Make sure they have stablished the PUB/SUB communication within the + SYNC_PUB/SYNC_SUB channel before returning. This will guarantee that + no PUB messages are lost. + ''' + def assert_receive(agent, message, topic=None): + try: + agent.get_attr('_tmp_attr') + agent._tmp_attr = True + except AttributeError: # Attribute already deleted + pass + + addr = server.bind('SYNC_PUB', alias=server_alias, handler=server_handler) + + client.set_attr(_tmp_attr=False) + + client.connect(addr, alias=client_alias, handler=assert_receive) + + # Send messages through the PUB socket until the client receives them + server.each(0.1, 'send', addr, 'Synchronize', alias='_tmp_timer') + + assert wait_agent_attr(client, name='_tmp_attr', value=True, timeout=5) + + server.stop_timer('_tmp_timer') + + # The following is an ugly hack to get the uuid used as the alias for the + # SUB socket of the client in the SYNC_PUB channel. + channel = client.addr(client_alias) + client_addr = channel.twin().sender.twin() + addr_to_access_uuid = client.addr(client_addr) + client_async_req_uuid = client.get_attr('_async_req_uuid') + uuid = client_async_req_uuid[addr_to_access_uuid] + + # Set the handler passed as a parameter, now that connection is guaranteed + client.ugly(uuid, client_handler) + + client.del_attr('_tmp_attr') diff --git a/osbrain/tests/test_helper.py b/osbrain/tests/test_helper.py index 6565672..b8f41a6 100644 --- a/osbrain/tests/test_helper.py +++ b/osbrain/tests/test_helper.py @@ -9,11 +9,44 @@ from osbrain.helper import agent_dies from osbrain.helper import attribute_match_all from osbrain.helper import wait_agent_attr +from osbrain.helper import synchronize_sync_pub from common import nsproxy # pragma: no flakes from common import agent_logger # pragma: no flakes +def receive(agent, response): + agent.received.append(response) + + +def test_synchronize_sync_pub(nsproxy): + """ + All publications in SYNC_PUB/SYNC_SUB connections stablished through + `synchronize_sync_pub` should be received in the other end. + """ + server = run_agent('server') + client = run_agent('client') + + client.set_attr(received=[]) + + # Create a SYNC_PUB channel and guarantee the PUB/SUB is stablished + synchronize_sync_pub(server, 'sync_pub', receive, + client, 'sync_sub', receive) + + # Send the message only once + server.send('sync_pub', 'Hello') + + assert wait_agent_attr(client, name='received', data='Hello') + + # Check that no temporary attributes remain + assert 'Synchronize' not in client.get_attr('received') + + with pytest.raises(AttributeError): + client.get_attr('_tmp_attr') + + assert '_tmp_timer' not in server.list_timers() + + def test_agent_dies(nsproxy): """ The function `agent_dies` should return `False` if the agent does not die From e2e812ff8ded2ed81db397a8f08db5284e1ac042 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Tue, 27 Jun 2017 16:24:24 +0200 Subject: [PATCH 2/9] Creation of the sockets moved out synchronize_sync_pub --- osbrain/agent.py | 6 ++++++ osbrain/helper.py | 29 +++++++++++++---------------- osbrain/tests/test_helper.py | 5 ++++- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index ee7216e..ce24cc7 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -543,6 +543,12 @@ def register(self, socket, address, alias=None, handler=None): self.poller.register(socket, zmq.POLLIN) self._set_handler(socket, handler) + def UGLY_get_handler(self, alias): + """ + This should be private. + """ + return self.handler[self.socket[alias]] + def _set_handler(self, socket, handlers): """ Set the socket handler(s). diff --git a/osbrain/helper.py b/osbrain/helper.py index d112ac8..2f4431c 100644 --- a/osbrain/helper.py +++ b/osbrain/helper.py @@ -206,23 +206,10 @@ def synchronize_sync_pub(server, server_alias, server_handler, client, def assert_receive(agent, message, topic=None): try: agent.get_attr('_tmp_attr') - agent._tmp_attr = True + agent.set_attr(_tmp_attr=True) except AttributeError: # Attribute already deleted pass - addr = server.bind('SYNC_PUB', alias=server_alias, handler=server_handler) - - client.set_attr(_tmp_attr=False) - - client.connect(addr, alias=client_alias, handler=assert_receive) - - # Send messages through the PUB socket until the client receives them - server.each(0.1, 'send', addr, 'Synchronize', alias='_tmp_timer') - - assert wait_agent_attr(client, name='_tmp_attr', value=True, timeout=5) - - server.stop_timer('_tmp_timer') - # The following is an ugly hack to get the uuid used as the alias for the # SUB socket of the client in the SYNC_PUB channel. channel = client.addr(client_alias) @@ -231,7 +218,17 @@ def assert_receive(agent, message, topic=None): client_async_req_uuid = client.get_attr('_async_req_uuid') uuid = client_async_req_uuid[addr_to_access_uuid] - # Set the handler passed as a parameter, now that connection is guaranteed - client.ugly(uuid, client_handler) + # Set a temporary custom handler + client.set_attr(_tmp_attr=False) + original_handler = client.UGLY_get_handler(uuid) + client.ugly(uuid, assert_receive) + + # Send messages through the PUB socket until the client receives them + server.each(0.1, 'send', server_alias, 'Synchronize', alias='_tmp_timer') + assert wait_agent_attr(client, name='_tmp_attr', value=True, timeout=5) + server.stop_timer('_tmp_timer') + + # Restore the original handler, now that the connection is guaranteed + client.ugly(uuid, original_handler) client.del_attr('_tmp_attr') diff --git a/osbrain/tests/test_helper.py b/osbrain/tests/test_helper.py index b8f41a6..b110b5b 100644 --- a/osbrain/tests/test_helper.py +++ b/osbrain/tests/test_helper.py @@ -27,9 +27,12 @@ def test_synchronize_sync_pub(nsproxy): server = run_agent('server') client = run_agent('client') + addr = server.bind('SYNC_PUB', alias='sync_pub', handler=receive) + client.set_attr(received=[]) + client.connect(addr, alias='sync_sub', handler=receive) - # Create a SYNC_PUB channel and guarantee the PUB/SUB is stablished + # Guarantee the PUB/SUB is stablished synchronize_sync_pub(server, 'sync_pub', receive, client, 'sync_sub', receive) From 393087de0f55117a1ddd340ee5a258cb0aa0aba0 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Tue, 27 Jun 2017 16:31:00 +0200 Subject: [PATCH 3/9] Move get uuid hack into a function of Agent --- osbrain/agent.py | 12 ++++++++++++ osbrain/helper.py | 8 +------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index ce24cc7..18efc26 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -1520,6 +1520,18 @@ def close_sockets(self): for sock in self.get_unique_external_zmq_sockets(): sock.close(linger=get_linger()) + def get_uuid_used_as_alias_for_sub_in_sync_pub(self, client_alias): + """ + Return the uuid that was used as the alias for the SUB socket + when a connection to a SYNC_PUB channel was made. + """ + channel = self.addr(client_alias) + client_addr = channel.twin().sender.twin() + addr_to_access_uuid = self.addr(client_addr) + uuid = self._async_req_uuid[addr_to_access_uuid] + + return uuid + def ping(self): """ A test method to check the readiness of the agent. Used for testing diff --git a/osbrain/helper.py b/osbrain/helper.py index 2f4431c..ff8868c 100644 --- a/osbrain/helper.py +++ b/osbrain/helper.py @@ -210,13 +210,7 @@ def assert_receive(agent, message, topic=None): except AttributeError: # Attribute already deleted pass - # The following is an ugly hack to get the uuid used as the alias for the - # SUB socket of the client in the SYNC_PUB channel. - channel = client.addr(client_alias) - client_addr = channel.twin().sender.twin() - addr_to_access_uuid = client.addr(client_addr) - client_async_req_uuid = client.get_attr('_async_req_uuid') - uuid = client_async_req_uuid[addr_to_access_uuid] + uuid = client.get_uuid_used_as_alias_for_sub_in_sync_pub(client_alias) # Set a temporary custom handler client.set_attr(_tmp_attr=False) From 85fa7c6dce44e7d6047b733abb2531eef5f26b98 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Tue, 27 Jun 2017 17:14:04 +0200 Subject: [PATCH 4/9] Add tests for the uuid in SYNC_SUB sockets. Read below However, there is a chance that the trace of the exception is printed for the test of ASYNC_REP (line 446 in test_agent). This is because we are doing something wrong, probably the connection has no time to be stablished before calling the function or something. --- osbrain/agent.py | 3 +++ osbrain/tests/test_agent.py | 39 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/osbrain/agent.py b/osbrain/agent.py index 18efc26..24f2287 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -36,6 +36,7 @@ from .address import AgentAddressKind from .address import AgentAddressSerializer from .address import AgentChannel +from .address import AgentChannelKind from .address import address_to_host_port from .address import guess_kind from .proxy import Proxy @@ -1526,6 +1527,8 @@ def get_uuid_used_as_alias_for_sub_in_sync_pub(self, client_alias): when a connection to a SYNC_PUB channel was made. """ channel = self.addr(client_alias) + if channel.kind != AgentChannelKind('SYNC_SUB'): + raise ValueError('Incorrect channel kind: {}'.format(channel.kind)) client_addr = channel.twin().sender.twin() addr_to_access_uuid = self.addr(client_addr) uuid = self._async_req_uuid[addr_to_access_uuid] diff --git a/osbrain/tests/test_agent.py b/osbrain/tests/test_agent.py index 17ab67e..076c3a4 100644 --- a/osbrain/tests/test_agent.py +++ b/osbrain/tests/test_agent.py @@ -421,6 +421,45 @@ def test_invalid_handlers(nsproxy): agent.bind('REP', handler=1.234) +def test_get_uuid_used_as_alias_for_sub_in_sync_pub_sync(nsproxy): + """ + The function should only work for SYNC_SUB channels, and should raise + an exception in any other case. + """ + server = run_agent('server') + client = run_agent('client') + + sync_pub_addr = server.bind('SYNC_PUB', alias='sync_pub', handler=receive) + client.connect(sync_pub_addr, alias='sync_sub', handler=receive) + + # Should work for SYNC_SUB channels + assert client.get_uuid_used_as_alias_for_sub_in_sync_pub('sync_sub') + + # Should not work for other channels + with pytest.raises(ValueError): + server.get_uuid_used_as_alias_for_sub_in_sync_pub('sync_pub') + + +def test_get_uuid_used_as_alias_for_sub_in_sync_pub_async(nsproxy): + """ + The function should only work for SYNC_SUB channels, and should raise + an exception in any other case. + """ + server = run_agent('server') + client = run_agent('client') + + async_rep_addr = server.bind('ASYNC_REP', alias='async_rep', + handler=receive) + client.connect(async_rep_addr, alias='async_req', handler=receive) + + # Should not work for either channel + with pytest.raises(ValueError): + client.get_uuid_used_as_alias_for_sub_in_sync_pub('async_req') + + with pytest.raises(ValueError): + server.get_uuid_used_as_alias_for_sub_in_sync_pub('async_rep') + + def test_log_levels(nsproxy): """ Test different log levels: info, warning, error and debug. Debug messages From 0834e3425e8d73655cf082eb12f2b23008878e62 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Tue, 27 Jun 2017 17:33:30 +0200 Subject: [PATCH 5/9] Remove unused parameters and improve docstrings --- osbrain/helper.py | 3 +-- osbrain/tests/test_helper.py | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/osbrain/helper.py b/osbrain/helper.py index ff8868c..7d38484 100644 --- a/osbrain/helper.py +++ b/osbrain/helper.py @@ -194,8 +194,7 @@ def wait_agent_attr(agent, name='received', length=None, data=None, value=None, return False -def synchronize_sync_pub(server, server_alias, server_handler, client, - client_alias, client_handler): +def synchronize_sync_pub(server, server_alias, client, client_alias): ''' Create a SYNC_PUB/SYNC_SUB channel and connect both agents. diff --git a/osbrain/tests/test_helper.py b/osbrain/tests/test_helper.py index b110b5b..1284349 100644 --- a/osbrain/tests/test_helper.py +++ b/osbrain/tests/test_helper.py @@ -21,8 +21,8 @@ def receive(agent, response): def test_synchronize_sync_pub(nsproxy): """ - All publications in SYNC_PUB/SYNC_SUB connections stablished through - `synchronize_sync_pub` should be received in the other end. + Publications sent through SYNC_PUB/SYNC_SUB after synchronazing them + should be received without exception. """ server = run_agent('server') client = run_agent('client') @@ -33,8 +33,7 @@ def test_synchronize_sync_pub(nsproxy): client.connect(addr, alias='sync_sub', handler=receive) # Guarantee the PUB/SUB is stablished - synchronize_sync_pub(server, 'sync_pub', receive, - client, 'sync_sub', receive) + synchronize_sync_pub(server, 'sync_pub', client, 'sync_sub') # Send the message only once server.send('sync_pub', 'Hello') From 35ef0df6f3a406a352874e12295f66e784e8fedb Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Tue, 27 Jun 2017 17:38:46 +0200 Subject: [PATCH 6/9] Make subscribe and get_handler non-private --- osbrain/agent.py | 13 +++++-------- osbrain/helper.py | 6 +++--- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/osbrain/agent.py b/osbrain/agent.py index 24f2287..b2af61f 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -544,9 +544,9 @@ def register(self, socket, address, alias=None, handler=None): self.poller.register(socket, zmq.POLLIN) self._set_handler(socket, handler) - def UGLY_get_handler(self, alias): + def get_handler(self, alias): """ - This should be private. + Get the handler associated to a socket given the socket alias. """ return self.handler[self.socket[alias]] @@ -654,7 +654,7 @@ def _bind_address(self, kind, alias=None, handler=None, addr=None, self.register(socket, server_address, alias, handler) # SUB sockets are a special case if kind == 'SUB': - self._subscribe(server_address, handler) + self.subscribe(server_address, handler) return server_address def _bind_channel(self, kind, alias=None, handler=None, addr=None, @@ -790,7 +790,7 @@ def _connect_address(self, server_address, alias=None, handler=None): if client_address.kind == 'SUB': if not alias: alias = client_address - self._subscribe(alias, handler) + self.subscribe(alias, handler) return client_address def _connect_channel(self, channel, alias=None, handler=None): @@ -919,10 +919,7 @@ def _handle_async_requests(self, data): else: handler(self, response) - def ugly(self, alias, handlers): - self._subscribe(alias, handlers) - - def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): + def subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]): """ Subscribe the agent to another agent. diff --git a/osbrain/helper.py b/osbrain/helper.py index 7d38484..2fee21a 100644 --- a/osbrain/helper.py +++ b/osbrain/helper.py @@ -213,8 +213,8 @@ def assert_receive(agent, message, topic=None): # Set a temporary custom handler client.set_attr(_tmp_attr=False) - original_handler = client.UGLY_get_handler(uuid) - client.ugly(uuid, assert_receive) + original_handler = client.get_handler(uuid) + client.subscribe(uuid, assert_receive) # Send messages through the PUB socket until the client receives them server.each(0.1, 'send', server_alias, 'Synchronize', alias='_tmp_timer') @@ -222,6 +222,6 @@ def assert_receive(agent, message, topic=None): server.stop_timer('_tmp_timer') # Restore the original handler, now that the connection is guaranteed - client.ugly(uuid, original_handler) + client.subscribe(uuid, original_handler) client.del_attr('_tmp_attr') From 20e908ade40f883a43240235a3a6f7d6e264a650 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Wed, 28 Jun 2017 07:38:36 +0200 Subject: [PATCH 7/9] Add test for Agent.subscribe --- osbrain/tests/test_agent_pubsub_topics.py | 39 +++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/osbrain/tests/test_agent_pubsub_topics.py b/osbrain/tests/test_agent_pubsub_topics.py index c122c5f..5273e2c 100644 --- a/osbrain/tests/test_agent_pubsub_topics.py +++ b/osbrain/tests/test_agent_pubsub_topics.py @@ -4,6 +4,7 @@ from osbrain import run_agent from osbrain.address import AgentAddressSerializer +from osbrain.helper import wait_agent_attr from common import nsproxy # pragma: no flakes from common import append_received @@ -164,3 +165,41 @@ def test_pubsub_topics_raw(nsproxy, serializer): assert b'fooWorld' in a5.get_attr('received') assert b'foobarFOO' in a5.get_attr('received') assert b'foBAR' in a5.get_attr('received') + + +def test_subscribe(nsproxy): + """ + Test the `subscribe` function works as expected for SUB sockets. + """ + def receive_square(agent, message, topic=None): + agent.received.append(message**2) + + def receive_cube(agent, message, topic=None): + agent.received.append(message**3) + + server = run_agent('server') + client = run_agent('client') + + addr = server.bind('PUB', alias='pub') + client.set_attr(received=[]) + client.connect(addr, alias='sub', handler=receive) + + # Give some time for the client to connect + time.sleep(0.1) + + server.send('pub', 2) + assert wait_agent_attr(client, data=2) + + client.subscribe('sub', handlers={'foo': receive_square, + 'bar': receive_cube}) + + server.send('pub', 2, topic='foo') + server.send('pub', 2, topic='bar') + server.send('pub', 3) + + # Check new handlers were used for different topics + assert wait_agent_attr(client, data=4) + assert wait_agent_attr(client, data=8) + + # No longer subscribed to all topics + assert not wait_agent_attr(client, data=3) From 747e57b2ba291b28044247afab76f64aaa42f94d Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Wed, 28 Jun 2017 10:59:35 +0200 Subject: [PATCH 8/9] Add test for get_handler and improve method docstring --- osbrain/agent.py | 3 +++ osbrain/tests/test_agent.py | 15 +++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/osbrain/agent.py b/osbrain/agent.py index b2af61f..16cdefe 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -547,6 +547,9 @@ def register(self, socket, address, alias=None, handler=None): def get_handler(self, alias): """ Get the handler associated to a socket given the socket alias. + + Ideally, this should only be called for alias that represent a + SUB socket. """ return self.handler[self.socket[alias]] diff --git a/osbrain/tests/test_agent.py b/osbrain/tests/test_agent.py index 076c3a4..f69ddd7 100644 --- a/osbrain/tests/test_agent.py +++ b/osbrain/tests/test_agent.py @@ -421,6 +421,21 @@ def test_invalid_handlers(nsproxy): agent.bind('REP', handler=1.234) +def test_get_handler(nsproxy): + """ + Make sure the actual handler is returned. + """ + server = run_agent('server') + client = run_agent('client') + + pub_addr = server.bind('PUB', alias='pub') + client.connect(pub_addr, alias='sub', handler=receive) + + assert client.get_handler('sub') + with pytest.raises(KeyError): + server.get_handler('pub') + + def test_get_uuid_used_as_alias_for_sub_in_sync_pub_sync(nsproxy): """ The function should only work for SYNC_SUB channels, and should raise From 97d204e257225bee3d51c7d3c04b43752efd0d36 Mon Sep 17 00:00:00 2001 From: Guillermo Alonso Date: Thu, 29 Jun 2017 12:23:43 +0200 Subject: [PATCH 9/9] USe append-receivde --- osbrain/tests/test_helper.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/osbrain/tests/test_helper.py b/osbrain/tests/test_helper.py index 1284349..39328a8 100644 --- a/osbrain/tests/test_helper.py +++ b/osbrain/tests/test_helper.py @@ -13,10 +13,7 @@ from common import nsproxy # pragma: no flakes from common import agent_logger # pragma: no flakes - - -def receive(agent, response): - agent.received.append(response) +from common import append_received def test_synchronize_sync_pub(nsproxy): @@ -27,10 +24,10 @@ def test_synchronize_sync_pub(nsproxy): server = run_agent('server') client = run_agent('client') - addr = server.bind('SYNC_PUB', alias='sync_pub', handler=receive) + addr = server.bind('SYNC_PUB', alias='sync_pub', handler=append_received) client.set_attr(received=[]) - client.connect(addr, alias='sync_sub', handler=receive) + client.connect(addr, alias='sync_sub', handler=append_received) # Guarantee the PUB/SUB is stablished synchronize_sync_pub(server, 'sync_pub', client, 'sync_sub')