-
Notifications
You must be signed in to change notification settings - Fork 192
/
Copy pathasync_send_receive.py
65 lines (51 loc) · 1.96 KB
/
async_send_receive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# TinyTuya Example
# -*- coding: utf-8 -*-
"""
TinyTuya - Example showing async persistent connection to device with
continual loop watching for device updates.
Author: Jason A. Cox
For more information see https://github.com/jasonacox/tinytuya
"""
import time
import tinytuya
# tinytuya.set_debug(True)
d = tinytuya.OutletDevice('DEVICEID', 'DEVICEIP', 'DEVICEKEY')
d.set_version(3.3)
d.set_socketPersistent(True)
# Devices will close the connection if they do not receve data every 30 seconds
# Sending heartbeat packets every 9 seconds gives some wiggle room for lost packets or loop lag
PING_TIME = 9
# Option - also poll
POLL_TIME = 60
print(" > Send Request for Status < ")
d.status(nowait=True)
print(" > Begin Monitor Loop <")
pingtime = time.time() + PING_TIME
polltime = time.time() + POLL_TIME
while(True):
# See if any data is available
data = d.receive()
if data:
print('Received Payload: %r' % data)
if( pingtime <= time.time() ):
pingtime = time.time() + PING_TIME
# Send keep-alive heartbeat
print(" > Send Heartbeat Ping < ")
d.heartbeat(nowait=True)
# Option - Poll for status
if( polltime <= time.time() ):
polltime = time.time() + POLL_TIME
# Option - Some plugs require an UPDATEDPS command to update their power data points
if False:
print(" > Send DPS Update Request < ")
# # Some Tuya devices require a list of DPs to update
# payload = d.generate_payload(tinytuya.UPDATEDPS,['18','19','20'])
# data = d.send(payload)
# print('Received Payload: %r' % data)
# # Other devices will not accept the DPS index values for UPDATEDPS - try:
# payload = d.generate_payload(tinytuya.UPDATEDPS)
# data = d.send(payload)
# print('Received Payload: %r' % data)
print(" > Send Request for Status < ")
data = d.status()
print('Received Payload: %r' % data)