Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Function to guarantee PUB/SUB channel in SYNC_PUB #151

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
33 changes: 30 additions & 3 deletions osbrain/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .address import AgentAddressKind
from .address import AgentAddressSerializer
from .address import AgentChannel
from .address import AgentChannelKind
from .address import address_to_host_port
from .address import guess_kind
from .proxy import Proxy
Expand Down Expand Up @@ -543,6 +544,15 @@ def register(self, socket, address, alias=None, handler=None):
self.poller.register(socket, zmq.POLLIN)
self._set_handler(socket, handler)

def get_handler(self, alias):
"""
Get the handler associated to a socket given the socket alias.

Ideally, this should only be called for alias that represent a
SUB socket.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring Parameters and Returns sections are missing. 😉

return self.handler[self.socket[alias]]

def _set_handler(self, socket, handlers):
"""
Set the socket handler(s).
Expand Down Expand Up @@ -647,7 +657,7 @@ def _bind_address(self, kind, alias=None, handler=None, addr=None,
self.register(socket, server_address, alias, handler)
# SUB sockets are a special case
if kind == 'SUB':
self._subscribe(server_address, handler)
self.subscribe(server_address, handler)
return server_address

def _bind_channel(self, kind, alias=None, handler=None, addr=None,
Expand Down Expand Up @@ -783,7 +793,7 @@ def _connect_address(self, server_address, alias=None, handler=None):
if client_address.kind == 'SUB':
if not alias:
alias = client_address
self._subscribe(alias, handler)
self.subscribe(alias, handler)
return client_address

def _connect_channel(self, channel, alias=None, handler=None):
Expand Down Expand Up @@ -912,7 +922,7 @@ def _handle_async_requests(self, data):
else:
handler(self, response)

def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]):
def subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]):
"""
Subscribe the agent to another agent.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to complete de docstring here as well now that the method is "user accessible".

Expand Down Expand Up @@ -965,6 +975,9 @@ def set_attr(self, **kwargs):
def get_attr(self, name):
return getattr(self, name)

def del_attr(self, name):
delattr(self, name)

def set_method(self, *args, **kwargs):
"""
Set object methods.
Expand Down Expand Up @@ -1508,6 +1521,20 @@ def close_sockets(self):
for sock in self.get_unique_external_zmq_sockets():
sock.close(linger=get_linger())

def get_uuid_used_as_alias_for_sub_in_sync_pub(self, client_alias):
"""
Return the uuid that was used as the alias for the SUB socket
when a connection to a SYNC_PUB channel was made.
"""
channel = self.addr(client_alias)
if channel.kind != AgentChannelKind('SYNC_SUB'):
raise ValueError('Incorrect channel kind: {}'.format(channel.kind))
client_addr = channel.twin().sender.twin()
addr_to_access_uuid = self.addr(client_addr)
uuid = self._async_req_uuid[addr_to_access_uuid]

return uuid

