-
Notifications
You must be signed in to change notification settings - Fork 551
/
time_sync.py
573 lines (452 loc) · 22.5 KB
/
time_sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""A client for the time-sync service.
The time-sync service helps track the difference between the robot's system clock and the
system clock of clients, and sends an estimate of this difference to the client. The client
uses this information when it needs to send a timestamp to the robot in a request proto.
Timestamps in request protos generally need to be specified relative to the robot's system clock.
"""
import time
from threading import Event, Lock, Thread
from google.protobuf import duration_pb2
from bosdyn.api import time_sync_pb2, time_sync_service_pb2_grpc
from bosdyn.api.time_range_pb2 import TimeRange
from bosdyn.client.robot_command import NoTimeSyncError, _TimeConverter
from bosdyn.util import (RobotTimeConverter, now_nsec, now_sec, nsec_to_timestamp, parse_timespan,
set_timestamp_from_nsec, timestamp_to_nsec)
from .common import BaseClient, common_header_errors
from .exceptions import Error
class TimeSyncError(Error):
    """Base class for time-sync errors that are neither response errors nor gRPC errors."""
class NotEstablishedError(TimeSyncError):
    """Raised when time-sync with the robot has not been established by the client."""
class TimedOutError(TimeSyncError):
    """Raised when the deadline for achieving time-sync has been exceeded."""
class InactiveThreadError(TimeSyncError):
    """Raised when the time-sync thread is no longer running."""
class TimeSyncClient(BaseClient):
    """Client used to establish time-sync with a server/robot."""

    default_service_name = 'time-sync'
    service_type = 'bosdyn.api.TimeSyncService'

    def __init__(self):
        super().__init__(time_sync_service_pb2_grpc.TimeSyncServiceStub)

    def get_time_sync_update(self, previous_round_trip, clock_identifier, **kwargs):
        """Request an initial or refreshed time-sync estimate from the server.

        Args:
            previous_round_trip (bosdyn.api.TimeSyncRoundTrip): None on the first RPC call;
                afterwards, the round trip recorded from the previous server response.
            clock_identifier (string): Empty on the first call; afterwards the identifier the
                server assigned in its first response.
            **kwargs: Forwarded unchanged to the underlying ``call``.

        Returns:
            The result of the TimeSyncUpdate RPC as returned by ``call``.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        request = self._get_time_sync_update_request(previous_round_trip, clock_identifier)
        return self.call(self._stub.TimeSyncUpdate, request, None, common_header_errors,
                         copy_request=False, **kwargs)

    def get_time_sync_update_async(self, previous_round_trip, clock_identifier, **kwargs):
        """Asynchronous counterpart of get_time_sync_update(); issues the RPC via call_async."""
        request = self._get_time_sync_update_request(previous_round_trip, clock_identifier)
        return self.call_async(self._stub.TimeSyncUpdate, request, None, common_header_errors,
                               copy_request=False, **kwargs)

    @staticmethod
    def _get_time_sync_update_request(previous_round_trip, clock_identifier):
        """Build the TimeSyncUpdateRequest proto shared by the sync and async calls."""
        request = time_sync_pb2.TimeSyncUpdateRequest(
            previous_round_trip=previous_round_trip,
            clock_identifier=clock_identifier,
        )
        return request
def _get_time_sync_status_value(response):
    """Return the ``time_sync_status_map`` attribute of the given response."""
    status_map = response.time_sync_status_map
    return status_map
def robot_time_range_from_nanoseconds(start_nsec, end_nsec, time_sync_endpoint=None):
    """Build a TimeRange proto, in robot time, from nanosecond bounds.

    When time_sync_endpoint is given, the bounds are taken to be in the local clock and are
    converted to robot time through the endpoint's robot-time converter.

    When the bounds are already in the robot clock, leave time_sync_endpoint unset and the
    values are used as-is.

    Args:
        start_nsec: Nanoseconds since the Unix epoch, or None. A falsy value (None or 0)
            leaves the start of the range unset.
        end_nsec: Nanoseconds since the Unix epoch, or None. A falsy value (None or 0)
            leaves the end of the range unset.
        time_sync_endpoint: Either a TimeSyncEndpoint or None.

    Returns:
        bosdyn.api.TimeRange holding the time range in robot time.
    """
    converter = None
    if time_sync_endpoint:
        # Fetched once up front and reused for both bounds.
        converter = time_sync_endpoint.get_robot_time_converter()

    def _to_robot_timestamp(nsec):
        local_timestamp = nsec_to_timestamp(int(nsec))
        if time_sync_endpoint:
            return converter.robot_timestamp_from_local(local_timestamp)
        return local_timestamp

    time_range = TimeRange()
    # pylint: disable=no-member
    for field_name, nsec in (('start', start_nsec), ('end', end_nsec)):
        if nsec:
            getattr(time_range, field_name).CopyFrom(_to_robot_timestamp(nsec))
    return time_range
