
Implemented UDP socket utility function for Nodes #53

Status: Open · wants to merge 46 commits into master

Conversation

studywolf

Motivation and context:

Communication between Nengo nodes, locally or over a network, using UDP sockets. Originally and almost entirely written by @xchoo; I added a few fixes for bugs that crop up when testing non-local communication, along with documentation.

How long should this take to review?

Types of changes:

Added nengo_utils/sockets.py and nengo_utils/tests/test_sockets.py

Checklist:

  • I have read the CONTRIBUTING.rst document.
  • [n/a] I have updated the documentation accordingly.
  • [n/a] I have included a changelog entry.
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Default packet data type: float
Default packet structure: [t, x[0], x[1], x[2], ... , x[d]]"""

send_data = [float(t + self.dt_remote / 2.0)] + \
Contributor:

Why self.dt_remote / 2.0 instead of just self.dt_remote?

Author:

That ... is a good question. In a receive node it checks to make sure that the packet's t >= self.t and t < self.t + self.dt, so I assume t + self.dt_remote / 2 is used in the packet to avoid rounding errors? @xchoo can you confirm?

Member:

Yup, this is mostly to avoid rounding errors (because the socket class can be used to accept or send data to any device). It also ensures that the timestamp of the data sent is squarely in one dt block.

Contributor:

Can you give an example of how this (and the other place in the code where this happens) would solve a rounding error? I'd like to make a commit to document this somewhere.

Member (@xchoo, Sep 27, 2017):

It's as much to address possibilities of rounding errors as it is to avoid compatibility problems in the future. The code that processes timestamps works like this:

  • When processing timestep T,
  • If socket packet is received with timestamp between T and T + dt, use it
  • If timestamp is greater than T + dt, remember it for later
  • Otherwise, ignore it.

The potential problem with this code is at the boundary points of the code (i.e. T and T + dt). The easiest way to ensure that the send code doesn't cause the receive code to run into these problems is to timestamp the packet exactly in between T and T + dt.

This also addresses the issue of rounding. Because packets can come from different devices, processing a timestep for t == 0.1s could really be processing it for t = 0.10000000000001s (floating point rounding issues on the host machine). But, if the code gets a packet with timestamp t = 0.09999999999s (floating point or casting issues with data from the remote machine), should it be considered for the previous timestep, or for the current timestep? Using a timestamp of T + dt / 2.0 avoids this ambiguity.
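The acceptance window and midpoint stamping described above can be sketched as follows. This is an illustrative reconstruction, not the actual sockets.py code; the helper names (`stamp`, `accept`) are invented here:

```python
# Illustrative sketch of the timestamp convention discussed above.
# Names (stamp, accept) are hypothetical; values of dt, T are examples.

def stamp(t, dt):
    """Timestamp an outgoing packet at the middle of its dt block."""
    return t + dt / 2.0

def accept(timestamp, T, dt):
    """Receiver check: does this packet belong to timestep T?"""
    return T <= timestamp < T + dt

dt = 0.001
T = 0.1
# A midpoint stamp lands well inside the window, so small floating
# point noise on either machine cannot flip which step claims it:
assert accept(stamp(T, dt), T, dt)
# A packet stamped exactly at the boundary is ambiguous: a tiny
# rounding error decides whether it belongs to T or to the next step.
assert not accept(T + dt, T, dt)        # boundary goes to the next step
assert accept(T + dt - 1e-12, T, dt)    # ...but barely under it goes to T
```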

Collaborator:

Personally, I would send the packets with the correct timestamp, and leave the checking logic to the machine that's processing the timestep. That way, it's easy to change the checking procedure. You could add dt / 2. (or some other epsilon) as part of the check, or you could round all timesteps to the nearest timestep on the machine doing the processing. I think it just helps keep the two things separate.

Member:

Yup. That could be done too. But I'd just caution that changes to this code requires extensive (beyond the unit tests) testing. There's a bunch of packet memory code (to handle cases where you receive packets timestamped in the future or past) that needs to be rechecked if changes are made to this logic.

Member (@xchoo, Sep 27, 2017):

Oh, another word of caution. Because the socket code only handles packet data, there could be instances where the host and remote systems run using different dt's, so the timestamp processing code has to handle this. The way it's currently done is t >= T and t < T + dt, but it could just as easily be done as t >= T - dt/2 and t < T + dt/2. I chose the first option because it makes it clear that any packet received from T to T + dt is considered for the T timestep. Going with the second option means potentially dealing with issues of having to test receiving data from the past.

Contributor:

So... How should we include this discussion in the documentation? Can we just in-line it or would that take too much space?

Slight refactoring and fix bug with queue (doesn't exist in
nengo.utils.compat; put in nengo_extras.utils instead).

Highlights:

- Move some instance variables to the thread
- Ensure docstrings match style guide
- Split run method into run_send and run_recv,
  which results in not having to suppress the complexity check
- Rearranged several methods for (hopefully) easier scanning
@tbekolay (Member):

Pushed a commit with some refactoring. The tests pass for me locally -- except for one run in which the time_sync test failed. Likely thread related, so hard to reproduce, but it would be annoying to have to restart TravisCI jobs when the test randomly fails... perhaps the time sync could be made more robust? How else can we test this @xchoo?

@xchoo (Member) commented Sep 28, 2017:

Hmmm. I'm not sure. The problem is that two independent nengo models are required to test the time_sync functionality. We might be able to test it in one model, but that might also just cause the entire model to deadlock. I suppose you can drop that test for now until we figure out a more robust solution?

@jgosmann (Contributor) left a comment:

I went through the code and left comments with the thoughts I had reading. Some general comments:

  1. Does this support IPv6? Do we want to support IPv6?
  2. When a class has a close method, it is an indicator to me that it maybe deserves to be a context manager (i.e. be usable with with). Is there a reason to not make UDPSocket a context manager?
  3. To me it seems more logical to send the actual timestamp instead of t + dt / 2 and leave it to the receiver to verify it.
  4. What is the timeout logic supposed to do? And what is it supposed to be? At the moment the logic seems to be distributed across multiple places which makes it hard to grasp.
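Regarding point 2, a context-manager version could look like the sketch below. `UDPSocket` here is a stand-in with a dummy `close`, not the PR's actual class; the point is only that two extra methods make `with` work and guarantee cleanup even on exceptions:

```python
# Sketch of point 2: a context manager guarantees close() runs even if
# the body raises. UDPSocket here is a stand-in, not the PR's class.
class UDPSocket:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True  # the real class would shut down sockets here

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not suppress exceptions

with UDPSocket() as sock:
    assert not sock.closed
assert sock.closed  # closed automatically on exiting the with block
```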

For a send only socket, the user must specify:
(send_dim, dest_port)
and may optionally specify:
(dest_addr)
Contributor:

I believe this formatting will be lost when the documentation is built with sphinx. Do we care about that?

Member:

Docstrings have been changed in the new commit; this block was explaining the oddities involved with the old organization anyhow.

machine, handling cases where simulation time steps are not the same.
local_addr : str, optional (Default: '127.0.0.1')
The local IP address data is received over.
local_port : int
Contributor:

Doesn't state that this is optional, but the function signature has a default value.

Member:

That's because if you want to use local_addr this parameter is not optional. The default value specified in the constructor is meant to tell the constructor to ignore the local_addr setting.

Member (@tbekolay, Sep 29, 2017):

One way around this (let me know if this seems good) is to split this into 3 processes, one for sending, one for receiving, one for doing both. We can still make sure there's no code duplication by having the step function returned by make_step be similar to the main socket class now (but a bit simpler). If this seems good I can implement it

Contributor:

Sounds good to me.

Member:

Yeah. The only reason why it's all in one function right now is because of code duplication. If 3 separate processes can be made without code duplication, that'll be ideal.

Member:

Done now.

The local port data is receive over.
dest_addr : str, optional (Default: '127.0.0.1')
The local or remote IP address data is sent to.
dest_port: int
Contributor:

Again optional according to the function signature, but not according to the documentation.

Member:

Same as the local_port parameter.

a socket.
byte_order : str, optional (Default: '!')
Specify 'big' or 'little' endian data format.
'!' uses network (big-endian) byte order.
Contributor:

Should we state that '<' and '>' are also supported? When integrating with other Python code, one might want to use those.
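For reference, struct's byte-order prefix characters behave as follows: '<' is little-endian, '>' is big-endian, '!' is network order (identical to big-endian), and '=' is the system's native order, so exposing '<' and '>' costs nothing:

```python
import struct

# struct byte-order prefixes: '<' little-endian, '>' big-endian,
# '!' network order (same as big-endian), '=' native order.
value = 1.0
assert struct.pack('!f', value) == struct.pack('>f', value)
# For a single 4-byte float, the little-endian encoding is the
# big-endian encoding with its bytes reversed:
assert struct.pack('<f', value) == struct.pack('>f', value)[::-1]
```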

Member:

👍

Member:

Done in the new commit.

self.send_dim = send_dim
self.recv_dim = recv_dim

self.max_recv_len = (recv_dim + 1) * 4
Contributor:

Why +1? Why times 4? (4 bytes per value?)

Member:

+1 to account for the timestamp, *4 because the packet data is sent as float: struct.pack(self.byte_order + 'f' * (self.send_dim + 1), *send_data), which are 4 bytes long.

Contributor:

This should then probably use struct.calcsize to determine the byte length.
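The struct.calcsize suggestion would look something like this (`recv_dim` and the format string mirror the discussion above; the value is illustrative):

```python
import struct

# Derive the receive buffer size from the packet format instead of
# hard-coding "4 bytes per float". recv_dim here is illustrative.
recv_dim = 3
packet_format = '!' + 'f' * (recv_dim + 1)  # timestamp + recv_dim floats
max_recv_len = struct.calcsize(packet_format)
assert max_recv_len == (recv_dim + 1) * 4   # same number, now self-documenting
```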

Member:

Suuure... okay.

Member:

In the new commit, we do not track the max_recv_len; instead, we use NumPy to hold data associated with the socket, and do socket.recv_into(ndarray.data) to stream the data from the socket directly into the numpy array.
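The recv_into pattern described above can be sketched like this (a loopback demo with invented dimensions; in the real code the byte order of sender and receiver must of course match):

```python
import socket

import numpy as np

# Stream a datagram directly into a preallocated NumPy array via
# socket.recv_into(ndarray.data). Loopback demo; recv_dim is illustrative.
recv_dim = 3
buf = np.zeros(recv_dim + 1, dtype=np.float32)  # [t, x[0], ..., x[recv_dim-1]]

recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
recv_sock.bind(('127.0.0.1', 0))  # port 0: the OS picks a free port
recv_sock.settimeout(5.0)

send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
send_sock.sendto(
    np.array([0.0005, 1.0, 2.0, 3.0], dtype=np.float32).tobytes(),
    recv_sock.getsockname())

recv_sock.recv_into(buf.data)  # fills the array in place, no struct.unpack
send_sock.close()
recv_sock.close()
```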

for port in self.dest_port:
    self.send_socket.sendto(
        self.pack_packet(t, x), (addr, port))
self.last_packet_t = t  # Copy t (which is a scalar)
Contributor:

This comment does not seem helpful. The assignment tells me that t is assigned (“copied”) to the attribute. I might not know that t is a scalar, but that depends on the caller, so I would rather document the argument list saying that t is supposed to be a scalar.

Member:

Uh. The comment is in reference to t not being a scalar/float, and the copy is needed so that the class doesn't just have a reference to t.

Contributor:

But this is just storing the reference to t and not doing a copy? Why is t not a scalar/float?

Member:

I dunno. haha. I can't remember why this is done tbh. 😆

Member:

This was done because previously the t and x provided to a node function was a live signal; in newer versions of Nengo it's a copy, so keeping a reference to it is now safe.

Contributor:

Yeah, but I think the line wasn't doing what it was intended to do ... anyways, I suppose this is now obsolete.

"""Function that will be passed into the Nengo node.

When both sending and receiving, the sending frequency is
regulated by comparing the local and remote time steps. Information
Contributor:

Is the sending frequency only regulated when both sending and receiving, i.e. when only sending it is not controlled?

Member:

Yes. When sending only, the sending frequency is determined by the local t. When sending and receiving, it tries to sync up the packets so neither side gets flooded with packets.

"""

# If t == 0, return array of zeros and reset state of class,
# empty queue of messages, close any open sockets
Contributor:

Why are we doing this? Because this indicates a simulator reset?
Maybe the socket functionality should rather be implemented as a nengo.Process to properly support resets?

Member:

This code was written before nengo.Process was implemented. So... if anyone has the free time to switch the code over and test it, by all means! 😄

Member:

Done!


def test_send_recv_chain(Simulator, plt, seed, rng):
    # Model that sends data
    udp_send = sockets.UDPSocket(dest_port=54321, send_dim=1)
Contributor:

Is there a way to have Python pick free ports (I know it is possible for the TCPServer stuff in Python)? To avoid problems of tests failing if the specified port is already in use on the system by some other program.

Member:

I believe giving the port a value of 0 might do it? I can't quite remember.

Contributor:

I might look into it.

Member:

If you pass a 0 as the port, it is up to the operating system to give you a free port, which it will do unless you've used all 64K available ports. However, it's not a great solution because in order for this to work, both processes need to know the port to connect to; there isn't an easy way to build a simulator, ask the built model what port it ended up using, and communicate that to another process before the whole thing times out. So, I think it's best to force the user to specify a port.
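For reference, the port-0 mechanism under discussion works like this (the caveat above about communicating the chosen port to the other process still applies):

```python
import socket

# Bind to port 0 and let the OS assign a free port, then read the
# assigned port back with getsockname().
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('127.0.0.1', 0))
host, port = sock.getsockname()
assert port != 0  # a real, currently-free port was assigned
sock.close()
```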

atol=0.0001, rtol=0.0001)


def test_time_sync(Simulator, plt, seed, rng):
Contributor:

What is different about this test than the previous test? How/where does it test the time synchronization? (What exactly is meant by that?)

Member (@xchoo, Sep 29, 2017):

test_send_recv_chain tests a feedforward chain of UDP sockets:

[send] --> [recv -> send] --> [recv]

So the data just flows from the first send node all the way to the last recv node.

test_time_sync tests bi-directional communication of sockets which requires the timing of each node to be synchronized (i.e. to timestep-lock the sockets)

[send/recv] <---> [send/recv]

I should note that these are the two most basic setups you can have with sockets, and it doesn't even test having multiple sockets in one model.

Contributor:

I see, test_bidirectional_communication or something similar might make that clearer.

Member (@xchoo, Sep 29, 2017):

Well. because test_bidirectional_communication could be:

Model1     Model2
------     ------
[send] --> [recv]
[recv] <-- [send]

Which is different than what is being tested.

Model1           Model2
------           ------
[send/recv] <--> [recv/send]

@xchoo (Member) commented Sep 29, 2017:

Does this support IPv6? Do we want to support IPv6?

No, I don't think it supports IPv6. But since it uses the python socket class, if that supports IPv6, then so should this class (with no changes necessary)

When a class has a close method, it is an indicator to me that it maybe deserves to be a context manager (i.e. be usable with with). Is there a reason to not make UDPSocket a context manager?

Perhaps? I will leave it to someone with more expertise with context managers to decide.

To me it seems more logical to send the actual timestamp instead of t + dt / 2 and leave it to the receiver to verify it.

The timestamp send code was written to work unambiguously with the nengo receive code, and arduino receive code written with the same logic (i.e. t >= T and t < T + dt). The change was made to t + dt/2 because the original implementation of t was giving problems with rounding. If someone would like to retest the t implementation with different hardware, by all means! 😄

What is the timeout logic supposed to do? And what is it supposed to be? At the moment the logic seems to be distributed across multiple places which makes it hard to grasp.

It's complicated. There are multiple levels of timeouts happening.

  1. The initial socket creation could time out
  2. Waiting for a packet could time out
  3. Receiving a packet with non-current data, and deciding when to terminate the socket because consistent past data is being received.

And I'm sure there are other things in there as well.


while True:
    try:
        packet, addr = self.recv_socket.recvfrom(self.max_recv_len)
Contributor:

Could this return less than max_recv_len bytes? The Python documentation doesn't really say ...

Member:

Yes. You can ask for X bytes of a packet, where X is <= to the packet data length.

Contributor:

No, what I'm wondering about is whether this might return y < X bytes when X bytes are requested. That case would need handling, because otherwise the unpacking would fail on a partial packet.

Member (@xchoo, Sep 29, 2017):

Yes. But only if the send code has super screwed up the send format (i.e. the send and recv formats are different). Every packet received is a full packet. If the packet is corrupted, the python socket code should drop it (it's udp for a reason).

Contributor:

That's what I'm essentially asking. I think with TCP connections you don't have that guarantee (but I didn't know about UDP).
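The UDP behavior in question is easy to demonstrate on loopback: recvfrom(n) delivers at most n bytes of a single datagram and (on POSIX systems) silently discards the remainder, so you get a truncated packet, never a partial packet whose tail arrives on a later call. A minimal sketch:

```python
import socket

# UDP recvfrom(n) returns at most n bytes of one datagram; the excess
# bytes of that datagram are discarded, not delivered later.
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
recv_sock.bind(('127.0.0.1', 0))
recv_sock.settimeout(5.0)

send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
send_sock.sendto(b'0123456789abcdef', recv_sock.getsockname())

data, addr = recv_sock.recvfrom(8)  # buffer smaller than the 16-byte datagram
assert data == b'01234567'          # truncated; the trailing 8 bytes are gone

send_sock.close()
recv_sock.close()
```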

Refactored into three separate processes.

Also uses numpy instead of manual struct packing for shorter code
and (hopefully) faster execution.
@tbekolay (Member) commented Oct 3, 2017:

Pushed a commit refactoring this into three separate processes. I'll respond to the inline questions given the new code, but it's a big change so it's worth people looking at this again. In particular, I removed the ability to connect to multiple ports in the same connection (make more processes if that's desirable), and used numpy instead of struct to do byte conversions. Also, we previously did not bind the send socket, but in this version I do. The unit tests pass consistently, but without other examples it's hard to know if I've changed anything critical.

One other comment: do we actually want the ignore_timestamps argument? It seems to add little yet complicate the code a fair bit. I'd favor dropping it if there isn't a strong argument for having it.

@tbekolay (Member) commented Oct 3, 2017:

Oh also, if anyone's curious why I added a shutdown before close, see this blog post: https://engineering.imvu.com/2014/03/06/charming-python-how-to-actually-close-a-socket-by-calling-shutdown-before-calling-close/
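The shutdown-then-close pattern from the linked post can be sketched as follows. Note that on an unconnected UDP socket, shutdown may raise ENOTCONN, which is harmless to ignore:

```python
import socket

# shutdown() stops both directions of traffic before close() releases
# the descriptor. An unconnected UDP socket may raise OSError
# (ENOTCONN) on shutdown; that is safe to swallow.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('127.0.0.1', 0))
try:
    sock.shutdown(socket.SHUT_RDWR)
except OSError:
    pass
sock.close()
```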

@jgosmann (Contributor) commented Oct 3, 2017:

Oh also, if anyone's curious why I added a shutdown before close

👍
I remember coming across this when I refactored the Nengo GUI server

@jgosmann (Contributor) left a comment:

Just one comment for now, didn't manage to do a complete review. The timeout stuff still confuses me.

The timestamp send code was written to work unambiguously with the nengo receive code, and arduino receive code written with the same logic (i.e. t >= T and t < T + dt). The change was made to t + dt/2 because the original implementation of t was giving problems with rounding. If someone would like to retest the t implementation with different hardware, by all means!

If I were to use this code to send data from a model to somewhere else, I would expect the timestamps to match the actual timestamp the data was recorded at. If the verification code tests T - dt / 2 <= t < T + dt / 2, rounding errors should not be a problem. I don't know about other hardware implementations (Arduino?) that rely on this and would need adjustment and cannot judge how much trouble it would be to change those.

self.timeout = timeout_min
self.stopped = False

def delay(self):
Contributor:

This method doesn't seem to be called and is identical to reset_timeout.

Member:

You're right, I forgot to remove this. Will do that now.

@tbekolay (Member) commented Oct 4, 2017:

The timeout stuff still confuses me.

I'm a bit confused about it as well... but presumably @xchoo put it in for a reason? My current thinking on why it's necessary is that, without the timeout thread, the close methods don't get called until garbage collection. Doing it manually in a model is not easy either, as the step function returned by the process is not easy to access once the model is built. I started working on a PR for Nengo to expose that more easily, but I abandoned it because it was too complex and did not result in nice model code (which is also why doing this as a context manager does not work well, fyi). The timeout thread will at least ensure that the sockets will be shutdown and closed when the model has not been run for a while. Of course, running this in a GUI setting in which you want to pause and play a few times won't work well. But resetting the model and running again should work.

I would expect the timestamps to match the actual timestamp the data was recorded at

I don't really care that much about the timestamp, as long as it works. Feel free to change it and see if it works.

@jgosmann (Contributor) commented Oct 4, 2017:

Spent some time looking at the timeout logic. Let's see whether I understand this correctly:

  1. The socket needs to be closed after usage, but the nengo.Process has no way of knowing when a simulation ended. Thus, the SocketCloseThread is created to shutdown the socket when there haven't been any calls to recv or send (which is equivalent to calls to the step function created by the Process) for some time.
  2. The SocketCloseThread might time out early and close the socket prematurely (either because a simulation time step took very long, or because the simulation was “interrupted” and continued with another call to sim.run(t)). Thus, we reopen the socket in this case. But why is this only done for a receiving and not for a sending socket?
  3. If reopening a socket, it will be tried repeatedly until it succeeds, with increasing wait times (“backoff”). It seems to me that with a proper shutdown (socket.shutdown before socket.close), this should be much less of a problem. Maybe that code isn't required anymore?
  4. A timeout is set on the blocking operations of the socket. This timeout is not handled when sending data (why?). When receiving data, it will be tried to receive data again. So it seems that the timeout for the socket could be made infinite instead, except that the SocketCloseThread timeout needs to be reset to prevent the socket from being closed. However, the socket timeout is set to the max timeout of SocketCloseThread, so it is likely that the socket gets closed anyways and needs to be reopened.

Assuming my understanding is correct, it feels to me that there should be (might be?) a better solution. But as a first step, it would probably best to agree how long the socket should be alive in an ideal world (and then see if that can be implemented):

  1. The current implementation tries to have the socket alive for the duration the sim.run method is executed.
  2. But maybe the life of the socket should be tied to the simulator (opened when building the model, closing with sim.close)? Of course currently there is no way for the nengo.Process to know about the close and implementing this might be complicated as @tbekolay says. Though, something similar (in a more general fashion) has been requested before (Possibility to register on-close actions in builder functions. nengo#1118).
  3. Maybe the user itself should manage the lifecycle of the socket? This seems slightly problematic as there is no way to properly close the socket in the Nengo GUI and even in a Python script it could mean wrapping most of it in a with statement which doesn't seem nice either.

@tbekolay (Member) commented Oct 4, 2017:

But why is this only done for a receiving and not for a sending socket?

That's how it was originally implemented, so I maintained that same logic. Perhaps it should happen for both? Originally, also, the sending socket was not bound to a particular address and port, in part because it used to be possible to send to multiple ports (which didn't really make much sense). I removed that, which meant that I bound the socket once opened, which perhaps also necessitates doing the reopening? On the other hand, sending to a socket should never time out because in the worst case you're filling up some remote buffer, whereas receiving can easily time out if the process you're receiving from is slower than you, or blocks for some other reason.

Maybe that [backoff] code isn't required anymore?

Retrying with increasing backoff is a pretty standard technique in networking, so I would be hesitant to remove it (see, e.g., http://docs.aws.amazon.com/general/latest/gr/api-retries.html).

However, the socket timeout is set to the max timeout of SocketCloseThread, so it is likely that the socket gets closed anyways and needs to be reopened.

Again, I was attempting to keep the same logic as was there before. @xchoo or @studywolf will have to speak to why this happens.

how long the socket should be alive in an ideal world

I don't think there's a right answer here, different use cases will have different ideals. Leaving things in the hands of the user might work, but has similar technical difficulties as registering onclose events -- namely, the step function returned by make_step is not stored by the builder, and it's non-trivial to have it be stored and accessible.

@jgosmann (Contributor) left a comment:

I'm going to reject this PR for now. Mostly because there are too many open questions about what the timeout logic is supposed to do and what it is actually doing. These things need to be clarified first and at the minimum will require improvements to the documentation of the code.

There are also a few other changes that I'd like to propose, for which I intend to prepare commits.

@tbekolay (Member) commented Oct 4, 2017:

The nengo_extras repo is designed for experimental (not necessarily fully baked) features. This PR has tests that are passing, which is better than 90% of software out there, so I'm not sure that we should block this, given that there are people (namely @studywolf) who want to use this in their models right now. We can always make a PR to modify this later. There are many things in this repository that haven't gotten the scrutiny that this has, so I'm not sure why this is being blocked when those have been merged without discussion?

@xchoo
Member

xchoo commented Oct 4, 2017

Just replying to some of @jgosmann comments.

The socket needs to be closed after usage, but the nengo.Process has no way of knowing when a simulation ended. Thus, the SocketCloseThread is created to shutdown the socket when there haven't been any calls to recv or send (which is equivalent to calls to the step function created by the Process) for some time.

Yes. Almost. The sending sockets only need to know when the local simulator has terminated. However, the receiving sockets need to know when the remote simulator has terminated. More on this later.

The SocketCloseThread might time out early and close the socket prematurely (either because a simulation time step took very long or the simulation was “interrupted” and continued with another call to sim.run(t). Thus, we reopen the socket in this case. But why is this only done for a receiving and not for a sending socket?

The reason why sending sockets don't need to be reopened is that they know when the data stream has ended (because the local simulator generates the data). However, receive sockets are blocking (i.e., they "hang" the socket thread until a packet is received). This being the case, some kind of logic has to be implemented to figure out when the remote side has stopped sending data. It is possible to have specialized "TERM" data packets, but, this being UDP, there is no guarantee that any packets reach the destination. Then we'd have to do some kind of ACK/SYN logic, which is a whole other can of worms (it'd be like implementing TCP on top of UDP).

If reopening a socket, it will be tried repeatedly until it succeeds with increasing wait times (“backoff”). It seems to me that with the proper shutdown using socket.shutdown before socket.close should be much less of a problem. Maybe that code isn't required anymore?

Iirc, the backoff logic was a "try X number of times" to make sure that the remote end is really terminated. The code is still required because the receiving sockets have no information about what the remote side is doing.

A timeout is set on the blocking operations of the socket. This timeout is not handled when sending data (why?). When receiving data, it will be tried to receive data again. So it seems that the timeout for the socket could be made infinite instead, except that the SocketCloseThread timeout needs to be reset to prevent the socket from being closed. However, the socket timeout is set to the max timeout of SocketCloseThread, so it is likely that the socket gets closed anyways and needs to be reopened.

The timeout is necessary on receiving sockets because they are blocking sockets. If no timeout is implemented, they block indefinitely (which is not ideal). Send sockets don't have timeouts because their primary function is to fire off a packet as soon as they get the data to send.
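A minimal sketch (not the actual nengo_extras implementation) of the blocking-receive behaviour described above: recvfrom() blocks until a packet arrives, so without settimeout() the thread would hang forever once the remote side stops sending. The timeout value and localhost binding here are just examples; binding to port 0 lets the OS pick a free port.

```python
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 0))
sock.settimeout(0.3)  # raise socket.timeout after 0.3 s without data

try:
    data, addr = sock.recvfrom(1024)
except socket.timeout:
    # No packet within 0.3 s; the caller can decide whether to retry,
    # reuse the last value, or shut down.
    data = None
finally:
    sock.close()
```

A sending socket, by contrast, does not block on sendto(), which is why no timeout is needed on the send side.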

@xchoo
Member

xchoo commented Oct 4, 2017

Imho, the PR seems to be a good rework of the socket code, but with all code that interacts with hardware, i'd like to see it tested in a "live" environment before giving it the thumbs up. 😄
Tagging @studywolf because he's currently using the code in such an environment (and I'm currently busy with stuff to test it out for myself).

@jgosmann
Contributor

jgosmann commented Oct 4, 2017

We can always make a PR to modify this later.

And it might never happen given the effort needed to understand the intent of the code in the first place, and even then it isn't entirely clear without input from the original authors. Considering the amount of time I am spending on this now, it is much more efficient to make these things clear in a timely manner rather than sometime later, when all of us have forgotten again how the parts of the code interact.

I am probably fine with keeping the timeout thread (once intended logic is made clear!) as there seems to be no other way in current Nengo releases to close the socket (and it might take some discussion to implement such a way).

@xchoo
Member

xchoo commented Oct 4, 2017

I am probably fine with keeping the timeout thread (once intended logic is made clear!) as there seems to be no other way in current Nengo releases to close the socket (and it might take some discussion to implement such a way).

Even if there was a way for nengo to close sockets, some kind of timeout logic is required because receiving sockets have no information about the state of the remote side. And it needs this information in order to tell the local simulator when or when not to shut itself down.

@jgosmann
Contributor

jgosmann commented Oct 4, 2017

@xchoo first of all thank you for taking the time trying to answer my questions.

I now realized that there isn't a timeout on sending sockets (this is quite easy to miss, not sure if that can be made more obvious). But that leaves me with the question, how is the sending socket ever closed? This seems to only happen when the SocketStep gets garbage collected (due to Python's reference counting this should happen when the Simulator object is deleted). That means the sending socket is probably kept around longer than it is needed, but it doesn't seem like a huge problem to me.

Iirc, the backoff logic was a "try X number of times" to make sure that the remote end is really terminated. The code is still required because the receiving sockets have no information about what the remote side is doing.

I am not sure how that would tell you anything about the remote side. Whether the receiving socket can be opened should only depend on the local system, shouldn't it?

The timeout is necessary on receiving sockets because they are blocking sockets. If no timeout is implemented, they block indefinitely (which is not ideal).

Would the recv call return if the socket is closed from another thread?

@xchoo
Member

xchoo commented Oct 4, 2017

how is the sending socket ever closed?

In the original code, the sending sockets are closed when the simulator is terminated (garbage collected, I think), or when the close method is manually called. There are instances where this fails, which is why in the original code, when the open method is called, it attempts to close the original socket.

I am not sure how that would tell you anything about the remote side. Whether the receiving socket can be opened should only depend on the local system, shouldn't it?

Yes. Whether it can be opened should be dependent on the local system. Whether it should be reopened is dependent on the remote system.

Would the recv call return if the socket is closed from another thread?

I think so, but it doesn't really matter if it returns or not. If the terminate call is made, the socket will terminate.

@tbekolay
Member

tbekolay commented Oct 4, 2017

@xchoo first of all thank you for taking the time trying to answer my questions.

Cool no problem dude. Seriously the refactoring took a lot of time and effort and I think it's a lot more understandable than before.

@jgosmann
Contributor

I made some more commits documenting the timestamp checking logic and putting the adaptive socket timeout back in. In the following I will give an overview of the general changes that I have made (in comparison to @tbekolay's last commit 61de86d).

  1. Added some comments on things that could be improved in the future (but are not super pressing now).
  2. Removed the socket close thread using SO_REUSEADDR/SO_REUSEPORT instead (details below)
  3. Adaptive timeout (details below)
  4. Simplified timestamp checking with the intent to keep the same logic as written by @xchoo except for sending values with t instead of t + dt/2 (I think @tbekolay already removed the +dt/2, but did not adjust the timestamp checking accordingly? Or I missed it.) (details below)
  5. Fixed the tests to actually show exceptions that occur in the simulation threads.
  6. A bunch of documentation improvements, some renaming of parameters. Main noteworthy change here: The address and port are now passed in as a tuple which is more in line with the Python standard library, shortens the argument lists (and code) a bit and these two values are always used together. I prefer it that way, but I'm also fine with reverting that (or any other name changes).

Removal of the socket close thread

The socket close thread was the main problem I had with the code in its prior state. Because it can randomly close the socket, there is a lot of complexity in handling that and interactions with the regular timeout that are hard to follow and understand. Without the thread there is no need to expect a premature closure of the socket when waiting for packets and we also do not need to reopen the socket which also allows to remove the backoff logic.

The current solution is to use the SO_REUSEPORT/SO_REUSEADDR options. From what I read this should work on all major OSes, but it still needs testing on Windows. (Interesting side note: Linux >= 3.9 does load balancing with SO_REUSEPORT, which is a cool feature, but not what we want. But SO_REUSEADDR works on Linux.) I added a unit test to test the reopening of sockets (and that data goes to the correct socket).
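A sketch of the SO_REUSEADDR approach described above (illustrative, not the exact nengo_extras code): with the option set, a socket can rebind an address immediately after the previous socket on that address was closed, so a rebuilt simulation can reopen its socket without waiting for the OS to release the port.

```python
import socket

def open_udp_socket(addr):
    # Hypothetical helper name, for illustration only.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(addr)
    return s

first = open_udp_socket(("127.0.0.1", 0))  # port 0: OS picks a free port
port = first.getsockname()[1]
first.close()
# Rebinding the same port right away succeeds thanks to SO_REUSEADDR.
second = open_udp_socket(("127.0.0.1", port))
second.close()
```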

This solution is what I prefer because it is less code, but I could see a variant of the socket close thread that I would be ok with. Essentially, the socket close thread would need to be prevented from closing the socket while the main thread is in the recv method. That could be easily done with a mutex that gets acquired before recv and needs to be acquired by the socket close thread before closing the socket. Before recv releases the mutex, it resets the thread timeout. That way the recv method does not need to deal with premature socket closures, which keeps its code simpler and gets rid of the complexity of the interaction between the socket timeout and close timeout. Note that there will still be some code required to reopen a closed socket (it could get closed between two run calls).
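The mutex variant described above could look roughly like this (class and method names are hypothetical, and the actual socket I/O is elided): the close thread acquires the same lock as recv, so it can never close the socket while the main thread is blocked inside recv, and each recv resets the idle clock.

```python
import threading

class GuardedSocket:
    def __init__(self):
        self._lock = threading.Lock()
        self.closed = False
        self.last_recv = 0.0

    def recv(self, now):
        with self._lock:
            # ... the blocking socket recv would happen here ...
            self.last_recv = now  # reset the idle timeout

    def maybe_close(self, now, idle_timeout):
        # Called periodically by the close thread.
        with self._lock:
            if now - self.last_recv > idle_timeout:
                self.closed = True

s = GuardedSocket()
s.recv(now=1.0)
s.maybe_close(now=1.5, idle_timeout=1.0)  # still within the idle timeout
assert not s.closed
s.maybe_close(now=3.0, idle_timeout=1.0)  # idle for too long, closes
```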

Adaptive timeout

I don't have a complete understanding of how this worked in the original code (it used to be tied to the socket close thread, but apparently wasn't about socket closes?). Anyways, the recv timeout gets adapted with the same formulas used for the idle_timeout. The logic is implemented in the _UDPSocket class (it seems to me that it ties more closely to the socket than to the step method). The timeout is reduced during successful reads and set back to maximum on a missed packet. I wonder whether it should actually slowly increase the timeout instead?
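For illustration, the direction of adjustment described above (shrink on success, reset to maximum on a miss) can be sketched as follows; the decay factor and bounds here are made up, not the formulas actually used in the code.

```python
MAX_TIMEOUT = 0.3   # seconds; illustrative value
MIN_TIMEOUT = 0.01  # lower bound so the timeout never reaches zero

def adapt_timeout(timeout, success):
    if success:
        # Successful read: shrink the timeout (factor is illustrative).
        return max(MIN_TIMEOUT, timeout * 0.9)
    # Missed packet: fall back to the maximum timeout.
    return MAX_TIMEOUT

t = MAX_TIMEOUT
for _ in range(5):
    t = adapt_timeout(t, success=True)
assert t < MAX_TIMEOUT
t = adapt_timeout(t, success=False)
assert t == MAX_TIMEOUT
```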

Timestamp checking

I added several paragraphs explaining how differing local and remote dt are aligned in the SocketStep docstring. Not sure if that is the best place ...

The logic for the timestamp checking should be the same as in previous versions, except that the boundaries are shifted by dt/2 because that is not added to the send timestamp anymore. Thus, any existing (hardware) implementation should be easy to adapt by just adding dt/2 to the incoming timestamp. I think that sending the actual timestamp is less surprising for people using this to send out data to some of their own applications and use it there.

As for the simplifications of the implementation of the logic in the SocketStep.recv method:

  1. I removed the try-except, because there cannot be a premature socket closure anymore and the timeout is handled in the calling method.
  2. According to the documentation, ignore_timestamp should not do any timestamp checking whatsoever, so I moved this to the beginning of the method as a separate case. That makes the if-conditions later a bit simpler. (You probably only want to use low timeouts with ignore_timestamp so as not to stall the simulation for too long, but just update the local value whenever new data comes in.)
  3. The while True loop consisted only of an if condition doing a break besides the socket recv (after removing the try-except). So that if condition might as well be moved into the while condition.
  4. The final if-condition stayed the same, except that the left part of the inequality was already covered by the while loop.
  5. There is one more block in the middle of the function that handles receiving the first packet, which has a separate timeout and for which we do not yet have a value for t (which would break the conditions in the while and if).
  6. Finally SocketStep.__call__ keeps track of the number of timeouts (reset on successful recv) and once a limit was exceeded just reuses the old value. This was not in the original code, but @xchoo said at some point in the discussion that something like this should be there.

I added a bunch of small tests for the timestamp checking that also give examples of what values are used in different scenarios.
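To illustrate the gist of the acceptance condition discussed above (this is a simplified sketch, not the exact nengo_extras code, and the helper name is hypothetical): a received packet is kept only if its timestamp is not in the past relative to the local simulation time, with a half-timestep tolerance for rounding.

```python
def accept_packet(t, dt, packet_t):
    # Accept a packet stamped at or after the current local timestep,
    # allowing half a local dt of slack for floating-point rounding.
    return packet_t >= t - dt / 2.0

# A packet stamped at the current time is accepted ...
assert accept_packet(t=1.0, dt=0.001, packet_t=1.0)
# ... while a stale packet from the past is skipped, so the recv loop
# keeps reading until a current packet arrives or the timeout fires.
assert not accept_packet(t=1.0, dt=0.001, packet_t=0.5)
```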

@jgosmann
Contributor

Made some fixes and added some debug logging. Also ran a test communicating between ctngpu1 and my office computer (ctn18), where on my end I used Nengo GUI; code for further tests is attached below. Everything seemed to work fine. Some things I tried out:

  • Starting simulations on both computers.
  • Pausing simulation in the GUI (while remote end keeps running).
  • Rebuilding simulation in the GUI (reopens socket, resets time, while remote end keeps running).
  • Restarting remote end.

In each case the simulations would successfully sync up again (in some instances it might depend on simulation speed and recv timeout, but I didn't experience any problems in that regard).

@jgosmann
Contributor

Forgot the test code:

test_local.py:

import nengo
from nengo_extras.sockets import UDPSendReceiveSocket


with nengo.Network() as model:
    sock = UDPSendReceiveSocket(
        listen_addr=('0.0.0.0', 31338),
        remote_addr=('ctngpu1', 31337),
        recv_timeout=0.3)
    sock_node = nengo.Node(sock, size_in=2, size_out=1)
    in_node = nengo.Node([0., 0.])
    out_node = nengo.Node(size_in=1)
    nengo.Connection(in_node, sock_node)
    nengo.Connection(sock_node, out_node)


if __name__ == '__main__':
    nengo.log('debug')
    with nengo.Simulator(model) as sim:
        sim.run(300.)

test_remote.py:

import nengo
from nengo_extras.sockets import UDPSendReceiveSocket
import numpy as np


with nengo.Network() as model:
    sock = UDPSendReceiveSocket(
        listen_addr=('0.0.0.0', 31337),
        remote_addr=('ctn18', 31338),
        recv_timeout=0.3)
    sock_node = nengo.Node(sock, size_in=1, size_out=2)
    prod = nengo.Node(lambda t, x: np.prod(x), size_in=2, size_out=1)
    nengo.Connection(sock_node, prod)
    nengo.Connection(prod, sock_node)


if __name__ == '__main__':
    nengo.log('debug')
    with nengo.Simulator(model) as sim:
        sim.run(300.)

@studywolf
Author

hmm, I'm not getting communication between the two machines I'm using... they're able to ping each other at the IP addresses I've entered, and the scripts work when I run them locally, but they just hang on the different machines.

test_remote is outputting a whole slew of:

[                    Simulating... 0%                    ] ETA: 1 day, 2:05:34[DEBUG] Send packet for t=0.904000s.
[DEBUG] Waiting for packet with adaptive timeout (current value 0.300000s.)
[INFO] No packet received for t=0.904000s.

and test_local just one output:

[DEBUG] Send packet for t=0.001000s.
[DEBUG] Waiting for packet with timeout 300.000000s.

All I'm doing is changing the remote_addr IP address, is there anything else I should be doing?

@jgosmann
Contributor

Did you make sure that neither the firewall nor the network you are on is discarding the UDP packets to your local machine? (The debug output indicates that the local machine is not receiving any packets at all.)
Did the exact same configuration work with an earlier version of the socket code (e.g., the one you opened this PR with)?

@studywolf
Author

ahhh gd firewall, that did it! Switched ports over to the one I had approved (forgot about that step) and things are running between the two now, thanks!

@jgosmann
Contributor

@studywolf any chance you are using Windows? (i.e., does someone still have to test the code on Windows, or did you?)

@studywolf
Author

nah i'm on ubuntu, don't have a copy of windows :( ... :)

@jgosmann
Contributor

Tested this in a Windows 7 VirtualBox now and it worked.

@studywolf
Author

Done some start/stop testing etc., seems pretty robust! The only weird thing is that there's like a 20-30 second delay before the communication and simulation in the GUI start again when you restart the client side.

@jgosmann
Contributor

Only weird thing is that there's like a 20-30 second delay before the communication and simulation in the GUI starts again when you restart the client side

Yeah, that might seem weird, but I think it is actually correct behaviour (as long as you do not set ignore_timestamp=True). One simulation will keep running, so the other simulation has to catch up after being paused (or reset) until the timestamps match again. How long this will take, depends on how long the other simulation continued to run for, what timeout values have been chosen and how long it takes to simulate each timestep.

@studywolf
Author

ahh makes sense, maybe there could be a printout or some such added on the side that's ahead, letting the user know? The first couple times it happened I thought that things had frozen; I only found out that it was working because I just left it running while checking my mail haha

@studywolf
Author

Or, wait. When I reset the client side and run it for 1 second, the host side is already at t=1 ... if my client side is only ever running with t<=1, how would it catch up?

@jgosmann
Contributor

jgosmann commented Nov 1, 2017

The host side waits recv_timeout seconds in each timestep for a packet that is not in the past. If the client takes less time to run a single simulation step, it will eventually catch up. If it takes longer, it won't catch up (but in that case the communication would break down even without pausing or resetting the client).
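A back-of-the-envelope sketch of this catch-up behaviour (the helper is illustrative, not part of the codebase): if the client resumes some number of timesteps behind and simulates steps faster than the host, the gap closes at the difference of their step rates.

```python
def catchup_time(lag_steps, client_steps_per_sec, host_steps_per_sec):
    # Wall-clock seconds until the client's timestamps match the host's
    # again; never, if the client is not faster than the host.
    if client_steps_per_sec <= host_steps_per_sec:
        return float("inf")
    return lag_steps / (client_steps_per_sec - host_steps_per_sec)

# e.g. 1000 steps behind, client at 1000 steps/s, host at 500 steps/s:
# the gap shrinks by 500 steps each second, so it closes in 2 seconds.
print(catchup_time(1000, 1000, 500))
```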

@studywolf
Author

Sorry I phrased poorly.

When I start the host and run it with a client for 1 second, then for the host t=1.
When I restart the client and run it for 1 second, the t for the client is between 0 and 1, so if the host is waiting for a packet that is not in the past, that will never arrive (because I'm restarting the client after 1 second of simulation).
So ... does the t used for judging in the past or not get set when a new simulation connects? I'm just confused how the client simulation time t would always be below 1, but would still simulate forward / talk with the host when the host simulation time t = 1.

@jgosmann
Contributor

jgosmann commented Nov 1, 2017

In the case you are describing, the host should wait for recv_timeout in each timestep for a packet that is not timestamped in the past. But because it doesn't receive such a packet, it will just use the last received packet and assume that it provides the best estimate of the actual value. (There is no accounting for out-of-order packets etc.)

@studywolf
Author

Cool. So I've tested this a fair bit now, and it's working quite well.
The only thing that caught me is that the host has to be started before the remote connection or it doesn't connect. It would also be nice to have a printout somewhere on the host side saying that it's waiting for the remote end to catch up (which I can play around with implementing unless you have specific thoughts on it).

@jgosmann
Contributor

The only thing that caught me is that the host has to be started before the remote connection or it doesn't connect

That is surprising to me. Is this because of the choice of specific timeouts, so that one end does not catch up to the other? (btw: Are you doing bidirectional or unidirectional communication?) Or is this inherent in how the connection is set up?

printout somewhere on the host side saying that it's waiting for the remote end to catch up

I'd suggest you play around with that because I am not 100% clear on what condition this should be printed.
