-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
163 lines (117 loc) · 5.36 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from bluepy.btle import Scanner, DefaultDelegate, Peripheral, BTLEManagementError, BTLEDisconnectError
import struct
import time
import threading
# thread to read weight from the scale
class ReadWeightThread(threading.Thread):
    """Background thread that takes a single weight reading from the scale.

    The thread is given the fixed name READ_THREAD_NAME so that
    is_thread_active() can tell whether a reading is already in progress.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.name = READ_THREAD_NAME

    def run(self):
        """Read the scale once and record when a measurement succeeded."""
        global last_measurement
        try:
            weight = self.read_scale()
            # read_scale() returns None when no final reading arrived; compare
            # against None so a completed reading of 0.0 still counts
            if weight is not None:
                last_measurement = time.time()
                # do something with the measurement here
        except Exception as e:
            # top-level boundary of the thread: report the failure rather than
            # letting the thread die with an unhandled traceback
            print(e)

    # connect to the scale and return the weight once the measurement is completed
    def read_scale(self):
        """Connect to the scale, wait for the final weight and return it.

        Returns:
            The final weight in kilograms, or None if the scale did not report
            a completed measurement within 60 seconds.

        Raises:
            Whatever bluepy raises while connecting, writing or waiting; the
            connection is closed before the exception propagates.
        """
        # connect to the scale
        p = Peripheral(SCALE_ADDRESS, 'public')
        try:
            # create the delegate that collects the weight notifications
            delegate = BLEDelegate()
            p.setDelegate(delegate)

            # enable notifications
            p.writeCharacteristic(NOTIFY_HANDLE, NOTIFY_VALUE, withResponse=True)

            # start the weight notifications
            p.writeCharacteristic(MEASUREMENT_HANDLE, BEGIN_MEASUREMENT_VALUE)

            start_time = time.time()

            # wait until measurement is done; if it takes longer than
            # 60 seconds then something is probably wrong so stop
            while not delegate.measurement_done and (time.time() - start_time) <= 60:
                p.waitForNotifications(1.0)

            # if we successfully took a measurement
            if delegate.measurement_done:
                print('measurement done! weight is {} kg, {:.1f} lbs'.format(delegate.weight, delegate.weight * 2.20462))
        finally:
            # always disconnect from the scale, even when a write or wait
            # failed, so the connection is not left dangling
            p.disconnect()

        return delegate.weight
def is_thread_active(thread_name):
    """Return True if a live thread is currently named *thread_name*.

    Args:
        thread_name: the name to look for among the threads reported by
            threading.enumerate().

    Returns:
        True when at least one such thread exists, otherwise False.
    """
    return any(thread.name == thread_name for thread in threading.enumerate())
# delegate class used with bluepy
# combines the scan and notification delegates into a single class
class BLEDelegate(DefaultDelegate):
    """bluepy delegate handling both scan discoveries and weight notifications.

    Attributes set by handleNotification:
        weight: final weight in kilograms, or None until a final reading arrives.
        measurement_done: True once the scale has flagged the reading as final.
    """

    def __init__(self):
        DefaultDelegate.__init__(self)
        self.weight = None
        self.measurement_done = False

    # function that will be called when new devices are discovered
    def handleDiscovery(self, dev, is_new_dev, is_new_data):
        """Start a ReadWeightThread when the scale is newly discovered.

        Nothing happens for other devices, while a read thread is already
        running, or while the measurement cooldown has not yet elapsed.
        """
        # only react to the scale itself
        # dev.addr is lowercase so make sure the SCALE_ADDRESS is as well
        if not is_new_dev or dev.addr != SCALE_ADDRESS.lower():
            return

        # make sure the read weight thread isn't already running
        # the MEASUREMENT_COOLDOWN should prevent this from happening but just to be sure...
        if is_thread_active(READ_THREAD_NAME):
            print('thread already started!')
            return

        if last_measurement is None or time.time() - last_measurement > MEASUREMENT_COOLDOWN:
            print('starting read weight thread')
            read_weight_thread = ReadWeightThread()
            # Thread.setDaemon() is deprecated; set the attribute instead
            read_weight_thread.daemon = True
            read_weight_thread.start()

    # function that will be called when notifications are received
    def handleNotification(self, handle, data):
        """Decode a weight notification and record the final reading.

        Args:
            handle: characteristic handle the notification came from (unused).
            data: raw notification payload as bytes.
        """
        # the weight notification starts with \x10; anything shorter than six
        # bytes cannot carry both the weight and the "done" flag, so ignore it
        # instead of raising inside the bluepy callback
        if len(data) < 6 or data[0] != 0x10:
            return

        # the 4th and 5th bytes hold the weight as a big-endian integer in
        # hundredths of a kilogram
        weight = int.from_bytes(data[3:5], 'big') / 100
        print('weight is {} kg'.format(weight))

        # once the measurement is done byte #6 should be set to 1
        # this will coincide with the weight on the scale flashing
        if data[5] == 1:
            print('measurement done, final weight is {}'.format(weight))
            self.measurement_done = True
            self.weight = weight
# MAC address of the scale to look for
SCALE_ADDRESS = '04:ac:44:0a:14:be'

# handle that notifications are enabled on, and the two-byte value that
# has to be written to it to enable them
NOTIFY_HANDLE = 0x0016
NOTIFY_VALUE = struct.pack('<2B', 0x01, 0x00)

# handle that weight notifications are started on, and the eight-byte value
# that has to be written to it to start them
MEASUREMENT_HANDLE = 0x0013
BEGIN_MEASUREMENT_VALUE = struct.pack(
    '<8B',
    0x20, 0x08, 0x15, 0xda,
    0xb9, 0xe8, 0x25, 0xdd,
)

# the scale stays discoverable and connectable for a short while after its
# display turns off; this cooldown (in seconds) between measurements keeps us
# from connecting to it over and over
MEASUREMENT_COOLDOWN = 900

# name given to the thread that reads the weight
READ_THREAD_NAME = 'Read Weight Thread'

# time of the most recent successful measurement (None until there is one)
last_measurement = None
# main loop: keep scanning for the scale until interrupted from the keyboard
while True:
    try:
        scanner = Scanner().withDelegate(BLEDelegate())

        # never scan while a reading is still in progress; wait and try again
        if is_thread_active(READ_THREAD_NAME):
            print('read thread active, not starting new scan, sleeping for 10 seconds')
            time.sleep(10)
            continue

        scanner.scan(MEASUREMENT_COOLDOWN, passive=True)
    except KeyboardInterrupt:
        # ctrl-c ends the program
        break
    except BTLEDisconnectError as e:
        print('device disconnected? {}'.format(e))
        time.sleep(1)
    except BTLEManagementError as e:
        print('something wrong with bluetooth device? {}'.format(e))
        time.sleep(1)