Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finer topic handling #143

Closed

Conversation

Flood1993
Copy link
Member

Use case: In osMarkets, we want to be able to subscribe to different topics after the connect call has been done by a client. Therefore, it will come in handy to be able to easily modify the topics a given SUB socket is subscribed to.

Note that this first implementation is only for direct PUB/SUB topics. I think for SYNC_PUB/SYNC_SUB channels, it will need a bit more work, but the idea would be the same. I will start working on the tests for that case right now, and will push to this PR when they are ready.

@codecov
Copy link

codecov bot commented Jun 19, 2017

Codecov Report

Merging #143 into master will decrease coverage by 0.01%.
The diff coverage is 97.22%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master    #143      +/-   ##
=========================================
- Coverage   98.31%   98.3%   -0.02%     
=========================================
  Files          25      25              
  Lines        3030    3062      +32     
  Branches      232     237       +5     
=========================================
+ Hits         2979    3010      +31     
  Misses         38      38              
- Partials       13      14       +1
Impacted Files Coverage Δ
osbrain/agent.py 95.97% <100%> (+0.09%) ⬆️
osbrain/address.py 100% <100%> (ø) ⬆️
osbrain/tests/test_agent_sync_publications.py 99.41% <93.75%> (-0.59%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9cd6f45...75110ae. Read the comment docs.

@Flood1993
Copy link
Member Author

Related to fixing the problem with the SYNC_SUB subscriptions: The cause was that, subscriptions were working before only because they were being done at connect(...) time, in which the channel was passed as a parameter, and therefore, we could access its uuid for prepending it to our topics (since messages sent from the SYNC_PUB socket append the channel uuid always, as a filtering measure).

However, when we wanted to unsubscribe from a topic, we couldn't figure out which was the full name of said topic, because the uuid was not available anywhere anymore. My fix as of now involves copying the channel.uuid into its twin, for making it always accesible. Think if there is a better way to do it.

@Flood1993 Flood1993 force-pushed the finer_topic_handling branch from b0efda5 to df5e89c Compare June 20, 2017 16:00
@Flood1993 Flood1993 force-pushed the finer_topic_handling branch 2 times, most recently from d95a560 to 5b2b12b Compare June 29, 2017 10:09
@Flood1993 Flood1993 force-pushed the finer_topic_handling branch from 5b2b12b to dda74d5 Compare June 29, 2017 10:17
osbrain/agent.py Outdated
@@ -898,6 +898,44 @@ def _connect_and_register(self, client_address, alias=None, handler=None,
self.register(socket, register_as, alias, handler)
return client_address

def unsubscribe_socket_from_topic(self, socket, topic: bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe unsubscribe_from_topic() would be better. First parameter could be an AgentAddress, an alias...

Also, if you set the topic type (i.e.: topic: bytest), do the same for all parameters. 😉

osbrain/agent.py Outdated
'''
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, topic)
if isinstance(self.address[socket], AgentChannel):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use if-else or if: return; if: return.

osbrain/agent.py Outdated
'''
if isinstance(self.address[socket], AgentAddress):
self.socket[socket].setsockopt(zmq.SUBSCRIBE, topic)
if isinstance(self.address[socket], AgentChannel):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use if-else or if: return; if: return.

client.subscribe_socket_to_topic('sub', b'TOP')

# Message not received since 'TOP' topic not specified in the send call
server.send('pub', 'world')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again we find synchronization problems here.

What do you think about this? We create a function only for tests sync_pub_sub_append_received (for example). This function takes a publisher and a subscriber. Assumes the handler in the subscriber is the classic append_received (which we use most of the time). Starts publishing messages like increasing natural numbers (in a for loop, not using a timer, we can set a small sleep here). We wait for the first one to be appended in the client. Then we no longer send messages from the publisher. We wait for the client to receive the last number sent from the publisher (once the first is received, all messages after that one should also be received, and we know which number we sent last from the publisher). Then PUB-SUB are definitely connected and in sync. Now we reset received in the client and there we go!

What you think?

(This would apply for other tests we have around there, until we implement heart-beating or some other fancy stuff).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a good idea, however I would still need to set a handler other than append_received once the connection is guaranteed to be ok. Perhaps we could do the same that we did in #151 with subscribe or some similar implementation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'm just throwing random ideas... 😂

@Flood1993
Copy link
Member Author

Incorporated the proposed changes.

@Flood1993 Flood1993 requested a review from Peque June 29, 2017 13:53
osbrain/agent.py Outdated
@@ -898,6 +898,51 @@ def _connect_and_register(self, client_address, alias=None, handler=None,
self.register(socket, register_as, alias, handler)
return client_address

def unsubscribe_from_topic(self,
socket: Union[AgentAddress, str, zmq.Socket],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Union[AgentAddress, AgentChannel, str]? AgentChannel should be there, and I would remove zmq.Socket`, as the user should never be working with raw sockets in osBrain.

osbrain/agent.py Outdated
treated_topic = channel.uuid + topic
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic)

def subscribe_to_topic(self, socket: Union[AgentAddress, str, zmq.Socket],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Union[AgentAddress, AgentChannel, str]? AgentChannel should be there, and I would remove zmq.Socket`, as the user should never be working with raw sockets in osBrain.

osbrain/agent.py Outdated
@@ -898,6 +898,51 @@ def _connect_and_register(self, client_address, alias=None, handler=None,
self.register(socket, register_as, alias, handler)
return client_address

def unsubscribe_from_topic(self,
socket: Union[AgentAddress, str, zmq.Socket],
topic: bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do allow Union[bytes, str] when making subscriptions (see Agent's _subscribe method). I think we should do the same here as well. User can directly use str (it will automatically be converted to bytes encoded in UTF8, but can use bytes if they prefer to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(maybe this should be considered in the tests as well: test with bytes and test with string).

osbrain/agent.py Outdated
self.socket[socket].setsockopt(zmq.UNSUBSCRIBE, treated_topic)

def subscribe_to_topic(self, socket: Union[AgentAddress, str, zmq.Socket],
topic: bytes):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do allow Union[bytes, str] when making subscriptions (see Agent's _subscribe method). I think we should do the same here as well. User can directly use str (it will automatically be converted to bytes encoded in UTF8, but can use bytes if they prefer to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(maybe this should be considered in the tests as well: test with bytes and test with string).

@Peque
Copy link
Member

Peque commented Jun 29, 2017

@Flood1993 for consistency with how we currently execute the .connect() call, I think these methods should be a bit different:

  • Unsubscriptions should have the address (or alias), not socket, as first parameter. Same for subscriptions.
  • Unsubscriptions should maybe accept a list of topics as second parameters (? not critical, though).
  • Subscriptions should follow a closer API to .connect(): they should define the handler. This could be a dictionary in the form {'topic0': handler0, 'topic1': handler1} just like with .connect(). I think this is more important. Note that currently you can do: agent.connect(..., {'a': handlera, 'b': handlerb}), then, we you do agent.subscribe_to_topic(..., topic='c') (which could probably be renamed to .subscribe()), which handler should be use? a or b? I think we should force the user.
  • This last point makes me think that maybe we are looking to make _subscribe() public, and create an unsubscribe function.

What do you think? (We can talk about this tomorrow if you want)

@Flood1993
Copy link
Member Author

I think that would be great. Right now the socket parameter is not even a socket. However, I think naming it alias might be misleading, since it does not have to be an alias. However, I can't come up with a better name (address_identifier? Even though it could also be a channel...). I think more functions have this "problem".

I completely agree with accepting various handlers at the same time through a dictionary, and also with making _subscribe public.

@Peque
Copy link
Member

Peque commented Jun 29, 2017

Maybe alias is good. You can only subscribe/unsubscribe already existing addresses/channels, which means that you should already know the alias. Being a public method, it is also more intuitive and perhaps more common to use the alias as well. We should of course accept AgentAddress/Channel as well.

We could also think for a word to use everywhere when we refer to an Agent's address/channel/alias. Which in my head would be simply "address". 😕

@Flood1993
Copy link
Member Author

Closed in favor of #158

@Flood1993 Flood1993 closed this Jul 3, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants