-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathautobrew
executable file
·109 lines (87 loc) · 2.83 KB
/
autobrew
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
import logging
import signal
from datetime import datetime
from time import sleep
from abpi.brewday.recipes import RecipeManager
from abpi.gadget.control import GadgetVariableSpace
from abpi.input.inputcli import ABInputClient
from abpi.ui.screenbuf import ScreenBuffer
from abpi.user.boilctlscr import ABBoilScreen
from abpi.user.builder import SystemBuilder
from abpi.user.mainscr import ABMainScreen
from abpi.user.mashctlscr import ABMashScreen
from abpi.user.recipescr import ABRecipeScreen
from abpi.user.splashscr import ABSplashScreen
def usleep(useconds):
"""Sleep for microseconds"""
return sleep(useconds / 1000000.0)
if __name__ == "__main__":
def _handle_signal(*args):
print("Shutting down...")
buf.stop()
buf.join()
gvspace.stop()
gvspace.join()
icli.stop()
icli.join()
print("Done")
exit(0)
def shutdown():
_handle_signal()
signal.signal(signal.SIGTERM, _handle_signal)
logging.basicConfig(
level=logging.DEBUG,
filename="autobrew.log",
filemode="w",
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger("").addHandler(console)
# logging
logger = logging.getLogger("AutoBrew")
logger.info("AutoBrew loading...")
icli = ABInputClient("http://localhost:5556")
try:
gvspace = GadgetVariableSpace(0xFF000001)
except IOError:
logger.error("Could not connect to HBUS master, shutting down...")
exit(0)
# recipe manager
recipemgr = RecipeManager("config/recipes/")
def _turn_lcd_on():
gvspace.send_gadget_command(0x05)
splash_screen = ABSplashScreen()
buf = ScreenBuffer(
240, 64, icli, screen_on_hook=_turn_lcd_on, splash=splash_screen
)
# build system
sys = SystemBuilder(
config_file="config/user/absystem.json", gadget_vspace=gvspace
)
# get information
gvspace._debug_dump_port_list()
main_screen = ABMainScreen()
mash_screen = ABMashScreen(varspace=gvspace, recipemgr=recipemgr)
boil_screen = ABBoilScreen(varspace=gvspace, recipemgr=recipemgr)
recipe_screen = ABRecipeScreen(recipemgr=recipemgr)
buf.add_screen("main", main_screen)
buf.add_screen("mash", mash_screen)
buf.add_screen("boil", boil_screen)
buf.add_screen("recipes", recipe_screen)
# start threads
buf.start()
gvspace.start()
icli.start()
# activate screen
buf.activate_screen("main")
logger.info("AutoBrew started")
last_cycle = datetime.now()
while True:
try:
td = datetime.now() - last_cycle
last_cycle = datetime.now()
sleep(0.1)
except KeyboardInterrupt:
_handle_signal(None)