def ping(self):
"""
A test method to check the readiness of the agent. Used for testing
Expand Down
33 changes: 33 additions & 0 deletions osbrain/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,36 @@ def wait_agent_attr(agent, name='received', length=None, data=None, value=None,
break
time.sleep(0.01)
return False


def synchronize_sync_pub(server, server_alias, client, client_alias):
'''
Create a SYNC_PUB/SYNC_SUB channel and connect both agents.

Make sure they have stablished the PUB/SUB communication within the
SYNC_PUB/SYNC_SUB channel before returning. This will guarantee that
no PUB messages are lost.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing "Parameters" section from the docstring.

'''
def assert_receive(agent, message, topic=None):
try:
agent.get_attr('_tmp_attr')
agent.set_attr(_tmp_attr=True)
except AttributeError: # Attribute already deleted
pass

uuid = client.get_uuid_used_as_alias_for_sub_in_sync_pub(client_alias)

# Set a temporary custom handler
client.set_attr(_tmp_attr=False)
original_handler = client.get_handler(uuid)
client.subscribe(uuid, assert_receive)

# Send messages through the PUB socket until the client receives them
server.each(0.1, 'send', server_alias, 'Synchronize', alias='_tmp_timer')
assert wait_agent_attr(client, name='_tmp_attr', value=True, timeout=5)
server.stop_timer('_tmp_timer')

# Restore the original handler, now that the connection is guaranteed
client.subscribe(uuid, original_handler)

client.del_attr('_tmp_attr')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about this implementation. Worst-case there might messages not received by the client when you change the handler. That is really bad. Also, that requires the .subscribe() call to be executed again, which I'm not sure is the best thing either.

Would it make sense to ensure synchronization with SYNC-PUB requests? i.e.:

  • First connect from the subscriber.
  • Then make requests from the subscriber each X seconds until one response is received.
  • That is all.

You achieve synchronization while avoiding double .subcribe() call and avoid risks of the SUB handler receiving messages sent for the "syncrhonization" process.

You may want to specify an on_error=lambda x: pass when sending the request to avoid warning log messages during synchronization.

What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems interesting, I will take a look into it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would not be possible to synchronize them through requests. I am thinking about the case in which a request might change the internal state of the SYNC_PUB agent, and for those cases, an extra hack around would have to be made, I think (changing the handler of the SYNC_PUB momentarily for the requests).

Perhaps what we could do is explain that some messages might be lost in the process or that it should only be used for testing purposes. Now I'm thinking that perhaps the function does not even belong there. In the end, I came up with it to make some tests more consistent.

Any thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmmm.

Yeah, one possibility would be to take that away from there and keep it just for the tests.

Another possibility would be to make all SYNC_PUB sockets implement heart-beating. This is something we should do at some point anyway, hopefully Soon™.

Heart-beating would mean that a SYNC_PUB socket would publish a "heart-beat" (like an "I'm alive" message) periodically, to all subscribed agents. Clients can use this to know if they should reconnect (maybe the server went down).

What we could do for now is to implement just a part of the heart-beating: allow clients to request a heart-beat at any time (i.e.: "are you alive?"). This would be a request from the SYNC_SUB, although it would be treated as a special case and all SYNC_PUB channels should handle it the same way, without taking into account the user-defined handler for other requests. This means adding a "special case" in the protocol, but I don't think it is bad, as requests do not need to be very fast and are infrequent.

We could then use those heart-beat requests from the tests to ensure synchronization.

If you feel like that is too much for now, just take it away from there and open a new issue so we try to improve it for version 1.0.0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe heart-beat messages could be published with a special topic (so that clients may decide to subscribe or not to this heart-beat).

Maybe we could use that for now for the tests: from the publisher, we add a timer that publishes heart-beats with a special topic, from the subscriber, when we want to make sure we are fully connected, we just subscribe to this topic and wait for the first heart-beat.

What you think?

Maybe we should anyway open an issue and move this discussion there (we're probably going to discuss these things again when we start working on a good heart-beat implementation).

54 changes: 54 additions & 0 deletions osbrain/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,60 @@ def test_invalid_handlers(nsproxy):
agent.bind('REP', handler=1.234)


def test_get_handler(nsproxy):
"""
Make sure the actual handler is returned.
"""
server = run_agent('server')
client = run_agent('client')

pub_addr = server.bind('PUB', alias='pub')
client.connect(pub_addr, alias='sub', handler=receive)

assert client.get_handler('sub')
with pytest.raises(KeyError):
server.get_handler('pub')


def test_get_uuid_used_as_alias_for_sub_in_sync_pub_sync(nsproxy):
"""
The function should only work for SYNC_SUB channels, and should raise
an exception in any other case.
"""
server = run_agent('server')
client = run_agent('client')

sync_pub_addr = server.bind('SYNC_PUB', alias='sync_pub', handler=receive)
client.connect(sync_pub_addr, alias='sync_sub', handler=receive)

# Should work for SYNC_SUB channels
assert client.get_uuid_used_as_alias_for_sub_in_sync_pub('sync_sub')

# Should not work for other channels
with pytest.raises(ValueError):
server.get_uuid_used_as_alias_for_sub_in_sync_pub('sync_pub')


def test_get_uuid_used_as_alias_for_sub_in_sync_pub_async(nsproxy):
"""
The function should only work for SYNC_SUB channels, and should raise
an exception in any other case.
"""
server = run_agent('server')
client = run_agent('client')

async_rep_addr = server.bind('ASYNC_REP', alias='async_rep',
handler=receive)
client.connect(async_rep_addr, alias='async_req', handler=receive)

# Should not work for either channel
with pytest.raises(ValueError):
client.get_uuid_used_as_alias_for_sub_in_sync_pub('async_req')

with pytest.raises(ValueError):
server.get_uuid_used_as_alias_for_sub_in_sync_pub('async_rep')


def test_log_levels(nsproxy):
"""
Test different log levels: info, warning, error and debug. Debug messages
Expand Down
39 changes: 39 additions & 0 deletions osbrain/tests/test_agent_pubsub_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from osbrain import run_agent
from osbrain.address import AgentAddressSerializer
from osbrain.helper import wait_agent_attr

