-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplotter.py
executable file
·116 lines (92 loc) · 3.39 KB
/
plotter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/python3
from pymoos import pymoos
import time
import matplotlib.pyplot as plt
import numpy as np
import threading
# Module-level polar figure shared by the MOOS callbacks and main().
fig, ax = plt.subplots(subplot_kw={'polar': True})

# Compass-style layout: angles increase clockwise with zero at the top ('N').
ax.set_theta_direction(-1)
ax.set_theta_zero_location('N')

# Two initially empty lines: red for the NAV_* values, blue for DESIRED_*.
nav_line, = ax.plot([], [], 'r', label='NAV')
des_line, = ax.plot([], [], 'b', label='DESIRED')
ax.legend()
class plotter(pymoos.comms):
    """MOOS client that plots NAV_* and DESIRED_* heading/speed as polar lines.

    The instance subscribes to NAV_HEADING, NAV_SPEED, DESIRED_HEADING and
    DESIRED_SPEED, remembers the latest value of each, and redraws the
    module-level ``nav_line`` / ``des_line`` artists on ``ax`` whenever one
    of them changes. Headings are converted from degrees to radians for the
    polar axis; speed is used as the radial length of the line.
    """

    # Fixed radial limit re-applied on every redraw so the axis does not
    # rescale with the data.
    R_MAX = 5

    def __init__(self, moos_community, moos_port):
        """Set up callbacks and message routing, then start the comms client.

        Args:
            moos_community: Host passed to ``run`` as the server to connect to.
            moos_port: Port passed to ``run``.
        """
        super().__init__()
        self.server = moos_community
        self.port = moos_port
        self.name = 'plotter'

        # Latest known desired (d_*) and navigation (n_*) values.
        self.d_heading = 0
        self.d_speed = 0
        self.n_heading = 0
        self.n_speed = 0

        # Serialises updates of the shared matplotlib artists.
        # NOTE(review): presumably each active queue invokes its callback on
        # its own thread -- confirm against the pymoos documentation.
        self.lock = threading.Lock()

        self.set_on_connect_callback(self.__on_connect)
        self.set_on_mail_callback(self.__on_new_mail)

        self.add_active_queue('nav_queue', self.on_nav)
        self.add_message_route_to_active_queue('nav_queue', 'NAV_HEADING')
        self.add_message_route_to_active_queue('nav_queue', 'NAV_SPEED')

        self.add_active_queue('desired_queue', self.on_desired)
        self.add_message_route_to_active_queue('desired_queue',
                                               'DESIRED_HEADING')
        self.add_message_route_to_active_queue('desired_queue',
                                               'DESIRED_SPEED')

        self.run(self.server, self.port, self.name)

    def __on_connect(self):
        """OnConnect callback: register for every plotted variable.

        Returns:
            True only if all four registrations succeeded.
        """
        print("Connected to", self.server, self.port,
              "under the name ", self.name)
        # Build a list (not a short-circuiting ``and`` chain) so that one
        # failed registration does not prevent the remaining ones.
        results = [self.register(var, 0)
                   for var in ("NAV_SPEED", "NAV_HEADING",
                               "DESIRED_SPEED", "DESIRED_HEADING")]
        return all(results)

    def __on_new_mail(self):
        """OnNewMail callback: report mail not consumed by an active queue."""
        for msg in self.fetch():
            print("Unhandled mail received:", msg.key(), "!")
        return True

    def _update_line(self, line, heading_deg, speed):
        """Redraw ``line`` as a radial segment of length ``speed``.

        Args:
            line: The Line2D artist to update (``nav_line`` or ``des_line``).
            heading_deg: Heading in degrees; converted to radians here.
            speed: Radial length of the segment; negative values are clamped
                to zero so nothing is drawn, as with the empty range the
                previous ``np.arange`` based code produced.
        """
        theta = np.deg2rad(heading_deg)
        radius = max(speed, 0.0)
        with self.lock:
            # Line2D needs x and y sequences of equal length; newer Matplotlib
            # releases reject a bare scalar. Two points at the same angle give
            # a straight radial segment that ends exactly at ``radius``.
            line.set_data([theta, theta], [0.0, radius])
            ax.set_rmax(self.R_MAX)
            plt.draw()

    def on_nav(self, msg):
        """Active-queue callback for NAV_HEADING / NAV_SPEED messages.

        Stores the new value and redraws the NAV line. Always returns True.
        """
        key = msg.key()
        print("on_nav activated by",
              key, "with value", msg.double())
        if key == 'NAV_HEADING':
            self.n_heading = msg.double()
        elif key == 'NAV_SPEED':
            self.n_speed = msg.double()
        self._update_line(nav_line, self.n_heading, self.n_speed)
        return True

    def on_desired(self, msg):
        """Active-queue callback for DESIRED_HEADING / DESIRED_SPEED messages.

        Stores the new value and redraws the DESIRED line. Always returns
        True.
        """
        key = msg.key()
        print("on_desired activated by",
              key, "with value", msg.double())
        if key == 'DESIRED_HEADING':
            self.d_heading = msg.double()
        elif key == 'DESIRED_SPEED':
            self.d_speed = msg.double()
        self._update_line(des_line, self.d_heading, self.d_speed)
        return True
def main():
    """Start the plotter client against localhost:9000 and show the figure."""
    # The client is bound to a local name so it stays referenced while the
    # plot window is shown (plt.show() normally blocks until it is closed).
    moos_client = plotter('localhost', 9000)
    plt.show()
# Run the plotter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()