def robot_time_range_from_datetimes(start_datetime, end_datetime, time_sync_endpoint=None):
    """Build a TimeRange proto, in robot time, from two datetime-like bounds.

    When time_sync_endpoint is given, the bounds are taken to be in the local clock and are
    converted to robot time.

    When the bounds are already in the robot clock, leave time_sync_endpoint unset and the
    values are used as-is.

    Args:
        start_datetime: Object exposing ``timestamp()`` returning seconds since the epoch
            (e.g. a datetime.datetime), or None.
        end_datetime: Object exposing ``timestamp()`` returning seconds since the epoch
            (e.g. a datetime.datetime), or None.
        time_sync_endpoint: Either a TimeSyncEndpoint or None.

    Returns:
        bosdyn.api.TimeRange holding the time range in robot time.
    """
    # Seconds -> nanoseconds; a falsy bound is passed through as None.
    start_nsec, end_nsec = (moment.timestamp() * 1e9 if moment else None
                            for moment in (start_datetime, end_datetime))
    return robot_time_range_from_nanoseconds(start_nsec, end_nsec, time_sync_endpoint)
def timespec_to_robot_timespan(timespan_spec, time_sync_endpoint=None):
    """Parse a time-span string and return it as a TimeRange proto in robot time.

    When time_sync_endpoint is given, the parsed times are taken to be in the local clock and
    are converted to robot time.

    When the times are already in the robot clock, leave time_sync_endpoint unset and the
    values are used as-is.

    Args:
        timespan_spec: Time spec string of the form '{val}-{val}' or '{val}'.
        time_sync_endpoint: Either a TimeSyncEndpoint or None.

    Returns:
        bosdyn.api.TimeRange holding the time range in robot time.
    """
    span_start, span_end = parse_timespan(timespan_spec)
    return robot_time_range_from_datetimes(span_start, span_end, time_sync_endpoint)
def update_time_filter(client, timestamp, timesync_endpoint):
    """Convert a local time given in seconds into the robot's clock.

    Args:
        client: Passed through to the _TimeConverter helper together with the endpoint.
        timestamp (float): Client time, such as from time.time().
        timesync_endpoint (TimeSyncEndpoint): A timesync endpoint associated with the robot object.

    Returns:
        The result of the converter's robot_timestamp_from_local_secs() for the given time.

    Raises:
        bosdyn.client.robot_command.NoTimeSyncError: No timesync endpoint was supplied, so the
            time cannot be converted.
    """
    if timesync_endpoint:
        # Lazy RobotTimeConverter: initialized only if needed to make a conversion.
        local_to_robot = _TimeConverter(client, timesync_endpoint)
        return local_to_robot.robot_timestamp_from_local_secs(timestamp)
    # NOTE(review): the "[world object service]" prefix looks inherited from another
    # client module -- confirm whether it is intentional here.
    raise NoTimeSyncError("[world object service] No timesync endpoint set for the robot.")
def update_timestamp_filter(client, timestamp, timesync_endpoint):
    """Convert a local Timestamp proto into the robot's clock and return it.

    Args:
        client: Passed through to the _TimeConverter helper together with the endpoint.
        timestamp (google.protobuf.Timestamp): Client time.
        timesync_endpoint (TimeSyncEndpoint): A timesync endpoint associated with the robot object.

    Returns:
        The same ``timestamp`` object that was passed in, after it has been handed to the
        converter's convert_timestamp_from_local_to_robot() (presumably updated in place).

    Raises:
        bosdyn.client.robot_command.NoTimeSyncError: No timesync endpoint was supplied, so the
            time cannot be converted.
    """
    if timesync_endpoint:
        local_to_robot = _TimeConverter(client, timesync_endpoint)
        local_to_robot.convert_timestamp_from_local_to_robot(timestamp)
        return timestamp
    # NOTE(review): the "[world object service]" prefix looks inherited from another
    # client module -- confirm whether it is intentional here.
    raise NoTimeSyncError("[world object service] No timesync endpoint set for the robot.")
