-
Notifications
You must be signed in to change notification settings - Fork 1
/
distExtra.oz
551 lines (539 loc) · 12.1 KB
/
distExtra.oz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
functor
import
DistBase at 'distBase.ozf'
export
Load
define
Services=DistBase.services
class SP2P
from DistBase.p2p
attr
flp2p
sent
meth init(flp2p:Ref<=unit)
if Ref==unit then
@flp2p={@this newService(flp2p() {self wrap(deliver:Deliver $)} $)}
else
@flp2p={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
end
@sent=nil
thread {Delay 1000} {self Timeout()} end
end
meth Timeout()
for Dest#M in @sent do
{@flp2p send(Dest M)}
end
{Delay 1000}
{self Timeout()}
end
meth send(Dest Msg)
OldSent in
{@flp2p send(Dest Msg)}
OldSent=sent:=(Dest#Msg)|OldSent
end
meth Deliver(Src Msg)
{@up deliver(Src Msg)}
end
meth getRefParams($)
init(flp2p:{@flp2p getRef($)})
end
end
Services.sp2p:=SP2P
class BookPP2P
from DistBase.p2p
attr
sp2p
delivered
meth init(sp2p:Ref<=unit)
if Ref==unit then
@sp2p={@this newService(sp2p() {self wrap(deliver:Deliver $)} $)}
else
@sp2p={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
end
@delivered={Dictionary.new}
end
meth send(Dest Msg)
{@sp2p send(Dest m({NewName} Msg))}
end
meth Deliver(Src Msg)
case Msg
of m(Mid Content) then
if {Not {Dictionary.condExchange @delivered Mid false $ true}} then
{@up deliver(Content)}
end
end
end
meth getRefParams($)
init(sp2p:{@sp2p getRef($)})
end
end
Services.book_pp2p:=BookPP2P
fun{Lowest Def Dic}
{List.foldL {Dictionary.keys Dic} Min Def}
end
class SmarterPP2P
from DistBase.p2p
attr
down
partners
meth init(down:Ref<=unit)
if Ref==unit then
@down={@this newService(flp2p() {self wrap(deliver:Deliver $)} $)}
else
@down={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
end
@partners={Dictionary.new}
thread {Delay 1000} {self Timeout()} end
end
meth Timeout()
for P in {Dictionary.items @partners} do
for M in {Dictionary.items P.sending} do
{@down M}
end
for M in {Dictionary.items P.ackSending} do
{@down M}
end
end
{Delay 1000}
{self Timeout()}
end
meth Partner(P $)
case {Dictionary.condGet @partners P.pid unit}
of unit then N in
case {Dictionary.condExchange @partners P.pid unit $ N}
of unit then
N=p(nextSendId:{NewCell 0}
lastSent:{NewCell 0}
sending:{Dictionary.new}
acceptableId:{NewCell 0}
delivered:{Dictionary.new}
ackSending:{Dictionary.new})
N
[] R then N=R R
end
[] R then R
end
end
meth send(Dest Msg)
P={self Partner(Dest $)}
ThisId NextId
OldLast NewLast
in
ThisId=(P.nextSendId):=NextId
NextId=ThisId+1
P.sending.ThisId:=send(Dest msg(ThisId Msg))
OldLast=(P.lastSent):=NewLast
NewLast={Max OldLast ThisId}
{@down send(Dest msg(ThisId Msg))}
end
meth Deliver(Src Msg)
P={self Partner(Src $)}
in
case Msg
of msg(Mid Cont) then
if Mid>=@(P.acceptableId) andthen
{Not {Dictionary.condExchange P.delivered Mid false $ true}} then
{@up deliver(Src Cont)}
P.ackSending.Mid:=send(Src ack(Mid))
end
{@down send(Src ack(Mid))}
[] ack(Mid) then
{Dictionary.remove P.sending Mid}
{@down send(Src ackack(Mid {Lowest @(P.lastSent) P.sending}))}
[] ackack(Mid Nid) then
OldAccId NewAccId in
{Dictionary.remove P.ackSending Mid}
OldAccId=(P.acceptableId):=NewAccId
NewAccId={Max OldAccId Nid}
for K in {Dictionary.keys P.delivered} do
if K<NewAccId then
{Dictionary.remove P.delivered K}
end
end
end
end
meth getRefParams($)
init(down:{@down getRef($)})
end
end
Services.pp2p:=SmarterPP2P
class EPFD
from DistBase.baseService
attr
down
all
alive
suspected
period
newPeriod
meth init(down:Ref<=unit)
if Ref==unit then
@down={@this newService(pp2p() {self wrap(deliver:Deliver $)} $)}
else
@down={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
end
@all={Dictionary.new}
@alive={Dictionary.new}
@suspected={Dictionary.new}
@period=@newPeriod=3000
thread {Delay @period} {self Timeout()} end
end
meth Timeout()
for X in {Dictionary.keys @all} do
if {Not {HasFeature @suspected X}} andthen
{Not {HasFeature @alive X}} then
suspected.X:=true
{@up suspect(all.X)}
elseif {HasFeature @suspected X} andthen
{HasFeature @alive X} then
newPeriod:=@period+500
{Dictionary.remove @suspected X}
{@up restore(all.X)}
end
{@down send(all.X unit)}
end
{Dictionary.removeAll @alive}
period:=@newPeriod
{Delay @period}
{self Timeout()}
end
meth Deliver(Src _)
alive.(Src.pid):=true
end
meth monitor(Ps)
for P in Ps do
{@down send(P unit)}
end
thread
{Delay @period}
for P in Ps do
all.(P.pid):=P
end
end
end
meth getRefParams($)
init(down:{@down getRef($)})
end
end
Services.epfd:=EPFD
fun{Best Ps}
case Ps
of [A] then A
[] A|B|T then
PA=A.pid PB=B.pid in
if {Arity f(PA:a PB:b)}==[PA PB] then
{Best A|T}
else
{Best B|T}
end
end
end
class ELD
from
DistBase.baseService
attr
down
all
alive
current
meth init(Ps down:Ref<=unit)
if Ref==unit then
@down={@this newService(epfd() {self wrap(suspect:Suspect
restore:Restore $)} $)}
else
@down={@this serviceFromRef(Ref {self wrap(suspect:Suspect
restore:Restore $)} $)}
end
{@down monitor(Ps)}
@all={Dictionary.new}
@alive={Dictionary.new}
for P in Ps do
all.(P.pid):=P
alive.(P.pid):=P
end
if {Not {HasFeature @all @thisP.pid}} then
raise eld_localProcessNotIncluded end
end
@current={Best {Dictionary.items @alive}}
{@up trust(@current)}
end
meth Suspect(P)
NewCur OldCur in
{Dictionary.remove @alive P.pid}
OldCur=current:=NewCur
NewCur={Best {Dictionary.items @alive}}
if NewCur\=OldCur then
{@up trust(NewCur)}
end
end
meth Restore(P)
NewCur OldCur in
alive.(P.pid):=P
OldCur=current:=NewCur
NewCur={Best {Dictionary.items @alive}}
if NewCur\=OldCur then
{@up trust(NewCur)}
end
end
meth getRefParams($)
init({Dictionary.items @all} down:{@down getRef($)})
end
end
Services.eld:=ELD
class BCast from DistBase.baseService end
class BEB
from BCast
attr
down
all
meth init(Ps down:Ref<=unit)
if Ref==unit then
@down={@this newService(pp2p() {self wrap(deliver:Deliver $)} $)}
else
@down={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
end
@all=Ps
end
meth broadcast(Msg)
for P in @all do
{@down send(P Msg)}
end
end
meth Deliver(Src Msg)
{@up deliver(Src Msg)}
end
meth getRefParams($)
init(@all down:{@down getRef($)})
end
end
Services.beb:=BEB
class RB
from BCast
attr
down
delivered
meth init(Ps<=nil down:Ref<=unit)
if Ref==unit then
if {Not {Member @thisP Ps}} then
raise rb_localProcessNotIncluded end
end
@down={@this newService(beb(Ps) {self wrap(deliver:Deliver $)} $)}
else
@down={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
end
@delivered={Dictionary.new}
end
meth broadcast(Msg)
Mid={NewName} in
@delivered.Mid:=true
{@up deliver(@thisP Msg)}
{@down broadcast(m(Mid @thisP Msg))}
end
meth Deliver(_ Msg)
case Msg
of m(Mid Src Content) then
if {Not {Dictionary.condExchange @delivered Mid false $ true}} then
{@up deliver(Src Content)}
{@down broadcast(Msg)}
end
end
end
meth getRefParams($)
init(down:{@down getRef($)})
end
end
Services.rb:=RB
proc{AddAck D K F}
DK=
case {Dictionary.condGet D K unit}
of unit then N in
case {Dictionary.condExchange D K unit $ N}
of unit then T in
T={Dictionary.new}
T.count:=0
N=T
[] R then N=R R
end
[] R then R
end
in
if {Not {Dictionary.condExchange DK F false $ true}} then
O N in
O=DK.count:=N
N=O+1
end
end
class URB
from BCast
attr
down
quorum
delivered
pending
ack
meth init(Ps<=nil down:Ref<=unit quorum:Q<=unit)
if Ref==unit then
if {Not {Member @thisP Ps}} then
raise urb_localProcessNotIncluded end
end
@down={@this newService(beb(Ps) {self wrap(deliver:Deliver $)} $)}
@quorum=({Length Ps}+1) div 2
else
@down={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
@quorum=Q
end
@delivered={Dictionary.new}
@pending={Dictionary.new}
@ack={Dictionary.new}
end
meth broadcast(Msg)
Mid={NewName} in
@pending.Mid:=deliver(@thisP Msg)
{@down broadcast(m(Mid @thisP Msg))}
end
meth Deliver(From Msg)
case Msg
of m(Mid Src Content) then
{AddAck @ack Mid Src.pid}
case {Dictionary.condExchange @pending Mid unit $ deliver(Src Content)}
of unit then
{@down broadcast(Msg)}
else skip
end
if ack.Mid.count>=@quorum andthen
{Not {Dictionary.condExchange delivered Mid false $ true}} then
{@up pending.Mid}
end
end
end
meth getRefParams($)
init(down:{@down getRef($)} quorum:@quorum)
end
end
Services.urb:=URB
class RCO
from BCast
prop locking
attr
down
vc
pending
meth init(Ps<=unit down:Ref<=unit)
if Ref\=unit then
@down={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
elseif Ps\=unit then
@down={@this newService(rb(Ps) {self wrap(deliver:Deliver $)} $)}
else
raise rco_hostsListMissing end
end
@vc={Dictionary.new}
@pending={Dictionary.new}
end
meth Bump(P)
N in
N={Dictionary.condExchange @vc P.pid 0 $ N}+1
end
meth broadcast(Msg)
lock
{@up deliver(@thisP Msg)}
{@down broadcast(m({Dictionary.entries @vc} Msg))}
{self Bump(@thisP)}
end
end
meth Deliver(Src Msg)
lock
case Msg
of m(VC Content) then
if Src\=@thisP then
Redo={NewCell false}
proc{DeliverPending}
Redo:=false
for K#p(Src VC Content) in {Dictionary.entries @pending} do
if {List.all VC fun{$ P#V}
{Dictionary.condGet @vc P 0}>=V
end} then
{Dictionary.remove @pending K}
{@up deliver(Src Content)}
{self Bump(Src)}
Redo:=true
end
end
if @Redo then {DeliverPending} end
end in
@pending.{NewName}:=p(Src VC Content)
{DeliverPending}
end
end
end
end
meth getRefParams($)
init(down:{@down getRef($)})
end
end
Services.rco:=RCO
class UCO
from BCast
prop locking
attr
down
vc
pending
del
meth init(Ps<=unit down:Ref<=unit)
if Ref\=unit then
@down={@this serviceFromRef(Ref {self wrap(deliver:Deliver $)} $)}
elseif Ps\=unit then
@down={@this newService(urb(Ps) {self wrap(deliver:Deliver $)} $)}
else
raise uco_hostsListMissing end
end
@vc={Dictionary.new}
@pending={Dictionary.new}
@del=0
end
meth Bump(P)
N in
N={Dictionary.condExchange @vc P.pid 0 $ N}+1
end
meth broadcast(Msg)
lock
{@down broadcast(m({Dictionary.entries @vc} Msg))}
{self Bump(@thisP)}
end
end
meth Deliver(Src Msg)
lock
case Msg
of m(VC Content) then
Redo={NewCell false}
proc{DeliverPending}
Redo:=false
for K#p(Src VC Content) in {Dictionary.entries @pending} do
if {List.all VC fun{$ P#V}
{Dictionary.condGet @vc P 0}>=V andthen
(P\=@thisP orelse @del>=V)
end} then
{Dictionary.remove @pending K}
{@up deliver(Src Content)}
if Src\=@thisP then
{self Bump(Src)}
else N in
N=(del:=N)+1
end
Redo:=true
end
end
if @Redo then {DeliverPending} end
end in
@pending.{NewName}:=p(Src VC Content)
{DeliverPending}
end
end
end
meth getRefParams($)
init(down:{@down getRef($)})
end
end
Services.uco:=UCO
proc{Load}skip end
end