Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finer topic handling #143

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions osbrain/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,15 @@ class AgentChannel():
receiver : str
Second AgentAddress.
"""
def __init__(self, kind, receiver, sender):
def __init__(self, kind, receiver, sender, uuid=None):
self.kind = AgentChannelKind(kind)
self.receiver = receiver
self.sender = sender
self.transport = \
receiver.transport if receiver else sender.transport
self.serializer = \
receiver.serializer if receiver else sender.serializer
self.uuid = unique_identifier()
self.uuid = uuid or unique_identifier()
# Set up pairs
if sender:
self.sender.channel = self
Expand Down Expand Up @@ -435,4 +435,5 @@ def twin(self):
kind = self.kind.twin()
sender = self.receiver.twin() if self.receiver is not None else None
receiver = self.sender.twin() if self.sender is not None else None
return self.__class__(kind=kind, receiver=receiver, sender=sender)
return self.__class__(kind=kind, receiver=receiver, sender=sender,
uuid=self.uuid)
47 changes: 46 additions & 1 deletion osbrain/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,51 @@ def _connect_and_register(self, client_address, alias=None, handler=None,
self.register(socket, register_as, alias, handler)
return client_address

def unsubscribe_from_topic(self,
socket: Union[AgentAddress, str, zmq.Socket],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Union[AgentAddress, AgentChannel, str]? AgentChannel should be there, and I would remove zmq.Socket`, as the user should never be working with raw sockets in osBrain.

topic: bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do allow Union[bytes, str] when making subscriptions (see Agent's _subscribe method). I think we should do the same here as well. User can directly use str (it will automatically be converted to bytes encoded in UTF8, but can use bytes if they prefer to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(maybe this should be considered in the tests as well: test with bytes and test with string).

'''
Unsubscribe a socket from a given topic.

Parameters
----------
socket
Identifier of the socket. Must be a valid entry of `self.socket`
for the PUB/SUB pattern and a valid entry of `self.address` for
the SYNC_PUB/SYNC_SUB pattern
topic
topic which we want to unsubscribe from
'''
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic)
elif isinstance(self.address[socket], AgentChannel):
channel = self.address[socket]
socket = channel.receiver
treated_topic = channel.uuid + topic
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic)

def subscribe_to_topic(self, socket: Union[AgentAddress, str, zmq.Socket],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Union[AgentAddress, AgentChannel, str]? AgentChannel should be there, and I would remove zmq.Socket`, as the user should never be working with raw sockets in osBrain.

topic: bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do allow Union[bytes, str] when making subscriptions (see Agent's _subscribe method). I think we should do the same here as well. User can directly use str (it will automatically be converted to bytes encoded in UTF8, but can use bytes if they prefer to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(maybe this should be considered in the tests as well: test with bytes and test with string).

'''
Subscribe a socket to a given topic.

Parameters
----------
socket
Identifier of the socket. Must be a valid entry of `self.socket`
for the PUB/SUB pattern and a valid entry of `self.address` for
the SYNC_PUB/SYNC_SUB pattern
topic
topic which we want to subscribe to
'''
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic)
elif isinstance(self.address[socket], AgentChannel):
channel = self.address[socket]
socket = channel.receiver
treated_topic = channel.uuid + topic
self.socket[socket].setsockopt(zmq.SUBSCRIBE, treated_topic)

def _handle_async_requests(self, data):
address_uuid, uuid, response = data
if uuid not in self._pending_requests:
Expand Down Expand Up @@ -932,7 +977,7 @@ def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]):
curated_handlers = topics_to_bytes(handlers)
# Subscribe to topics
for topic in curated_handlers.keys():
self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic)
self.subscribe_to_topic(alias, topic)
# Reset handlers
self._set_handler(self.socket[alias], curated_handlers)

Expand Down
35 changes: 35 additions & 0 deletions osbrain/tests/test_agent_sync_publications.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,41 @@ def on_error(agent):
agent.error_log.append('error')


@pytest.mark.parametrize('socket_type', ['PUB', 'SYNC_PUB'])
def test_change_subscription_topics_sync(nsproxy, socket_type):
'''
Test for the different options of subscribing/unsubscribing to topics
in the SYNC_PUB/SYNC_SUB pattern.
'''
server = run_agent('server')
client = run_agent('client')

addr = server.bind(socket_type, alias='pub', handler=lambda: None)
client.set_attr(received=[])
client.connect(addr, alias='sub', handler=append_received)

# Stablish a connection
server.each(0.1, 'send', 'pub', 'connecting...')
assert wait_agent_attr(client, name='received', data='connecting...')

# By default, client will be subscribed to all topics
server.send('pub', 'hello')
assert wait_agent_attr(client, name='received', data='hello')

# Only subscribe to 'TOP' topic
client.unsubscribe_from_topic('sub', b'')
client.subscribe_to_topic('sub', b'TOP')

# Message not received since 'TOP' topic not specified in the send call
server.send('pub', 'world')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again we find synchronization problems here.

What do you think about this? We create a function only for tests sync_pub_sub_append_received (for example). This function takes a publisher and a subscriber. Assumes the handler in the subscriber is the classic append_received (which we use most of the time). Starts publishing messages like increasing natural numbers (in a for loop, not using a timer, we can set a small sleep here). We wait for the first one to be appended in the client. Then we no longer send messages from the publisher. We wait for the client to receive the last number sent from the publisher (once the first is received, all messages after that one should also be received, and we know which number we sent last from the publisher). Then PUB-SUB are definitely connected and in sync. Now we reset received in the client and there we go!

What you think?

(This would apply for other tests we have around there, until we implement heart-beating or some other fancy stuff).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a good idea, however I would still need to set a handler other than append_received once the connection is guaranteed to be ok. Perhaps we could do the same that we did in #151 with subscribe or some similar implementation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'm just throwing random ideas... 😂

assert not wait_agent_attr(client, name='received', data='world',
timeout=1)

# Receive message with the topic we are subscribed to
server.send('pub', 'ten', topic='TOP')
assert wait_agent_attr(client, name='received', data='ten')


@pytest.mark.parametrize('server', [Server, PubServer])
def test_simple_pub_single_handler(nsproxy, server):
"""
Expand Down