forked from fluentpython/concurrency2017
-
Notifications
You must be signed in to change notification settings - Fork 0
/
launchpad.py
executable file
·120 lines (92 loc) · 3.78 KB
/
launchpad.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
"""
Brett Cannon's launchpad: concurrent countdowns driven by a custom event loop,
without `asyncio`. This example shows how `async/await` is independent of
`asyncio` or any specific asynchronous programming library.
Source: "How the heck does async/await work in Python 3.5?"
https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/
"""
import datetime
import heapq
import types
import time
class Task:
"""Represent how long a coroutine should wait before starting again.
Comparison operators are implemented for use by heapq. Two-item
tuples unfortunately don't work because when the datetime.datetime
instances are equal, comparison falls to the coroutine and they don't
implement comparison methods, triggering an exception.
Think of this as being like asyncio.Task/curio.Task.
"""
def __init__(self, wait_until, coro):
self.coro = coro
self.waiting_until = wait_until
def __eq__(self, other):
return self.waiting_until == other.waiting_until
def __lt__(self, other):
return self.waiting_until < other.waiting_until
class SleepingLoop:
"""An event loop focused on delaying execution of coroutines.
Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
"""
def __init__(self, *coros):
self._new = coros
self._waiting = []
def run_until_complete(self):
# Start all the coroutines.
for coro in self._new:
wait_for = coro.send(None)
heapq.heappush(self._waiting, Task(wait_for, coro))
# Keep running until there is no more work to do.
while self._waiting:
now = datetime.datetime.now()
# Get the coroutine with the soonest resumption time.
task = heapq.heappop(self._waiting)
if now < task.waiting_until:
# We're ahead of schedule; wait until it's time to resume.
delta = task.waiting_until - now
time.sleep(delta.total_seconds())
now = datetime.datetime.now()
try:
# It's time to resume the coroutine.
wait_until = task.coro.send(now)
heapq.heappush(self._waiting, Task(wait_until, task.coro))
except StopIteration:
# The coroutine is done.
pass
@types.coroutine
def sleep(seconds):
"""Pause a coroutine for the specified number of seconds.
Think of this as being like asyncio.sleep()/curio.sleep().
"""
now = datetime.datetime.now()
wait_until = now + datetime.timedelta(seconds=seconds)
# Make all coroutines on the call stack pause; the need to use `yield`
# necessitates this be generator-based and not an async-based coroutine.
actual = yield wait_until
# Resume the execution stack, sending back how long we actually waited.
return actual - now
async def countdown(label, length, *, delay=0):
"""Countdown a launch for `length` seconds, waiting `delay` seconds.
This is what a user would typically write.
"""
print(label, 'waiting', delay, 'seconds before starting countdown')
delta = await sleep(delay)
print(label, 'starting after waiting', delta)
while length:
print(label, 'T-minus', length)
waited = await sleep(1)
length -= 1
print(label, 'lift-off!')
def main():
"""Start the event loop, counting down 3 separate launches.
This is what a user would typically write.
"""
loop = SleepingLoop(countdown('A', 5),
countdown('B', 3, delay=2),
countdown('C', 4, delay=1))
start = datetime.datetime.now()
loop.run_until_complete()
print('Total elapsed time is', datetime.datetime.now() - start)
if __name__ == '__main__':
main()