forked from philhofer/distill
-
Notifications
You must be signed in to change notification settings - Fork 0
/
coroutine.scm
371 lines (338 loc) · 12.5 KB
/
coroutine.scm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
;; some helpers for receiving SIGCHLD
;; and managing poll() descriptors
(foreign-declare "#include \"coroutine.inc.h\"")
;; queue-push! appends 'val' to the end of the queue 'p'.
;;
;; A queue is a single pair: (car p) is the list of queued
;; items, oldest first, and (cdr p) is the last cell of that
;; list (or '() while the queue is empty).
(: queue-push! ((pair (list-of procedure) (list-of procedure)) procedure -> undefined))
(define (queue-push! p val)
  (let ((cell     (cons val '()))
        (old-tail (cdr p)))
    (cond
      ;; empty queue: the new cell is also the head of the list
      ((null? old-tail) (set-car! p cell))
      ;; otherwise link the new cell after the current last cell
      (else (set-cdr! old-tail cell)))
    ;; in both cases the new cell is now the tail
    (set-cdr! p cell)))
;; queue-pop! removes the item at the head of the queue 'p'
;; and returns it; it returns #f when the queue is empty.
;;
;; 'p' is a pair whose car is the list of queued items and
;; whose cdr is the last cell of that list (see queue-push!).
(: queue-pop! ((pair (list-of procedure) (list-of procedure)) -> (or procedure false)))
(define (queue-pop! p)
  (let ((head (car p)))
    (and (pair? head)
         (let ((first (car head))
               (rest  (cdr head)))
           (set-car! p rest)
           ;; the last item was just removed: reset the tail
           ;; pointer as well, so that queue-push! sees an
           ;; empty queue rather than a stale last cell
           (when (null? rest)
             (set-cdr! p '()))
           first))))
;; runnable continuations
;; (a queue in the head/tail pair layout that queue-push!
;; and queue-pop! operate on)
(: *cont-queue* (pair (list-of procedure) (list-of procedure)))
(define *cont-queue* (cons '() '()))
;; continuations parked on wait(2), keyed by pid
;; (filled in by process-wait/yield, drained by %pid-poll)
(define *wait-tab* (make-hash-table test: = hash: number-hash))
;; queues of continuations parked until a file descriptor
;; is reported ready for reading / writing, keyed by fd
;; (drained by %poll-fds)
(define *readfd-tab* (make-hash-table test: = hash: number-hash))
(define *writefd-tab* (make-hash-table test: = hash: number-hash))
;; errno values that the raw read(2)/write(2) wrappers below
;; are compared against
;; NOTE(review): presumably the Linux values of EINTR and
;; EAGAIN -- confirm before building for another target
(define-constant eintr 4)
(define-constant eagain 11)
;; fdclose calls close(2) on the file descriptor 'fd'
;; and returns close()'s integer result unchanged
(: fdclose (fixnum -> fixnum))
(define (fdclose fd)
  (let ((%raw-close (foreign-lambda int close int)))
    (%raw-close fd)))
;; fdpipe creates a pipe by calling pipe2(2) with O_NONBLOCK
;; and returns the two descriptors as a list, in the order
;; in which pipe2() stored them; an error is signaled when
;; pipe2() does not return 0
(: fdpipe (-> (list fixnum fixnum)))
(define (fdpipe)
  (define %raw-pipe2
    (foreign-lambda* int ((s32vector pfd))
      "C_return(pipe2(pfd, O_NONBLOCK));"))
  (let ((fds (make-s32vector 2 -1)))
    (cond
      ((fx= (%raw-pipe2 fds) 0)
       (list (s32vector-ref fds 0) (s32vector-ref fds 1)))
      (else
       (error "pipe2() failed")))))
;; fdwrite writes up to 'size' bytes from 'buf' (a string or
;; a u8vector) to the file descriptor 'fd' using write(2) and
;; returns the number of bytes that write(2) reported.
;; 'size' defaults to the length of 'buf'.
;;
;; EINTR is retried immediately. On EAGAIN the current
;; coroutine is parked on the *writefd-tab* queue for 'fd'
;; via queue-wait!, and the write is retried once the
;; coroutine has been resumed. Any other errno is signaled
;; as an error.
(: fdwrite (fixnum (or string u8vector) #!rest * -> integer))
(define (fdwrite fd buf #!optional size)
  (let* ((%raw-write (foreign-lambda* long ((int fd) (scheme-pointer mem) (size_t sz))
                       ;; write(2) only sets errno when it returns -1, so a
                       ;; result of 0 (e.g. a zero-length write) has to be
                       ;; passed through as-is instead of being replaced by
                       ;; whatever stale value errno happens to hold
                       "long out; out=write(fd,mem,sz); C_return(out>=0?out:-errno);"))
         (buflen (cond
                   ((string? buf)   (string-length buf))
                   ((u8vector? buf) (u8vector-length buf))
                   (else (error "bad buf argument to fdwrite:" buf))))
         (size (or size buflen)))
    (let loop ((ret (%raw-write fd buf size)))
      (cond
        ;; non-negative: number of bytes written
        ((>= ret 0) ret)
        ;; negative: -errno
        ((= ret (- eintr))
         (loop (%raw-write fd buf size)))
        ((= ret (- eagain))
         (begin
           ;; fetch (or create) the wait queue for this fd and
           ;; sleep on it until the fd is reported writable
           (queue-wait!
             (hash-table-update!/default
               *writefd-tab*
               fd
               identity
               (cons '() '())))
           (loop (%raw-write fd buf size))))
        (else (error "fdwrite: errno:" (- ret)))))))
;; fdread reads up to 'size' bytes from the file descriptor
;; 'fd' into 'buf' (a string or a u8vector) using read(2) and
;; returns read(2)'s non-negative result.
;; 'size' defaults to the length of 'buf'.
;;
;; EINTR is retried immediately. On EAGAIN the current
;; coroutine is parked on the *readfd-tab* queue for 'fd'
;; via queue-wait!, and the read is retried once the
;; coroutine has been resumed. Any other errno is signaled
;; as an error.
(: fdread (fixnum (or string u8vector) #!rest * -> integer))
(define (fdread fd buf #!optional size)
  ;; returns the byte count on success, or -errno on failure
  (define %raw-read
    (foreign-lambda* long ((int fd) (scheme-pointer mem) (size_t sz))
      "long out; out=read(fd,mem,sz); C_return(out>=0?out:-errno);"))
  (define buflen
    (cond
      ((string? buf)   (string-length buf))
      ((u8vector? buf) (u8vector-length buf))
      (else (error "bad buf argument to fdread:" buf))))
  (define nbytes (or size buflen))
  ;; fetch (or create) the wait queue for this fd and sleep on it
  (define (wait-readable!)
    (queue-wait!
      (hash-table-update!/default *readfd-tab* fd identity (cons '() '()))))
  (let retry ()
    (let ((ret (%raw-read fd buf nbytes)))
      (cond
        ((>= ret 0) ret)
        ((= ret (- eintr)) (retry))
        ((= ret (- eagain))
         (wait-readable!)
         (retry))
        (else (error "fdread: errno:" (- ret)))))))
;; %poll-fds hands every descriptor that currently has parked
;; continuations (the keys of *readfd-tab* and *writefd-tab*)
;; to do_poll(), then moves the continuations of the
;; descriptors left in the vectors onto the run queue.
;;
;; It calls (fatal "deadlock") when no descriptor is being
;; waited on, and signals an error when do_poll() returns a
;; negative value.
(define (%poll-fds)
  (define %raw-poll
    (foreign-lambda int do_poll s32vector int s32vector int bool))
  (define rfds (list->s32vector (hash-table-keys *readfd-tab*)))
  (define wfds (list->s32vector (hash-table-keys *writefd-tab*)))
  (define nrfds (s32vector-length rfds))
  (define nwfds (s32vector-length wfds))
  ;; remove the queue for 'fd' from 'ht' and make every
  ;; continuation parked on it runnable
  (define (wake! ht fd)
    (let ((q (hash-table-ref ht fd)))
      (hash-table-delete! ht fd)
      (let drain ((k (queue-pop! q)))
        (when k
          (pushcont! k)
          (drain (queue-pop! q))))))
  ;; wake the waiters of every entry of 'vec' that is not -1
  ;; NOTE(review): presumably do_poll() overwrites descriptors
  ;; that are not ready with -1 -- confirm in coroutine.inc.h
  (define (flush vec ht)
    (let ((len (s32vector-length vec)))
      (do ((i 0 (fx+ i 1)))
          ((fx>= i len) #t)
        (let ((fd (s32vector-ref vec i)))
          (unless (fx= fd -1)
            (wake! ht fd))))))
  (when (= 0 (+ nrfds nwfds))
    (fatal "deadlock"))
  ;; first, do a non-blocking poll; if nothing is immediately ready,
  ;; then perform a major GC and try again, this time blocking
  (let again ((block #f))
    ;; do_poll() doesn't modify the fd vectors if ret==0,
    ;; so we can safely re-use them when no fds are ready
    (let ((ret (%raw-poll rfds nrfds wfds nwfds block)))
      (cond
        ((fx= 0 ret)
         (gc #f)
         (again #t))
        ((fx> 0 ret)
         (error "poll error:" (- ret)))
        (else
         (flush rfds *readfd-tab*)
         (flush wfds *writefd-tab*))))))
;; pushcont! appends the continuation 'cont' to the tail of
;; *cont-queue*, the queue of runnable continuations
(: pushcont! (procedure -> undefined))
(define pushcont!
  (lambda (cont)
    (queue-push! *cont-queue* cont)))
;; popcont! removes the continuation at the front of
;; *cont-queue* and returns it; it returns #f when no
;; continuation is runnable
(: popcont! (-> (or false procedure)))
(define popcont!
  (lambda ()
    (queue-pop! *cont-queue*)))
;; %yield is a local continuation exit; it does not return.
;; It resumes the next runnable continuation; while none is
;; runnable it calls %poll and looks again.
(define (%yield)
  (let next ()
    (let ((cont (popcont!)))
      (cond
        (cont
         (cont)
         ;; a resumed continuation must never come back here
         (error "longjmp returned?"))
        (else
         (%poll)
         (next))))))
;; queue-wait! suspends the current coroutine: a thunk that
;; resumes the caller is pushed onto the queue 'p' and control
;; passes to %yield. The call returns (with #t) once that
;; thunk is eventually invoked.
(: queue-wait! (pair -> undefined))
(define (queue-wait! p)
  (call/cc
    (lambda (k)
      (let ((resume (lambda () (k #t))))
        (queue-push! p resume)
        (%yield)))))
;; empty-queue returns a fresh queue that holds no items:
;; a pair whose head list and tail pointer are both '()
(define empty-queue
  (lambda ()
    (cons '() '())))
;; queue-signal! wakes at most one waiter: the first
;; continuation on the queue 'q', if any, is moved to the
;; run queue. Returns #t if a waiter was woken, #f if 'q'
;; was empty.
(: queue-signal! (pair -> boolean))
(define (queue-signal! q)
  (let ((waiter (queue-pop! q)))
    (cond
      (waiter
       (pushcont! waiter)
       #t)
      (else #f))))
;; process-wait/yield is semantically the same as
;; chicken.process#process-wait, except that it
;; suspends the current coroutine while waiting:
;; the caller's continuation is stored in *wait-tab*
;; under 'pid' and control passes to %yield
(define (process-wait/yield pid)
  (define (park! resume)
    (hash-table-set! *wait-tab* pid resume)
    (%yield))
  (call/cc park!))
;; proc-status returns the status slot (index 0) of the
;; coroutine object 'proc'; it is one of:
;;  'started (running or paused in a continuation)
;;  'exn     (terminated with an exception)
;;  'done    (terminated with a value)
(: proc-status ((vector symbol * list) -> symbol))
(define proc-status
  (lambda (proc)
    (vector-ref proc 0)))
;; proc-return returns the value slot (index 1) of the
;; coroutine object 'proc', i.e. the coroutine's return value
;; (the proc-return of a coroutine that has not
;; terminated is undefined)
(: proc-return ((vector symbol 'a list) -> 'a))
(define proc-return
  (lambda (proc)
    (vector-ref proc 1)))
;; %procexit marks the coroutine 'box' as terminated and
;; switches to another coroutine:
;;  - 'status' and 'value' are stored in slots 0 and 1
;;  - every continuation recorded in slot 2 (see join/value)
;;    is made runnable and will be resumed with 'value'
;;  - slot 2 is then replaced with #f
;; Control is handed to %yield afterwards.
(define (%procexit box status value)
  ;; schedule one joiner so that it receives 'value'
  (define (wake-joiner! k)
    (pushcont! (lambda () (k value))))
  (vector-set! box 0 status)
  (vector-set! box 1 value)
  (for-each wake-joiner! (vector-ref box 2))
  (vector-set! box 2 #f)
  (%yield))
;; spawn runs (apply proc args) asynchronously
;; and returns an opaque object that can be used
;; to query the status of the procedure;
;; see join/value, proc-status, proc-return
;;
;; the caller's continuation is put on the run queue first,
;; and (apply proc args) is then started right away
;;
;; any exceptions thrown when evaluating (proc args ...)
;; are caught and the condition object is made available
;; through proc-return and join/value
(: spawn (procedure #!rest * -> (vector symbol * list)))
(define (spawn proc . args)
  (define box (vector 'started #f '()))
  ;; an exception terminates the coroutine with status 'exn
  (define (on-exn exn)
    (%procexit box 'exn exn))
  (define (run)
    (parameterize ((current-exception-handler on-exn))
      (apply proc args)))
  (call/cc
    (lambda (return-to-caller)
      (pushcont! (lambda () (return-to-caller box)))
      (%procexit box 'done (run)))))
;; with-spawn behaves like (apply spawn proc args), except
;; that (handle box) is called with the new coroutine object
;; before the coroutine starts running
(: with-spawn (procedure list procedure -> (vector symbol * list)))
(define (with-spawn proc args handle)
  (define box (vector 'started #f '()))
  ;; an exception terminates the coroutine with status 'exn
  (define (on-exn exn)
    (%procexit box 'exn exn))
  (define (run)
    (parameterize ((current-exception-handler on-exn))
      (apply proc args)))
  (handle box)
  (call/cc
    (lambda (return-to-caller)
      (pushcont! (lambda () (return-to-caller box)))
      (%procexit box 'done (run)))))
;; wait-any-nohang calls (process-wait -1 #t) and returns
;; its values; if that call raises an exception, the three
;; values 0, #f and #f are returned instead
(define (wait-any-nohang)
  (call/cc
    (lambda (escape)
      (define (give-up exn)
        (escape 0 #f #f))
      (parameterize ((current-exception-handler give-up))
        (process-wait -1 #t)))))
;; %pid-poll reads from the chldfd eventfd
;; and then calls wait() repeatedly until there are no
;; child processes outstanding
;;
;; each reaped pid is looked up in *wait-tab*; the
;; continuation registered there is removed and scheduled
;; with the three values returned by wait-any-nohang
(define (%pid-poll fd)
  ;; returns 0 when a full 8-byte counter was read, else errno
  (define getcount
    (foreign-lambda* int ((int fd) (s64vector buf))
      "C_return(read(fd,(int64_t *)buf,8)==8 ? 0 : errno);"))
  (define buf (make-s64vector 1 0))
  ;; park this coroutine until 'fd' is reported readable
  (define (wait-readable!)
    (queue-wait!
      (hash-table-update!/default *readfd-tab* fd identity (cons '() '()))))
  (let loop ((err (getcount fd buf)))
    (cond
      ((= err 0) ;; happy case (note that we're ignoring the counter ...)
       (let-values (((pid ok status) (wait-any-nohang)))
         (if (= pid 0)
             ;; nothing (more) to reap: go back to the eventfd
             (loop (getcount fd buf))
             (let ((cont (hash-table-ref/default *wait-tab* pid #f)))
               (if cont
                   (begin
                     (hash-table-delete! *wait-tab* pid)
                     (pushcont! (lambda () (cont pid ok status))))
                   (info "warning: pid not registered?" pid))
               ;; keep reaping without re-reading the eventfd
               (loop 0)))))
      ((= err eintr)
       (loop (getcount fd buf)))
      ((= err eagain)
       (wait-readable!)
       (loop (getcount fd buf)))
      (else
       (error "fatal errno from read(eventfd)" fd err (s64vector-ref buf 0))))))
;; %poll is called by %yield when no continuation is
;; runnable. If the pid poller coroutine has terminated with
;; an exception, that exception is printed and
;; (fatal "pid poller exited!") is called; otherwise the
;; parked file descriptors are polled via %poll-fds.
(: %poll (-> undefined))
(define (%poll)
  (cond
    ((eq? 'exn (proc-status pid-poller-proc))
     (print-error-message (proc-return pid-poller-proc))
     (fatal "pid poller exited!"))
    (else
     (%poll-fds))))
;; join/value waits for a coroutine to exit,
;; then yields its return value
;;
;; in other words,
;;   (join/value (spawn proc args ...))
;; is more-or-less semantically equivalent to
;;   (proc args ...)
;; except that the former evaluates (proc args ...)
;; in a different dynamic extent than the caller
;;
;; note that if the coroutine threw an exception,
;; the return value will satisfy 'condition?'
;;
;; (slot 0 of a coroutine object holds the status *symbol*,
;; as in the declarations of spawn and proc-status)
(: join/value ((vector symbol * list) -> *))
(define (join/value proc)
  (if (eq? (proc-status proc) 'started)
      (call/cc
        (lambda (ret)
          ;; still running: register this continuation in
          ;; slot 2; %procexit resumes it with the coroutine's
          ;; value once the coroutine terminates
          (vector-set! proc 2 (cons ret (vector-ref proc 2)))
          (%yield)))
      ;; already terminated: the value is available right away
      (proc-return proc)))
;; pid-poller-proc is the coroutine that runs %pid-poll.
;; It is created at load time: sigchld_handler() is called
;; once, and its positive result is passed to %pid-poll as
;; the descriptor to read from. A result <= 0 is reported as
;; an error, with the negated result as the error argument.
;;
;; XXX maybe we should be doing this lazily?
(define pid-poller-proc
  (let ((chldfd ((foreign-lambda int sigchld_handler))))
    (cond
      ((<= chldfd 0)
       (error "error registering SIGCHLD handler:" (- chldfd)))
      (else
       (spawn %pid-poll chldfd)))))
;; push-exception-wrapper calls (thunk) with an exception
;; handler that passes (wrap exn) to the handler that was
;; current when push-exception-wrapper was called; that
;; previous handler is re-installed while (wrap exn) and the
;; previous handler itself run
(define (push-exception-wrapper wrap thunk)
  (let ((outer (current-exception-handler)))
    (define (rewrap exn)
      (parameterize ((current-exception-handler outer))
        (outer (wrap exn))))
    (parameterize ((current-exception-handler rewrap))
      (thunk))))
;; with-cleanup calls (thunk) and returns its result, calling
;; (done) afterwards; if an exception reaches the handler
;; installed here, (done) is called before the unchanged
;; exception is passed on to the previous handler
(define (with-cleanup done thunk)
  (define (cleanup-then-pass exn)
    (done)
    exn)
  (define (run-then-cleanup)
    (let ((result (thunk)))
      (done)
      result))
  (push-exception-wrapper cleanup-then-pass run-then-cleanup))
;; make-keyed-lock creates a keyed lock for use with
;; lock-key! and unlock-key!; it is an alias for
;; make-hash-table, so a keyed lock is a plain hash table
(define make-keyed-lock make-hash-table)
;; lock-key! locks a key associated
;; with a keyed lock; any other calls
;; to lock-key! in other continuations
;; will block at lock-key! until the first
;; caller calls unlock-key!, at which point
;; another caller will execute, and so forth
;;
;; each key maps to a pair (count . queue), where 'count' is
;; the number of continuations that hold or wait for the key
;; and 'queue' holds the parked waiters
(: lock-key! (* * -> undefined))
(define (lock-key! lock key)
  (let ((res (hash-table-ref/default lock key #f)))
    (if res
        (let ((num (car res))
              (q   (cdr res)))
          ;; always count this caller, including when the key is
          ;; currently free (count 0 after an earlier
          ;; lock/unlock); otherwise the matching unlock-key!
          ;; would find a count of 0 and report a mismatch
          (set-car! res (+ num 1))
          ;; somebody else holds the key: wait for our turn
          (unless (= num 0)
            (queue-wait! q)))
        ;; first use of this key: create its entry, held by us
        (hash-table-set! lock key (cons 1 (empty-queue))))))
;; unlock-key! unlocks a key associated
;; with a keyed lock
;; (see also: lock-key!)
;;
;; the first continuation waiting for the key, if any, is
;; made runnable, and the key's holder/waiter count is
;; decremented; an error is signaled if the key is not
;; currently locked, including when it has never been locked
(: unlock-key! (* * -> undefined))
(define (unlock-key! lock key)
  (let ((res (hash-table-ref/default lock key #f)))
    ;; 'res' is #f for a key that was never locked; check that
    ;; before taking it apart so the caller gets the mismatch
    ;; error rather than a type error from (cdr #f)
    (unless (and res (> (car res) 0))
      (error "mis-matched lock/unlock of key"))
    (let ((num (car res))
          (q   (cdr res)))
      (queue-signal! q)
      (set-car! res (- num 1)))))
;; with-locked-key calls (thunk) while holding 'key' of the
;; keyed lock 'lock' and returns the result of (thunk)
;;
;; note that the key is only released when (thunk) returns
;; normally; it stays locked if (thunk) exits non-locally
(: with-locked-key (* * procedure -> *))
(define (with-locked-key lock key thunk)
  (lock-key! lock key)
  (let ((ret (thunk)))
    (unlock-key! lock key)
    ret))