-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpath_2023_task_1.py
206 lines (170 loc) · 7.12 KB
/
path_2023_task_1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""
Reads waypoints from QR code and sends drone commands.
"""
import pathlib
import time
import msvcrt
import dronekit
from modules import add_takeoff_and_landing_command
from modules import load_waypoint_name_to_coordinates_map
from modules import upload_commands
from modules import waypoint_tracking
from modules import waypoints_dict_to_list
from modules import waypoints_to_commands
from modules import diversion_waypoints_from_vertices
from modules import generate_command
from modules.common.modules import location_global
WAYPOINT_FILE_PATH = pathlib.Path("2023", "waypoints", "competition_task_1.csv")
DIVERSION_WAYPOINT_FILE_PATH = pathlib.Path("2023", "waypoints", "diversion_waypoints.csv")
REJOIN_WAYPOINT_FILE_PATH = pathlib.Path("2023", "waypoints", "rejoin_waypoint.csv")
CAMERA = 0
ALTITUDE = 40
DRONE_TIMEOUT = 30.0 # seconds
CONNECTION_ADDRESS = "tcp:localhost:14550"
LOG_DIRECTORY_PATH = pathlib.Path("logs")
KML_FILE_PREFIX = "waypoints"
DELAY = 0.1 # seconds
# Code copied into path_2024.py
# pylint: disable=duplicate-code
def main() -> int:
"""
Main function.
"""
pathlib.Path(LOG_DIRECTORY_PATH).mkdir(exist_ok=True)
# Wait ready is false as the drone may be on the ground
drone = dronekit.connect(CONNECTION_ADDRESS, wait_ready=False)
# Read in hardcoded waypoints from CSV file
# Waypoints are stored in order of insertion, starting with the top row
(
result,
waypoint_name_to_coordinates,
) = load_waypoint_name_to_coordinates_map.load_waypoint_name_to_coordinates_map(
WAYPOINT_FILE_PATH,
)
if not result:
print("ERROR: load_waypoint_name_to_coordinates_map")
return -1
result, waypoints_list = waypoints_dict_to_list.waypoints_dict_to_list(
waypoint_name_to_coordinates,
)
if not result:
print("ERROR: convert waypoints from dict to list")
return -1
print("Content of waypoints_list:", waypoints_list)
result, waypoint_commands = waypoints_to_commands.waypoints_to_commands(
waypoints_list, ALTITUDE
)
if not result:
print("Error: waypoints_to_commands")
return -1
(
result,
takeoff_landing_commands,
) = add_takeoff_and_landing_command.add_takeoff_and_landing_command(waypoint_commands, ALTITUDE)
if not result:
print("Error: add_takeoff_and_landing_command")
return -1
result = upload_commands.upload_commands(drone, takeoff_landing_commands, DRONE_TIMEOUT)
if not result:
print("Error: upload_commands")
return -1
is_qr_text_found = False
# Drone starts flying
while True:
result, waypoint_info = waypoint_tracking.get_current_waypoint_info(drone)
if not result:
print("Error: waypoint_tracking (waypoint_info)")
else:
print(f"Current waypoint sequence: {waypoint_info}")
result, location = waypoint_tracking.get_current_location(drone)
if not result:
print("Error: waypoint_tracking (get_current_location)")
else:
print(f"Current location (Lat, Lon): {location}")
is_qr_text_found = False
# nonblocking, just takes single picture
# is_qr_text_found, diversion_qr_text = diversion_qr_input.diversion_qr_input(CAMERA)
# wait for text input in console
print("Press 'q' to simulate QR code found.")
if not is_qr_text_found:
# Check if input is available
if msvcrt.kbhit():
input_char = msvcrt.getwche()
if input_char == "q":
is_qr_text_found = True
print("Simulated QR code found.")
# run once when qr code is detected
if is_qr_text_found:
# Read in hardcoded waypoints from CSV file
# Waypoints are stored in order of insertion, starting with the top row
(result, diversion_waypoint_list) = (
load_waypoint_name_to_coordinates_map.load_waypoint_name_to_coordinates_map(
DIVERSION_WAYPOINT_FILE_PATH,
)
)
if not result:
print("ERROR: diversion_waypoint_name_to_coordinates_map")
return -1
diversion_waypoint_values_list = list(diversion_waypoint_list.values())
(result, rejoin_waypoint_list) = (
load_waypoint_name_to_coordinates_map.load_waypoint_name_to_coordinates_map(
REJOIN_WAYPOINT_FILE_PATH,
)
)
if not result:
print("ERROR: rejoin_waypoint_name_to_coordinates_map")
return -1
# convert tuple[float, float] to location ground
current_latitude, current_longitude = location
result, named_location = location_global.LocationGlobal.create(
current_latitude, current_longitude
)
if not result:
print("ERROR: Could not create named location")
return -1
waypoints_around_diversion = (
diversion_waypoints_from_vertices.diversion_waypoints_from_vertices(
named_location,
next(iter(rejoin_waypoint_list.values())),
diversion_waypoint_values_list,
)
)
result, waypoints_around_diversion_commands = (
waypoints_to_commands.waypoints_to_commands(waypoints_around_diversion, ALTITUDE)
)
if not result:
print("Error: diversion_waypoints_to_commands")
return -1
# add the waypoint_around_diversion in between current commands
# rest of commands starting from rejoin waypoint
first_key = next(iter(rejoin_waypoint_list))
waypoints_after_diversion = waypoints_list[
waypoints_list.index(rejoin_waypoint_list[first_key]) :
]
result, waypoints_after_diversion_commands = (
waypoints_to_commands.waypoints_to_commands(waypoints_after_diversion, ALTITUDE)
)
if not result:
print("Error: post_diversion_waypoints_to_commands")
return -1
# combine diversion commands and regular commands after diversion
diversion_route_commands = (
waypoints_around_diversion_commands + waypoints_after_diversion_commands
)
# add landing command at the end
landing_command = generate_command.landing()
diversion_route_commands.append(landing_command)
print("Commands ready to upload")
# upload waypoint_around_diversion + waypoints_after_diversion as new ones
result = upload_commands.upload_commands(drone, diversion_route_commands, DRONE_TIMEOUT)
if not result:
print("Error: diversion_route_upload_commands")
return -1
time.sleep(DELAY)
return 0
# pylint: enable=duplicate-code
if __name__ == "__main__":
result_main = main()
if result_main < 0:
print(f"ERROR: Status code: {result_main}")
print("Done!")