-
Notifications
You must be signed in to change notification settings - Fork 1
/
distBase.oz
246 lines (238 loc) · 4.74 KB
/
distBase.oz
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
functor
import
Pickle
Error
Open
OS
export
LocalProcess
Services
Load
BaseService
p2p:P2P
define
Services={Dictionary.new}
Incoming={Dictionary.new}
Outgoing={Dictionary.new}
class LocalProcess
attr
pid
addr
server
meth init(Host<=unit) Port in
pid := {NewName}
Incoming.@pid := {Dictionary.new}
Outgoing.@pid := {New Loopback init(self)}
server := {New TcpServer init(self Port)}
addr := if Host==unit then
a({OS.getHostByName "localhost"}.addrList.1 Port)
else
a(Host Port)
end
end
meth getProcess($)
p(pid:@pid addr:@addr)
end
meth newService(Init Listener $)
{New Services.{Label Init} preInit(self true Init Listener)}
end
meth serviceFromRef(SRef Listener $)
{New Services.{Label SRef} preInit(self false SRef Listener)}
end
end
class BaseService
attr
kind
this
thisP
up
sid
meth preInit(LocalP IsNew Init Listener)
this := LocalP
thisP := {LocalP getProcess($)}
kind := {Label Init}
up := Listener
if IsNew then
sid := {NewName}
{self {Record.adjoin Init init}}
else
sid := Init.sid
{self Init.p}
end
end
meth init(...) skip end
meth wrap($ ...)=Mapping
proc{$ Msg}
{self {Record.adjoin Msg Mapping.{Label Msg}}}
end
end
meth getRef($)
L=@kind in
L(sid:@sid p:{self getRefParams($)})
end
meth getRefParams($) init() end
end
class P2P
from BaseService
end
proc{DropMsg _} skip end
proc{ProcessMessage ToId MsgP}
if MsgP\=nil then
Msg={Pickle.unpack MsgP} in
case Msg
of flp2p(Sid Origin Content) then
thread
{{Dictionary.condGet Incoming.ToId Sid DropMsg} deliver(Origin Content)}
end
end
end
end
class FLP2P
from P2P
meth init()
Incoming.(@thisP.pid).@sid := @up
end
meth send(To Msg)
if {OS.rand} mod 1000 <100 then {self send(To Msg)} end %Dup rate
if {OS.rand} mod 1000 <900 then %No-drop rate
thread
{Delay {OS.rand} mod 200} %Better distribution needed
{{GetConnection To} send({Pickle.pack flp2p(@sid @thisP Msg)})}
end
{Delay 100} %Delay by average delay to avoid thread explosion
end
end
end
Services.flp2p := FLP2P
fun{GetConnection To}
case {Dictionary.condGet Outgoing To.pid unit}
of unit then N in
case {Dictionary.condExchange Outgoing To.pid unit $ N}
of unit then
N={New TcpClient init(To)}
N
[] C then N=C C
end
[] C then C
end
end
class Loopback
attr lpId
meth init(LocalP)
lpId := {LocalP getProcess($)}.pid
end
meth send(Msg)
{ProcessMessage @lpId Msg}
end
end
fun{FromHex Cs}
case Cs of nil then nil
[] A|B|T then (A-&a)*16+(B-&a)|{FromHex T}
end
end
class TcpServer
attr
lpId
s
meth init(LocalP P)
proc{Loop Xs} M Mr in
{List.takeDropWhile Xs fun{$ C}C\=&X end M Mr}
case Mr
of nil then skip
else
try
{ProcessMessage @lpId {FromHex M}}
catch E then
{Error.printException E}
end
{Loop Mr.2}
end
end
in
lpId := {LocalP getProcess($)}.pid
s := {New Open.socket init()}
{@s bind(port:P)}
{@s listen()}
thread
for S from fun{$}{@s accept(accepted:$ acceptClass:Open.socket)}end do
thread
{Loop thread {S read(list:$ size:all)} end}
{S close()}
end
end
end
end
end
fun{ToHex Cs}
case Cs of nil then nil
[] X|T then (X div 16 + &a)|(X mod 16 + &a)|{ToHex T}
end
end
class TcpClient
prop locking
attr
rp
s
ok
unused
meth init(RemoteP)
unused:=false
@rp = RemoteP
try
@s = {New Open.socket client(host:RemoteP.addr.1 port:RemoteP.addr.2)}
@ok=true
thread {self check} end
catch system(os(os "connect" ...) ...) then
@ok=false
{self destroy()}
[] E then
{Error.printException E}
@ok=false
{self destroy()}
end
end
meth send(Msg)
lock
if @ok then
unused:=false
try
{@s write(vs:{ToHex {VirtualString.toString Msg}})}
{@s write(vs:"X")}
catch system(os(os "write" ...) ...) then
ok:=false
{self destroy}
[] E then
{Error.printException E}
ok:=false
{self destroy}
end
end
end
end
meth destroy()
false=@ok
{@s close()}
thread
{Delay 1000}
{Dictionary.remove Outgoing @rp.pid}
end
end
meth check()
{Delay 5000}
lock
if @ok then
if @unused then
ok:=false
{self destroy}
else
unused:=true
end
end
end
if @ok then
{self check}
end
end
end
proc{Load}skip end
end