forked from scottyphillips/pychonet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_async.py
72 lines (59 loc) · 2.4 KB
/
example_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import sys
import asyncio
from pprint import pprint
from aioudp import UDPServer
from pychonet import ECHONETAPIClient as api
from pychonet.lib.const import ENL_SETMAP, ENL_GETMAP, ENL_UID, ENL_MANUFACTURER, VERSION
from pychonet.lib.epc import EPC_SUPER, EPC_CODE
from pychonet.lib.eojx import EOJX_GROUP, EOJX_CLASS
# This example will list the properties for all discovered instances on a given host
def epc2str(gc, cc, pc):
    """Look up the table entry for property code ``pc``.

    ``EPC_SUPER`` is consulted first, keyed by ``pc`` alone. On a miss the
    lookup falls back to ``EPC_CODE[gc][cc][pc]``, where ``gc`` and ``cc``
    are the group and class codes supplied by the caller.

    Returns:
        The entry that was found, or the string ``"Unknown"`` when neither
        table contains the code.
    """
    try:
        name = EPC_SUPER[pc]
    except KeyError:
        # Not in the first table; try the per-group/per-class table.
        try:
            name = EPC_CODE[gc][cc][pc]
        except KeyError:
            name = "Unknown"
    return name
async def main(argv):
    """List the property maps of every instance discovered on one host.

    Starts a UDP server on port 3610, runs discovery against the host named
    in ``argv[1]``, then queries each discovered instance for its property
    maps and identification data and pretty-prints the collected list.

    Args:
        argv: Argument list in ``sys.argv`` form; ``argv[1]`` is the host to
            query.

    Raises:
        SystemExit: If no host argument was supplied, or if no instance data
            has been recorded for the host once the discovery wait is over.
    """
    # Validate the arguments before any socket is opened.
    if len(argv) < 2:
        prog = argv[0] if argv else "example_async.py"
        raise SystemExit(f"usage: {prog} <host>")
    target = argv[1]

    udp = UDPServer()
    # We are inside a coroutine, so hand on the loop that is running it.
    loop = asyncio.get_running_loop()
    udp.run("0.0.0.0", 3610, loop=loop)
    server = api(server=udp, loop=loop)
    server._debug_flag = False
    server._message_timeout = 300

    await server.discover(target)

    # Poll every 10 ms; give up after 300 polls (about 3 seconds).
    for _ in range(300):
        await asyncio.sleep(0.01)
        if target in server._state and "discovered" in server._state[target]:
            break

    # Without any recorded state there is nothing to list; say so instead of
    # failing with a bare KeyError further down.
    if target not in server._state or "instances" not in server._state[target]:
        raise SystemExit(f"no discovery response from {target}")
    state = server._state[target]

    instance_list = []
    # Iterate over key snapshots: the state is read back after each await
    # below, so it is presumably updated while we wait, and a dict must not
    # change size while it is being iterated.
    for eojgc in list(state["instances"]):
        for eojcc in list(state["instances"][eojgc]):
            for instance in list(state["instances"][eojgc][eojcc]):
                # Ask for the maps of this very instance, so that the entries
                # read back below belong to the instance being listed.
                await server.getAllPropertyMaps(target, eojgc, eojcc, instance)
                props = state["instances"][eojgc][eojcc][instance]
                getmap = [
                    (hex(e), epc2str(eojgc, eojcc, e))
                    for e in props[ENL_GETMAP]
                ]
                setmap = [
                    (hex(e), epc2str(eojgc, eojcc, e))
                    for e in props[ENL_SETMAP]
                ]

                await server.getIdentificationInformation(
                    target, eojgc, eojcc, instance
                )
                # Look the entry up again after the second request.
                props = state["instances"][eojgc][eojcc][instance]
                uid = props[ENL_UID]
                manufacturer = props[ENL_MANUFACTURER]

                instance_list.append({
                    "host": target,
                    "group": (hex(eojgc), EOJX_GROUP[eojgc]),
                    "class": (hex(eojcc), EOJX_CLASS[eojgc][eojcc]),
                    "instance": hex(instance),
                    "getmap": getmap,
                    "setmap": setmap,
                    "uid": uid,
                    "manufacturer": manufacturer,
                })
    pprint(instance_list)
if __name__ == "__main__":
    # Script entry point: pass the raw command line to the async driver.
    cli_args = sys.argv
    asyncio.run(main(cli_args))