class TimeSyncEndpoint:
    """A wrapper that uses a TimeSyncClient object to establish and maintain timesync with a robot.

    This class manages internal state, including a clock identifier and previous best time sync
    estimates. This class automatically builds requests passed to the TimeSyncClient, so users
    don't have to worry about the details of establishing and maintaining timesync.

    This object is thread-safe.
    """

    def __init__(self, time_sync_client):
        """Store the client used for time-sync RPCs and reset all cached state.

        Args:
            time_sync_client: Object providing get_time_sync_update(), e.g. a TimeSyncClient.
        """
        self._client = time_sync_client
        self._lock = Lock()
        # Access these using the lock.
        # These should be updated by replacement, not mutation so that they may be used
        # outside the lock after being accessed via the lock.
        self._locked_previous_round_trip = None
        self._locked_previous_response = None
        self._locked_clock_identifier = ""

    @property
    def response(self):
        """The last response message from the time-sync service.

        Returns:
            The bosdyn.api.TimeSyncResponse proto last returned by the server, or None if unset.
        """
        with self._lock:
            return self._locked_previous_response

    @property
    def has_established_time_sync(self):
        """Checks if the client has successfully established time-sync with the robot.

        Returns:
            True if the previous time-sync update returned that time sync is OK, otherwise
            False (including when no response has been received yet).
        """
        response = self.response
        if not response:
            # Return a real boolean rather than leaking the stored None to callers.
            return False
        # pylint: disable=no-member
        return response.state.status == time_sync_pb2.TimeSyncState.STATUS_OK

    @property
    def round_trip_time(self):
        """The previous round trip time.

        Returns:
            Round trip time as google.protobuf.Duration proto if available, otherwise None.
        """
        response = self.response
        if response is None:
            return None
        return response.state.best_estimate.round_trip_time

    @property
    def clock_identifier(self):
        """The clock identifier for the instance of the time-sync client.

        Returns:
            A unique identifier for this client. Empty if get_new_estimate has not been called.
        """
        with self._lock:
            return self._locked_clock_identifier

    @property
    def clock_skew(self):
        """The best current estimate of clock skew from the time-sync service.

        Returns:
            The google.protobuf.Duration representing the clock skew.

        Raises:
            NotEstablishedError: Time sync has not yet been established.
        """
        response = self.response
        # pylint: disable=no-member
        if not response or response.state.status != time_sync_pb2.TimeSyncState.STATUS_OK:
            raise NotEstablishedError
        return response.state.best_estimate.clock_skew

    def establish_timesync(self, max_samples=25, break_on_success=False):
        """Perform time-synchronization until time sync established.

        Args:
            max_samples (int): The maximum number of times to attempt to establish time-sync
                               through time-synchronization.
            break_on_success (bool): If true, stop performing the time-synchronization after
                                     time-sync is established.

        Return:
            Boolean true if valid timesync has been established.
        """
        counter = 0
        while counter < max_samples:
            if break_on_success and self.has_established_time_sync:
                return True
            self.get_new_estimate()
            counter += 1
        return self.has_established_time_sync

    def _get_update(self):
        """Issue one time-sync update RPC using the cached round trip and clock identifier."""
        round_trip = None
        clock_identifier = None
        with self._lock:
            # Only add a round trip to the request along with a clock identifier, otherwise
            # the server will respond with an invalid request error.
            # Responses with errors may not contain a clock identifier.
            # This may happen, for example, if the service was not yet ready at the time of
            # the request.
            if self._locked_clock_identifier:
                round_trip = self._locked_previous_round_trip
                clock_identifier = self._locked_clock_identifier
        # The RPC itself is made outside the lock.
        return self._client.get_time_sync_update(previous_round_trip=round_trip,
                                                 clock_identifier=clock_identifier)

    def get_new_estimate(self):
        """Perform an update-cycle toward achieving time-synchronization.

        Return:
            Boolean true if valid timesync has been established.
        """
        response = self._get_update()
        # Sample the local receive time right after the response arrives.
        rx_time = now_nsec()
        # Record the timing information for this GRPC call to pass to the next update
        round_trip = time_sync_pb2.TimeSyncRoundTrip()
        # pylint: disable=no-member
        round_trip.client_tx.CopyFrom(response.header.request_header.request_timestamp)
        round_trip.server_rx.CopyFrom(response.header.request_received_timestamp)
        round_trip.server_tx.CopyFrom(response.header.response_timestamp)
        set_timestamp_from_nsec(round_trip.client_rx, rx_time)
        with self._lock:
            self._locked_previous_round_trip = round_trip
            # Store the response to get clock-skew estimate, etc.
            self._locked_previous_response = response
            self._locked_clock_identifier = response.clock_identifier
        return self.has_established_time_sync

    def get_robot_time_converter(self):
        """Get a RobotTimeConverter for current estimate for robot clock skew from local time.

        Returns:
            An instance of RobotTimeConverter for the time-sync client.

        Raises:
            NotEstablishedError: If time sync has not yet been established.
        """
        return RobotTimeConverter(timestamp_to_nsec(self.clock_skew))

    def robot_timestamp_from_local_secs(self, local_time_secs):
        """Convert a local time in seconds to a timestamp proto in robot time.

        Args:
            local_time_secs (float): Timestamp in seconds since the unix epoch (e.g.,
                                     from time.time()).

        Returns:
            google.protobuf.Timestamp representing local_time_secs in robot clock, or None if
            local_time_secs is None or otherwise falsy (e.g. 0).

        Raises:
            NotEstablishedError: Time sync has not yet been established.
        """
        if not local_time_secs:
            return None
        converter = self.get_robot_time_converter()
        return converter.robot_timestamp_from_local_secs(local_time_secs)
