-
Notifications
You must be signed in to change notification settings - Fork 0
/
antipodal_swap.py
executable file
·135 lines (107 loc) · 3.84 KB
/
antipodal_swap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
import rospy
from gestelt_msgs.msg import Command, CommanderState, Goals
from geometry_msgs.msg import Transform, PoseStamped
from std_msgs.msg import Int8
num_drones = 4
# Publisher of server events to trigger change of states for trajectory server
goal_publishers = []
for i in range(0, num_drones):
goal_publishers.append(rospy.Publisher(f'/drone{i}/planner_adaptor/goals', Goals, queue_size=2))
# Dictionary of UAV states
server_states = {}
# Check if UAV has achived desired traj_server_state
def check_traj_server_states(des_traj_server_state):
if len(server_states.items()) == 0:
print("No Server states received!")
return False
for server_state in server_states.items():
# print(f"{server_state[0]}: {des_traj_server_state}")
if server_state[1].traj_server_state != des_traj_server_state:
return False
return True
def publish_server_cmd(event_enum):
for drone_id in range(0, num_drones):
traj_server_cmd_msg = Command()
traj_server_cmd_msg.command = event_enum
# Publisher of server events to trigger change of states for trajectory server
traj_server_cmd_pub = rospy.Publisher(f'/drone{drone_id}/traj_server/command', Command, queue_size=5)
traj_server_cmd_pub.publish(traj_server_cmd_msg)
def get_server_state_callback():
for drone_id in range(0, num_drones):
msg = rospy.wait_for_message(f"/drone{drone_id}/traj_server/state", CommanderState, timeout=5.0)
server_states[str(msg.drone_id)] = msg
# print("==================")
# print(msg)
# print("==================")
def create_transform(x, y, z):
pos = Transform()
pos.translation.x = x
pos.translation.y = y
pos.translation.z = z
pos.rotation.x = 0
pos.rotation.y = 0
pos.rotation.z = 0
pos.rotation.w = 1
return pos
def create_pose(x, y, z):
pos = PoseStamped()
pos.pose.position.x = x
pos.pose.position.y = y
pos.pose.position.z = z
pos.pose.orientation.x = 0
pos.pose.orientation.y = 0
pos.pose.orientation.z = 0
pos.pose.orientation.w = 0
return pos
def pub_goals(goals):
if (len(goals) > num_drones):
print(f"Failed to publish goals. No. of goals ({len(goals)}) > num_drones ({num_drones})")
return False
for i in range(0, len(goals)):
goals_msg = Goals()
goals_msg.header.frame_id = "world"
goals_msg.transforms = goals[i]
goal_publishers[i].publish(goals_msg)
return True
def main():
rospy.init_node('mission_startup', anonymous=True)
rate = rospy.Rate(5) # 20hz
if check_traj_server_states("MISSION"):
print("Already in mission mode")
else:
print("Setting to HOVER mode!")
# Take off
while not rospy.is_shutdown():
get_server_state_callback()
if check_traj_server_states("HOVER"):
break
publish_server_cmd(0)
print("tick!")
rate.sleep()
print("Setting to MISSION mode!")
# Switch to mission mode
while not rospy.is_shutdown():
get_server_state_callback()
if check_traj_server_states("MISSION"):
break
publish_server_cmd(2)
print("tick!")
rate.sleep()
# Send waypoints to UAVs
print(f"Sending waypoints to UAVs")
goals_0 = []
goals_1 = []
goals_2 = []
goals_3 = []
# Start from (0.0, 2.0)
goals_0.append(create_transform(0.0, -2.0, 1.0))
# Start from (2.0, 0.0)
goals_1.append(create_transform(-2.0, 0.0, 1.0))
# Start from (0.0, -2.0)
goals_2.append(create_transform(0.0, 2.0, 1.0))
# Start from (-2.0, 0.0)
goals_3.append(create_transform(2.0, 0.0, 1.0))
pub_goals([goals_0, goals_1, goals_2, goals_3])
if __name__ == '__main__':
main()