-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathdev.py
executable file
·88 lines (70 loc) · 2.44 KB
/
dev.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python
import http.server
import queue
import sys
import threading
from pathlib import Path
from queue import SimpleQueue
import sphinx.cmd.build
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class QueuingEventHandler(FileSystemEventHandler):
    """Filesystem event handler that pushes each event it receives onto a queue.

    The handler does no filtering or processing itself; the queue's consumer
    decides what to do with the events.
    """

    def __init__(self, q: SimpleQueue, *args, **kwargs):
        """Remember the destination queue.

        Args:
            q: Queue that receives every event object handed to this handler.
            *args: Positional arguments forwarded to the base handler.
            **kwargs: Keyword arguments forwarded to the base handler.
        """
        super().__init__(*args, **kwargs)
        self._queue = q

    def on_any_event(self, event) -> None:
        """Enqueue ``event`` without blocking."""
        self._queue.put(event, block=False)
class CustomDirHandler(http.server.SimpleHTTPRequestHandler):
    """Static-file request handler that serves from ``build/dirhtml``.

    Ordinary log messages are discarded; only messages sent through
    ``log_error`` are written out.
    """

    def __init__(self, *args, **kwargs):
        """Create the handler with its directory fixed to ``build/dirhtml``."""
        docs_root = 'build/dirhtml'
        super().__init__(*args, directory=docs_root, **kwargs)

    def log_error(self, format: str, *args) -> None:
        """Write an error message using the base class's ``log_message``.

        The parent implementation is called on purpose: this class's own
        ``log_message`` override drops everything it is given.
        """
        super().log_message(format, *args)

    def log_message(self, format: str, *args) -> None:
        """Discard the message so routine requests produce no output."""
        return None
class HttpServerThread(threading.Thread):
    """Thread that serves the built docs over HTTP on port 8999.

    The server socket is bound when the thread object is constructed;
    requests are handled once the thread is started. To stop it, call
    ``self.server.shutdown()`` from another thread and then ``join()``.
    """

    def __init__(self):
        """Create the thread and bind the HTTP server to port 8999."""
        super().__init__(name="HttpServer")
        self.server = http.server.ThreadingHTTPServer(('', 8999), CustomDirHandler)

    def run(self):
        """Announce the address, serve until shut down, then close the socket."""
        (addr, port) = self.server.server_address
        print(f"Serving docs at http://{addr}:{port}/")
        try:
            self.server.serve_forever()
        finally:
            # shutdown() only stops the serve loop; the listening socket is
            # released by server_close(), so always call it on the way out.
            self.server.server_close()
def main():
    """Build the docs, serve them over HTTP, and rebuild when sources change.

    An initial build is performed, the HTTP server thread is started, and
    ``./source`` is watched recursively. After the first change event the
    loop waits until no further event arrives for 0.5 seconds (debounce)
    and then rebuilds once.

    Cleanup is nested so that each resource is torn down only if it was
    actually started. In particular, the HTTP server thread is shut down
    even when setting up the file watcher fails; otherwise that non-daemon
    thread would keep the process alive.
    """
    rebuild()

    server_thread = HttpServerThread()
    server_thread.start()
    try:
        path = Path("./source")
        my_queue = SimpleQueue()
        handler = QueuingEventHandler(my_queue)

        observer = Observer()
        observer.schedule(handler, str(path), recursive=True)
        observer.start()
        try:
            print(f"Watching {path} for changes...", file=sys.stderr)
            while observer.is_alive():
                try:
                    my_queue.get(timeout=0.5)
                except queue.Empty:
                    # Nothing happened; loop to re-check the observer.
                    continue

                while True:
                    # Debounce: keep draining until the queue stays quiet
                    # for a full timeout period.
                    try:
                        my_queue.get(timeout=0.5)
                    except queue.Empty:
                        break

                rebuild()
        finally:
            # Only reached once the observer has been started, so join()
            # cannot fail on a never-started thread.
            observer.stop()
            observer.join()
    finally:
        server_thread.server.shutdown()
        server_thread.join()
def rebuild():
    """Run a Sphinx ``dirhtml`` build of ``source`` into ``build/dirhtml``.

    A non-zero status returned by Sphinx is reported on stderr; this
    function itself returns nothing.
    """
    print("Performing docs build", file=sys.stderr)
    builder = 'dirhtml'
    sphinx_args = ['-b', builder, 'source', f'build/{builder}']
    status = sphinx.cmd.build.main(sphinx_args)
    if status == 0:
        return
    print("Sphinx exited with error code", status, file=sys.stderr)
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()