class TimeSyncThread:
    """Background thread for achieving and maintaining time-sync to the robot."""

    # After achieving time sync, update estimate every minute.
    DEFAULT_TIME_SYNC_INTERVAL_SEC = 60

    # When time-sync service is not yet ready, poll it at this interval
    TIME_SYNC_SERVICE_NOT_READY_INTERVAL_SEC = 5

    # Upper bound on how long wait_for_sync() sleeps between checks of the sync state.
    _WAIT_FOR_SYNC_POLL_SEC = 0.1

    def __init__(self, time_sync_client, time_sync_endpoint=None):
        """Prepare the thread state; the thread itself is not started until start().

        Args:
            time_sync_client: Client used to build a TimeSyncEndpoint when none is supplied.
            time_sync_endpoint: Existing endpoint to use instead of creating a new one.
        """
        self._time_sync_endpoint = time_sync_endpoint or TimeSyncEndpoint(time_sync_client)
        self._lock = Lock()
        self._locked_time_sync_interval_sec = self.DEFAULT_TIME_SYNC_INTERVAL_SEC
        self._locked_should_exit = False  # Used to tell the thread to stop running.
        self._locked_thread_exception = None  # Stores any exception which ends the thread.
        self._event = Event()  # Used to wait for next time sync, or until thread should exit.
        self._thread = None

    def __del__(self):
        # Stop the thread when this object is deleted.
        self.stop()

    def start(self):
        """Start the thread. Does nothing if the thread is already running."""
        with self._lock:
            if self._thread and self._thread.is_alive():
                return
            # Reset state left over from a previous run before launching a new thread.
            self._locked_should_exit = False
            self._locked_thread_exception = None
            self._event.clear()
            self._thread = Thread(target=self._timesync_thread)
            self._thread.daemon = True
            self._thread.start()

    def stop(self):
        """Shut down the thread if it is running."""
        if self._thread:
            with self._lock:
                self._locked_should_exit = True  # Signal the thread to exit.
            self._event.set()  # Stop the thread's wait for the next time-sync update.
            self._thread.join()  # Join the thread after it exits.
            self._thread = None

    @property
    def time_sync_interval_sec(self):
        """Returns interval at which time-sync is updated in the thread."""
        with self._lock:
            return self._locked_time_sync_interval_sec

    @time_sync_interval_sec.setter
    def time_sync_interval_sec(self, val):
        """Set interval at which time-sync is updated in the thread after sync is established.

        Args:
            val (float): The interval (in seconds) that the time-sync estimate should be updated.
        """
        with self._lock:
            self._locked_time_sync_interval_sec = val
        # Wake the thread so that a wait using the old interval is cut short.
        self._event.set()

    @property
    def should_exit(self):
        """Returns True if thread should stop iterating."""
        with self._lock:
            return self._locked_should_exit

    def wait_for_sync(self, timeout_sec=3.0):
        """Wait for up to the given timeout for time-sync to be achieved

        Args:
            timeout_sec (float): Maximum time (seconds) to wait for time-sync to be achieved.

        Raises:
            InactiveThreadError: Thread is not running.
            time_sync.TimedOutError: Deadline to achieve time-sync is exceeded.
            Threading Exceptions: Errors from threading the processes.
        """
        if self.has_established_time_sync:
            return
        # Use the monotonic clock so that adjustments of the system clock cannot stretch
        # or shorten the timeout.
        deadline = time.monotonic() + timeout_sec
        while not self.stopped:
            if self.endpoint.has_established_time_sync:
                return
            remaining_sec = deadline - time.monotonic()
            if remaining_sec <= 0:
                raise TimedOutError
            # Never sleep past the deadline.
            time.sleep(min(self._WAIT_FOR_SYNC_POLL_SEC, remaining_sec))
        # The thread is not running: report why if it recorded an exception.
        thread_exc = self.thread_exception
        if thread_exc:
            raise thread_exc
        raise InactiveThreadError

    @property
    def has_established_time_sync(self):
        """Checks if the client has successfully established time-sync with the robot.

        Returns:
            Boolean true if the previous time-sync update returned that time sync is OK.
        """
        return self.endpoint.has_established_time_sync

    @property
    def stopped(self):
        """Returns True if thread is no longer running."""
        with self._lock:
            return not self._thread or not self._thread.is_alive()

    @property
    def thread_exception(self):
        """Return any exception which ended the time-sync thread."""
        with self._lock:
            return self._locked_thread_exception

    @property
    def endpoint(self):
        """Return the TimeSyncEndpoint used by this thread."""
        return self._time_sync_endpoint

    def get_robot_clock_skew(self, timesync_timeout_sec=0):
        """Get current estimate for robot clock skew from local time.

        Args:
            timesync_timeout_sec (float): Time to wait for timesync before doing conversion.

        Returns:
            Clock skew as a google.protobuf.Duration object

        Raises:
            InactiveThreadError: Time-sync thread exits before time-sync.
            time_sync.TimedOutError: Deadline to achieve time-sync is exceeded.
            Threading Exceptions: Errors from threading the processes.
        """
        self.wait_for_sync(timeout_sec=timesync_timeout_sec)
        return self.endpoint.clock_skew

    def get_robot_time_converter(self, timesync_timeout_sec=0):
        """Get a RobotTimeConverter for current estimate for robot clock skew from local time.

        Args:
            timesync_timeout_sec (float): Time to wait for timesync before doing conversion.

        Returns:
            The converter returned by the endpoint's get_robot_time_converter().

        Raises:
            InactiveThreadError: Time-sync thread exits before time-sync.
            time_sync.TimedOutError: Deadline to achieve time-sync is exceeded.
            Threading Exceptions: Errors from threading the processes.
        """
        self.wait_for_sync(timeout_sec=timesync_timeout_sec)
        return self.endpoint.get_robot_time_converter()

    def robot_timestamp_from_local_secs(self, local_time_secs, timesync_timeout_sec=0):
        """Convert a local time in seconds to a timestamp proto in robot time.

        Args:
            local_time_secs (float): Timestamp in seconds since the unix epoch (e.g.,
                                     from time.time()).
            timesync_timeout_sec (float): Time to wait for timesync before doing conversion.

        Returns:
            google.protobuf.Timestamp representing local_time_secs in robot clock, or None if
            local_time_secs is None or otherwise falsy (e.g. 0).

        Raises:
            InactiveThreadError: Time-sync thread exits before time-sync.
            time_sync.TimedOutError: Deadline to achieve time-sync is exceeded.
            Threading Exceptions: Errors from threading the processes.
        """
        if not local_time_secs:
            return None
        converter = self.get_robot_time_converter(timesync_timeout_sec)
        return converter.robot_timestamp_from_local_secs(local_time_secs)

    def _timesync_thread(self):
        """Background thread which communicates with the time-sync service on robot.

        The purpose of this thread is to achieve and maintain time-sync, which is an estimate
        of the difference between the robot's and client's system clocks.
        """
        try:
            while not self.should_exit:
                response = self._time_sync_endpoint.response
                # pylint: disable=no-member
                if (not response or response.state.status
                        == time_sync_pb2.TimeSyncState.STATUS_MORE_SAMPLES_NEEDED):
                    # No wait between updates while time-sync is not established.
                    pass
                elif response.state.status == time_sync_pb2.TimeSyncState.STATUS_SERVICE_NOT_READY:
                    # Wait a few seconds between updates while waiting for time-sync service
                    # to be ready.
                    self._event.wait(self.TIME_SYNC_SERVICE_NOT_READY_INTERVAL_SEC)
                else:
                    # When sync has been established, use default wait time.
                    self._event.wait(self.time_sync_interval_sec)
                # Reset the wake-up flag after every pass so that a set() which arrived
                # during one wait cannot turn the following waits into no-ops. A stop
                # request is not lost: should_exit is set before the event and is
                # re-checked below.
                self._event.clear()

                # Do RPC call to update time-sync information.
                if not self.should_exit:
                    self._time_sync_endpoint.get_new_estimate()
        # For now, on GRPC error, store the error object and exit the thread.
        except Error as err:
            with self._lock:
                self._locked_thread_exception = err