-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinserter.py
220 lines (189 loc) · 7.82 KB
/
inserter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import mysql.connector
from mysql.connector import errorcode
from swarmapi import SwarmAPI
import sched, time
class SwarmBase:
    """Copy recent SWARM messages into a MariaDB/MySQL database.

    Constructing an instance connects to the database server, makes sure the
    ``SWARM`` schema and its ``telemetry`` and ``data`` tables exist, and then
    stores the messages returned by ``SwarmAPI.get_recent_data(count=1000)``.
    Call :meth:`update` afterwards to fetch and store newer messages, and
    :meth:`close` when the instance is no longer needed.
    """

    # NOTE(review): credentials are hard-coded in source. They are kept so
    # that ``SwarmBase()`` keeps working unchanged, but they should be moved
    # to a config file or environment variables.
    _DEFAULT_CONFIG = {
        "user": "shashank",
        "password": "mariamaria",
        "host": "localhost",
        "raise_on_warnings": True,
        "use_pure": False,
    }

    # Accumulated duplicate-insert attempts above which a warning is printed
    # and the counter is reset.
    _DUP_WARN_THRESHOLD = 100

    def __init__(self, config=None):
        """Connect to the database server and run the initial back-fill.

        Input:
            config: `dict` (Optional) Keyword arguments forwarded to
                `mysql.connector.connect`. When omitted, a copy of
                `_DEFAULT_CONFIG` is used.
        Raises:
            mysql.connector.Error: the connection could not be opened. A short
                diagnostic is printed first. The error is re-raised because an
                instance without a connection cannot do anything useful.
        """
        # Copy the defaults so that neither this method nor the connector can
        # mutate state shared between instances.
        if config is None:
            config = dict(self._DEFAULT_CONFIG)

        try:
            self.cnx = mysql.connector.connect(**config)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)
            raise

        print("MariaDB connection successful.")
        self.cursor = self.cnx.cursor()
        self.swarm = SwarmAPI()
        # Duplicate-attempt counters; init_db() resets them as well.
        self.tel_dup_num = 0
        self.dat_dup_num = 0
        print("Initializing database ...")
        self.init_db()

    def _init_tables(self):
        """Select the ``SWARM`` database (creating it if missing) and create
        the ``telemetry`` and ``data`` tables if they do not exist yet.

        Exits the process with status 1 when the database can neither be
        selected nor created. A failure to create an individual table is
        printed and does not stop the remaining tables from being created.
        """
        DB_NAME = "SWARM"
        TABLES = {}
        TABLES["telemetry"] = (
            "CREATE TABLE IF NOT EXISTS `telemetry` ("
            " `packetId` int NOT NULL,"
            " `telemetryVersion` int NOT NULL,"
            " `telemetryAt` varchar(40) NOT NULL,"
            " `telemetryLatitude` float(32) NOT NULL,"
            " `telemetryLongitude` float(32) NOT NULL,"
            " `telemetryAltitude` int(12) NOT NULL,"
            " `telemetryCourse` int(12) NOT NULL,"
            " `telemetrySpeed` int(12) NOT NULL,"
            " `telemetryBatteryVoltage` float(12) NOT NULL,"
            " `telemetryBatteryCurrent` float(12) NOT NULL,"
            " `telemetryTemperatureK` int(32) NOT NULL,"
            " `deviceType` int(4) NOT NULL,"
            " `deviceId` int(32) NOT NULL,"
            " PRIMARY KEY (`packetId`)"
            ") ENGINE=InnoDB"
        )
        TABLES["data"] = (
            "CREATE TABLE IF NOT EXISTS `data` ("
            " `messageId` int NOT NULL,"
            " `packetId` int NOT NULL,"
            " `deviceType` int(4) NOT NULL,"
            " `deviceId` int(32) NOT NULL,"
            " `viaDeviceId` int(32) NOT NULL,"
            " `dataType` int(4) NOT NULL,"
            " `userApplicationId` int(32) NOT NULL,"
            " `organizationId` int(32) NOT NULL,"
            " `len` int(16) NOT NULL,"
            " `data` varchar(1000) NOT NULL,"
            " `ackPacketId` int(12) NOT NULL,"
            " `status` int(4) NOT NULL,"
            " `hiveRxTime` varchar(40) NOT NULL,"
            " PRIMARY KEY (`packetId`) "
            ") ENGINE=InnoDB"
        )

        try:
            self.cursor.execute("USE {}".format(DB_NAME))
        except mysql.connector.Error as err:
            if err.errno != errorcode.ER_BAD_DB_ERROR:
                # Any failure other than "unknown database" is not recoverable
                # here, so report the real error instead of a misleading one.
                print(err)
                raise SystemExit(1)
            print("Database {} does not exist.".format(DB_NAME))
            self._create_database(DB_NAME)
            print("Database {} created successfully.".format(DB_NAME))
            self.cnx.database = DB_NAME

        for table_name, table_description in TABLES.items():
            try:
                print("Creating table {}: ".format(table_name), end="")
                self.cursor.execute(table_description)
            except mysql.connector.Error as err:
                print(err.msg)
            else:
                print("OK")

    def _define_logs(self):
        """Build the parameterised INSERT statements used for logging.

        Sets `self.add_data` and `self.add_telem`. Both use named
        ``%(key)s`` placeholders, so each ping passed to `_log_dping` /
        `_log_tping` must be a mapping that provides every column name.
        """
        self.add_data = (
            "INSERT IGNORE INTO data "
            "(messageId, packetId, deviceType, deviceId, viaDeviceId, dataType, "
            "userApplicationId, organizationId, len, data, "
            "ackPacketId, status, hiveRxTime) VALUES ("
            "%(messageId)s, %(packetId)s, %(deviceType)s, %(deviceId)s, %(viaDeviceId)s, %(dataType)s, "
            "%(userApplicationId)s, %(organizationId)s, %(len)s, %(data)s, "
            "%(ackPacketId)s, %(status)s, %(hiveRxTime)s)"
        )
        self.add_telem = (
            "INSERT IGNORE INTO telemetry "
            "(packetId, telemetryVersion, telemetryAt, telemetryLatitude,"
            "telemetryLongitude, telemetryAltitude, telemetryCourse, "
            "telemetrySpeed, telemetryBatteryVoltage, "
            "telemetryBatteryCurrent, telemetryTemperatureK, deviceType, "
            "deviceId) "
            "VALUES (%(packetId)s, %(telemetryVersion)s, %(telemetryAt)s, %(telemetryLatitude)s,"
            "%(telemetryLongitude)s, %(telemetryAltitude)s, %(telemetryCourse)s, "
            "%(telemetrySpeed)s, %(telemetryBatteryVoltage)s, "
            "%(telemetryBatteryCurrent)s, %(telemetryTemperatureK)s, %(deviceType)s, "
            "%(deviceId)s)"
        )

    def _create_database(self, DB_NAME):
        """Create database `DB_NAME` with the utf8 character set.

        Exits the process with status 1 if the statement fails.
        `DB_NAME` is interpolated into the SQL text (identifiers cannot be
        bound as parameters), so it must come from trusted code only.
        """
        try:
            self.cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME)
            )
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            raise SystemExit(1)

    def _log_dping(self, ping):
        """
        Insert one data ping into the `data` table.
        The statement used is the `add_data` attribute built by `_define_logs`.
        Input:
            ping: One data message as a mapping. Its keys must match the column names of the table.
        Returns: None
        """
        self.cursor.execute(self.add_data, ping)

    def _log_tping(self, ping):
        """
        Insert one telemetry ping into the `telemetry` table.
        The statement used is the `add_telem` attribute built by `_define_logs`.
        Input:
            ping: One telemetry message as a mapping. Its keys must match the column names of the table.
        Returns: None
        """
        self.cursor.execute(self.add_telem, ping)

    def _log_batch(self, pings, insert, kind, counter_attr):
        """Insert every ping of one kind and maintain its duplicate counter.

        Input:
            pings: iterable of messages to store.
            insert: callable that stores one message (`_log_tping` or `_log_dping`).
            kind: `str` label used in the printed messages ("telemetry" or "data").
            counter_attr: `str` name of the attribute holding the running
                duplicate count for this kind.
        """
        duplicates = getattr(self, counter_attr, 0)
        for ping in pings:
            try:
                insert(ping)
            except mysql.connector.Error as err:
                # Presumably the duplicate-key warning of INSERT IGNORE surfaces
                # here because the connection uses raise_on_warnings=True.
                # Only that case counts as a duplicate. Other database errors
                # are reported rather than silently miscounted.
                if err.errno == errorcode.ER_DUP_ENTRY:
                    duplicates += 1
                else:
                    print("Failed to log {} ping: {}".format(kind, err))
        print("Logged all unique {} pings.".format(kind))
        if duplicates > self._DUP_WARN_THRESHOLD:
            print(
                "WARNING! %s duplicate %s log entries attempted." % (duplicates, kind)
            )
            print(
                "Duplicate entry attempts exceeds {} ... resetting attempt counter.".format(
                    self._DUP_WARN_THRESHOLD
                )
            )
            duplicates = 0
        setattr(self, counter_attr, duplicates)

    def _log_recent(self, count=10, use_cached=False):
        """
        Fetch recent messages through `self.swarm` and store them, then commit.
        Duplicates are ignored by the INSERT IGNORE statements.
        Input:
            count: `int` (Optional) Selects how many recent messages are fetched from the native SWARM server. Range [10,1000]
            use_cached: `boolean` (Optional) Chooses to use data cached from previous retrieval if True. Default to False.
        Returns: None
        """
        self.swarm.get_recent_data(count=count, use_cached=use_cached)
        self._log_batch(
            self.swarm.recent_telemetry, self._log_tping, "telemetry", "tel_dup_num"
        )
        self._log_batch(self.swarm.recent_data, self._log_dping, "data", "dat_dup_num")
        self.cnx.commit()
        print("Logs committed.")

    def init_db(self):
        """Prepare the schema and insert statements, reset the duplicate
        counters, and store up to 1000 recent messages from the server."""
        self._init_tables()
        self._define_logs()
        print("Logging is defined.")
        self.tel_dup_num = 0
        self.dat_dup_num = 0
        print("Fetching all retained server history...")
        self._log_recent(count=1000, use_cached=False)

    def update(self):
        """Fetch the 10 most recent messages and store the ones not seen yet."""
        self._log_recent(count=10, use_cached=False)

    def close(self):
        """Close the cursor and the connection opened by the constructor."""
        self.cursor.close()
        self.cnx.close()
if __name__ == "__main__":
    # Seconds between two consecutive polls of the SWARM server.
    POLL_INTERVAL_S = 60

    sb = SwarmBase()
    s = sched.scheduler(time.time, time.sleep)

    def _poll():
        """Queue the next poll, then run one update."""
        # A sched event fires only once, so it must be re-armed on every run;
        # otherwise s.run() returns after the first update and the script ends.
        # Re-arming before the update keeps the start-to-start interval steady.
        s.enter(POLL_INTERVAL_S, 1, _poll)
        sb.update()

    s.enter(POLL_INTERVAL_S, 1, _poll)
    s.run()