-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.py
71 lines (52 loc) · 2.07 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio
from asyncio import AbstractEventLoop
import async_timeout
import logging
from pyflichub.button import FlicButton
from pyflichub.client import FlicHubTcpClient
from pyflichub.command import Command
from pyflichub.event import Event
from pyflichub.flichub import FlicHubInfo
# Seconds to wait for the client to signal that it is connected.
CLIENT_READY_TIMEOUT = 10.0

# (address, port) pair that is unpacked into the client constructor.
HOST = ("192.168.1.64", 8124)

# Emit everything from DEBUG level upwards via the root logger.
logging.basicConfig(level=logging.DEBUG)
def event_callback(button: FlicButton, event: Event):
    """Print the ``event`` attribute of a received event.

    Args:
        button: Accepted as the first callback argument; not used here.
        event: Object whose ``event`` attribute is written to stdout.
    """
    message = f"Received event: {event.event}"
    print(message)
def command_callback(cmd: Command):
    """Print the ``command`` attribute of a received command.

    Args:
        cmd: Object whose ``command`` attribute is written to stdout.
    """
    message = f"Received command: {cmd.command}"
    print(message)
# Strong references to background tasks. The event loop only keeps weak
# references to tasks, so a task that nothing else references may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS = set()


async def start():
    """Connect the module-level ``client`` and print hub information.

    Installs connected/disconnected callbacks on ``client``, schedules
    ``client.async_connect()`` as a background task, and waits up to
    ``CLIENT_READY_TIMEOUT`` seconds for the connected callback to fire.
    Once connected, prints every button returned by ``client.get_buttons()``
    and the wifi/ethernet details returned by ``client.get_hubinfo()``.

    Raises:
        SystemExit: With exit status 1 when the connected callback did not
            fire within ``CLIENT_READY_TIMEOUT`` seconds.
    """
    client_ready = asyncio.Event()

    async def client_connected():
        print("Connected!")
        # Release the waiter in start() before issuing the next request.
        client_ready.set()
        await client.get_server_info()

    async def client_disconnected():
        print("Disconnected!")

    client.async_on_connected = client_connected
    client.async_on_disconnected = client_disconnected

    # Keep the task referenced after start() returns; drop it once it is done.
    connect_task = asyncio.create_task(client.async_connect())
    _BACKGROUND_TASKS.add(connect_task)
    connect_task.add_done_callback(_BACKGROUND_TASKS.discard)

    try:
        async with async_timeout.timeout(CLIENT_READY_TIMEOUT):
            await client_ready.wait()
    except asyncio.TimeoutError:
        print(f"Client not connected after {CLIENT_READY_TIMEOUT} secs so terminating")
        # The builtin ``exit()`` is injected by the ``site`` module (absent
        # under ``python -S``) and would report success; signal the failure
        # with a non-zero status instead.
        raise SystemExit(1) from None

    buttons: list[FlicButton] = await client.get_buttons()
    for button in buttons:
        print(f"Button name: {button.name} - Connected: {button.connected}")

    network: FlicHubInfo = await client.get_hubinfo()
    if network.has_wifi():
        print(f"Wifi State: {network.wifi.state} - Connected: {network.wifi.connected}")
    if network.has_ethernet():
        print(f"Ethernet IP: {network.ethernet.ip} - Connected: {network.ethernet.connected}")
if __name__ == '__main__':
    # The loop is created first because it is passed to the client
    # constructor; start() reads the module-level ``client`` defined here.
    loop = asyncio.new_event_loop()
    client = FlicHubTcpClient(*HOST, loop=loop, event_callback=event_callback, command_callback=command_callback)
    try:
        loop.run_until_complete(start())
        # Keep the loop running after the initial queries until interrupted.
        loop.run_forever()
    except KeyboardInterrupt:
        client.disconnect()
    except Exception as exc:  # pylint: disable=broad-except
        print(exc)
    finally:
        # Close the loop on every exit path (Ctrl-C, error, or SystemExit),
        # not only after a keyboard interrupt.
        loop.close()