from common import nsproxy # pragma: no flakes
from common import append_received
Expand Down Expand Up @@ -164,3 +165,41 @@ def test_pubsub_topics_raw(nsproxy, serializer):
assert b'fooWorld' in a5.get_attr('received')
assert b'foobarFOO' in a5.get_attr('received')
assert b'foBAR' in a5.get_attr('received')


def test_subscribe(nsproxy):
"""
Test the `subscribe` function works as expected for SUB sockets.
"""
def receive_square(agent, message, topic=None):
agent.received.append(message**2)

def receive_cube(agent, message, topic=None):
agent.received.append(message**3)

server = run_agent('server')
client = run_agent('client')

addr = server.bind('PUB', alias='pub')
client.set_attr(received=[])
client.connect(addr, alias='sub', handler=receive)

# Give some time for the client to connect
time.sleep(0.1)

server.send('pub', 2)
assert wait_agent_attr(client, data=2)

client.subscribe('sub', handlers={'foo': receive_square,
'bar': receive_cube})

server.send('pub', 2, topic='foo')
server.send('pub', 2, topic='bar')
server.send('pub', 3)

# Check new handlers were used for different topics
assert wait_agent_attr(client, data=4)
assert wait_agent_attr(client, data=8)

# No longer subscribed to all topics
assert not wait_agent_attr(client, data=3)
32 changes: 32 additions & 0 deletions osbrain/tests/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,41 @@
from osbrain.helper import agent_dies
from osbrain.helper import attribute_match_all
from osbrain.helper import wait_agent_attr
from osbrain.helper import synchronize_sync_pub

from common import nsproxy # pragma: no flakes
from common import agent_logger # pragma: no flakes
from common import append_received


def test_synchronize_sync_pub(nsproxy):
"""
Publications sent through SYNC_PUB/SYNC_SUB after synchronazing them
should be received without exception.
"""
server = run_agent('server')
client = run_agent('client')

addr = server.bind('SYNC_PUB', alias='sync_pub', handler=append_received)

client.set_attr(received=[])
client.connect(addr, alias='sync_sub', handler=append_received)

# Guarantee the PUB/SUB is stablished
synchronize_sync_pub(server, 'sync_pub', client, 'sync_sub')

# Send the message only once
server.send('sync_pub', 'Hello')

assert wait_agent_attr(client, name='received', data='Hello')

# Check that no temporary attributes remain
assert 'Synchronize' not in client.get_attr('received')

with pytest.raises(AttributeError):
client.get_attr('_tmp_attr')

assert '_tmp_timer' not in server.list_timers()


def test_agent_dies(nsproxy):
Expand Down