-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtunnel.go
2319 lines (2122 loc) · 69.1 KB
/
tunnel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package main
import (
"encoding/binary"
"io"
"log"
"net"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/aristanetworks/goarista/dscp"
"github.com/klauspost/reedsolomon"
"github.com/tevino/abool"
"github.com/urfave/cli"
)
// The Conn interface of choice
type Conn *net.TCPConn
// Shot is a single tunneled data fragment: the originating client id,
// the offset/sequence bookkeeping fields and the payload itself.
// The header fields are kept as raw byte slices because they are
// written to / read from the wire as-is.
type Shot struct {
	client  []byte // client id: int64 (unix nano) for TCP / 21-byte "ipv4:port" string for UDP
	ofs     []byte // uint32, offset of this payload within the client stream
	seq     []byte // uint32, sequence number of the shot
	payload []byte // raw data read from the client/service
	ln      uint32 // number of valid bytes in payload (the slice may be larger)
}
// Fec holds the forward error correction (Reed-Solomon) configuration
// parsed from the "fec" flag.
type Fec struct {
	ds int // number of data shards
	ps int // number of parity shards
	ln int // total shard count (data + parity)
}
// Payload is one raw chunk read from the local client or service,
// together with the number of valid bytes in data.
type Payload struct {
	data []byte // raw bytes read
	ln   int    // valid length of data
}
// Frags groups the fragmenting/forwarding tunables shared by the
// tunnel goroutines.
type Frags struct {
	payload int           // payload size per connection, bytes
	mtu     int           // link MTU used when sizing writes
	tos     byte          // DSCP/TOS value applied to tunnel connections
	bt      time.Duration // buffer time (forward rate) in ms
	bs      int           // buffer size: number of payloads buffered between forwards
	after   int64         // startup window before buffering kicks in
	// NOTE(review): after is stored as int64(toTimeMs(...)) — i.e. a Duration
	// in nanoseconds — but recentStart compares it against a millisecond
	// count; confirm the intended unit.
	avlc *int // counter of locally available connections
	// NOTE(review): avlc is incremented/decremented from several goroutines
	// without synchronization — confirm whether an atomic counter is needed.
}
// ClientCmd is a command shot exchanged between the two tunnel ends to
// synchronize client connection state.
type ClientCmd struct {
	cmd    []byte // command code (single byte used as bool/opcode)
	client []byte // client id: int64 for TCP / 21-byte "ipv4:port" string for UDP
	data   []byte // command-specific payload, depends on cmd
}
// Client describes one tracked tunneled connection; the id is the
// unix-nano timestamp taken when the connection was accepted.
type Client struct {
	end    byte        // 0 for the client end, 1 for the server end
	client []byte      // client id, int64 (unix nano) for TCP
	conn   interface{} // underlying connection object (type depends on protocol)
	seq    chan uint32 // current/next data sequence number
}
// main parses the command line flags, builds the channel fabric shared
// by the forwarding goroutines and starts the listeners/dialers for the
// selected protocol (tcp or udp). It blocks until any listener reports
// a fatal error through errchan.
func main() {
	var prctl, frg, conns, lassoes, listen, lFling, rFling, forward, buffer, after,
		lLasso, rLasso, lFlingR, rFlingR, lSync, rSync, retries, to, ti, fec, dup string
	// rmtx guards the remote (client->server) maps, lmtx the local
	// (server->client) maps
	var rmtx, lmtx sync.Mutex
	// var rmtx sync.Mutex
	app := cli.NewApp()
	app.Flags = []cli.Flag{
		cli.StringFlag{
			Name:        "fec",
			Value:       "0:0",
			Usage:       "forward error correction flag [parity:error] (0)",
			Destination: &fec,
		},
		cli.StringFlag{
			Name:        "dup",
			Value:       "fling",
			Usage:       "enable duplex mode, [fling,lasso,both,none] (both)",
			Destination: &dup,
		},
		cli.StringFlag{
			Name:        "protocol",
			Value:       "tcp",
			Usage:       "protocol used by the client and server to be tunnelled over tcs (tcp)",
			Destination: &prctl,
		},
		cli.StringFlag{
			Name:        "buffer",
			Value:       "250:0",
			Usage:       "forward rate (ms) and how many payloads to buffer between forwards (0:0)",
			Destination: &buffer,
		},
		cli.StringFlag{
			Name:        "after",
			Value:       "2000",
			Usage:       "for handshakes, start buffering only after (0)ms",
			Destination: &after,
		},
		cli.StringFlag{
			Name:        "frags",
			Value:       "10000:1250",
			Usage:       "size of the payload for each connection in bytes and mtu (10000:1308)",
			Destination: &frg,
		},
		cli.StringFlag{
			Name:        "tick",
			Value:       "50",
			Usage:       "unit of time to read a raw payload in milliseconds (50)",
			Destination: &ti,
		},
		cli.StringFlag{
			Name:        "tock",
			Value:       "250",
			Usage:       "timeout for merging raw payloads in milliseconds (250)",
			Destination: &to,
		},
		cli.StringFlag{
			Name:        "conns",
			Value:       "4",
			Usage:       "the number of simultaneous connections for flinging (4)",
			Destination: &conns,
		},
		cli.StringFlag{
			Name:        "lassoes",
			Value:       "0",
			Usage:       "the number of simultaneous connections for lassoes (0)",
			Destination: &lassoes,
		},
		cli.StringFlag{
			Name:        "retries",
			Value:       "0:4",
			Usage:       "enable synced retries and tune skipping rate (0:conns)",
			Destination: &retries,
		},
		cli.StringFlag{
			Name:        "listen",
			Value:       "127.0.0.1:6000",
			Usage:       "address for clients connections to be tunneled (127.0.0.1:6000)",
			Destination: &listen,
		},
		cli.StringFlag{
			Name:        "lFling",
			Value:       "127.0.0.1:6090",
			Usage:       "local listening address for peers (127.0.0.1:6090)",
			Destination: &lFling,
		},
		cli.StringFlag{
			Name:        "rFling",
			Value:       "127.0.0.1:6091",
			Usage:       "address to send outgoing connections, a lFling of another peer (127.0.0.1:6091)",
			Destination: &rFling,
		},
		cli.StringFlag{
			Name:        "lLasso",
			Value:       "0",
			Usage:       "address to listen to incoming lasso connections (0)",
			Destination: &lLasso,
		},
		cli.StringFlag{
			Name:        "rLasso",
			Value:       "0",
			Usage:       "remote address to send lassos to (0)",
			Destination: &rLasso,
		},
		cli.StringFlag{
			Name:        "lFlingR",
			Value:       "0",
			Usage:       "address to listen to incoming retry flings (0)",
			Destination: &lFlingR,
		},
		cli.StringFlag{
			Name:        "rFlingR",
			Value:       "0",
			Usage:       "remote address to send retry flings to (0)",
			Destination: &rFlingR,
		},
		cli.StringFlag{
			Name:        "forward",
			Value:       "127.0.0.1:6003",
			Usage:       "address of the server to be tunneled (127.0.0.1:6003)",
			Destination: &forward,
		},
		cli.StringFlag{
			Name:        "lSync",
			Value:       "127.0.0.1:5999",
			Usage:       "address for listening to status syncronizations (127.0.0.1:5999)",
			Destination: &lSync,
		},
		cli.StringFlag{
			Name:        "rSync",
			Value:       "127.0.0.1:5998",
			Usage:       "remote peer address for status synchronizations (127.0.0.1:5998)",
			Destination: &rSync,
		},
	}
	app.Action = func(c *cli.Context) error {
		// flags conversions
		// connections
		conns := intString(conns)
		lassoes := intString(lassoes)
		// tick tock
		tid := toTimeMs(ti)
		tod := toTimeMs(to)
		// fec (the "0:0" default still enters here but yields ln == 0,
		// which the dispatch switch below treats as "fec disabled")
		fecConf := Fec{}
		if fec != "0" {
			vals := coupleIntString(fec)
			fecConf = Fec{
				ds: vals[0],
				ps: vals[1],
				ln: vals[0] + vals[1],
			}
		}
		// frags
		frags := coupleIntString(frg)
		bufOpt := coupleIntString(buffer)
		avlc := 0
		frg := Frags{
			payload: frags[0],
			mtu:     frags[1],
			tos:     byte(46),
			bt:      toTimeMs(bufOpt[0]),
			bs:      bufOpt[1],
			after:   int64(toTimeMs(after)),
			avlc:    &avlc,
		}
		// retries
		retries := coupleIntString(retries)
		rtr := !(retries[0] == 0)
		skip := retries[1]
		// multi: scale factor for the buffered channel capacities
		m := 100
		// mc := oneAtLeast(m * conns)
		// ml := oneAtLeast(m * lassoes)
		mc := m * conns
		ml := m * lassoes
		// general errors channel
		errchan := make(chan error)
		// channel for flinging
		fchan := make(chan []byte, mc)
		// channel for received flinged shots from remote
		rfchan := make(chan *Shot, mc)
		// channel for ticker
		tichan := makeClock()
		// channel for tocker
		tochan := makeClock()
		// channel for shots to be caught by lassos
		pachan := make(chan *Shot, ml)
		// channel for raw service->client shots (dup mode)
		padchan := make(chan []byte, mc)
		// channel for shots caught from lassos
		cachan := make(chan *Shot, ml)
		// channel for raw ClientCmd shots
		padRchan := make(chan []byte, 2)
		// holds direct client commands (like connection sync/updates)
		clcmdchan := make(chan *ClientCmd, 2*3)
		// holds reverse client commands
		crchan := make(chan *ClientCmd, 2*3)
		// conn queue channel for flings
		cq := make(chan Conn, conns)
		// new connections requests for conn queue channel (channel length to avoid write blocking)
		newcC := make(chan bool, mc)
		// channel for issueing flushes of the retries channel
		schan := make(chan bool, mc)
		// new connections queue for lassoes
		newcL := make(chan bool, ml)
		// new connections queue for reverse lassoes
		newcR := make(chan bool, 2*3)
		switch prctl {
		case "tcp":
			// channels for local connections management
			addchan := make(chan *Client)
			rmchan := make(chan interface{})
			// holds clients connections buffered payloads for retries
			rtmap := make(map[int64]chan *ShotRtr)
			// holds clients connections ids
			clients := make(map[int64]bool)
			// holds clients connections payloads
			frmap := make(map[int64]map[uint32]*Shot)
			// holds lasso connections payloads
			flmap := make(map[int64]map[uint32]*Shot)
			// holds channels for forward throttling of each client
			fwmap := make(map[int64][2]chan bool)
			// holds clients connections objects
			ccon := make(map[int64]Conn)
			// current offset map for forwarding remote/local
			rOfsMap := make(map[int64]uint32)
			lOfsMap := make(map[int64]uint32)
			go throttle(tichan, tid)
			go throttle(tochan, tod)
			go syncServer(errchan, &lSync, clcmdchan)
			go clientServer(errchan, &listen, fchan, rtr, rtmap, schan, addchan, rmchan,
				&frg, fecConf, conns, tichan, tochan, tid, tod)
			go syncHandler(addchan, rmchan, &rSync, clients, frmap, flmap, clcmdchan, ccon,
				&forward, pachan, padchan, padRchan, rtr, rtmap, &rmtx,
				&frg, fecConf, tichan, tochan, tid, tod, fwmap,
				conns, cq, newcC, schan, rOfsMap, lOfsMap)
			// pick the dispatcher pair: fec, plain, or retry mode
			switch {
			case fecConf.ln != 0 && !rtr:
				go dispatchFec(rfchan, ccon, &frg, fecConf, frmap, rOfsMap, &rmtx) // clientToServer dispatcher
				go dispatchFec(cachan, ccon, &frg, fecConf, flmap, lOfsMap, &lmtx) // serverToClient dispatcher
			case fecConf.ln == 0 && !rtr:
				go dispatch(clients, rfchan, ccon, frmap, rOfsMap, fwmap, &rmtx, skip, &frg) // clientToServer dispatcher
				go dispatch(clients, cachan, ccon, flmap, lOfsMap, fwmap, &lmtx, skip, &frg) // serverToClient dispatcher
			case rtr:
				go dispatchRtr(rfchan, padRchan, ccon, frmap, &rmtx) // clientToServer dispatcher
				// the retry for reverse is not even implemented so channel is nil
				go dispatchRtr(cachan, nil, ccon, flmap, &lmtx) // serverToClient dispatcher
			}
		case "udp":
			// channels for local connections management
			addchan := make(chan *ClientUDP)
			rmchan := make(chan interface{})
			// holds clients connections buffered payloads for retries
			rtmap := make(map[string]chan *ShotRtr)
			// holds clients connections ids
			clients := make(map[string]*net.UDPAddr)
			// holds clients connections payloads
			frmap := make(map[string]map[uint32]*Shot)
			// holds lasso connections payloads
			flmap := make(map[string]map[uint32]*Shot)
			// holds channels for forward throttling of each client
			fwmap := make(map[string][2]chan bool)
			// holds clients connections objects, in udp only for server side
			ccon := make(map[string]*net.UDPConn)
			// current offset map for forwarding remote/local
			rOfsMap := make(map[string]uint32)
			lOfsMap := make(map[string]uint32)
			// channel for the single connection of the udp listener
			uchan := make(chan *net.UDPConn)
			// NOTE(review): unlike the tcp branch, no throttle goroutines are
			// started for tichan/tochan here — confirm this is intentional.
			go syncServerUDP(errchan, &lSync, clients, clcmdchan)
			go clientServerUDP(errchan, &listen, fchan,
				rtr, rtmap, schan,
				addchan, rmchan, clients, uchan,
				fecConf, conns, &frg,
				tichan, tochan, tid, tod)
			go syncHandlerUDP(addchan, rmchan, &rSync, clients, frmap, flmap, clcmdchan, ccon,
				&forward, pachan, padchan, padRchan, rtr, rtmap, fwmap,
				&rmtx, &frg, fecConf, tichan, tochan, tid, tod,
				conns, cq, newcC, schan, rOfsMap, lOfsMap)
			if !rtr {
				go dispatchUDPServer(rfchan, crchan, ccon, frmap, rOfsMap, skip, &rmtx)              // clientToServer dispatcher
				go dispatchUDPClient(uchan, cachan, clcmdchan, clients, flmap, lOfsMap, skip, &lmtx) // serverToClient dispatcher
			} else {
				go dispatchUDPServerRtr(rfchan, padchan, ccon, frmap, &rmtx, skip)            // clientToServer dispatcher
				go dispatchUDPClientRtr(uchan, cachan, padRchan, clients, flmap, &lmtx, skip) // serverToClient dispatcher
			}
		}
		// optional roles, enabled when the respective address flag is not "0"
		if rLasso != "0" {
			go lassoer(&rLasso, cachan, fchan, &frg, lassoes, newcL, tid, &dup, &prctl)
		}
		if lLasso != "0" {
			go lassoServer(errchan, &lLasso, padchan, rfchan, &frg, tid, &dup)
		}
		if lFling != "0" {
			go flingServer(errchan, &lFling, rfchan, padchan, &frg, conns, tid, &dup, &prctl)
		}
		if rFling != "0" {
			go flinger(fchan, &rFling, cq, newcC, conns, cachan, &frg, tid, tod, &dup, &prctl)
		}
		if lFlingR != "0" {
			go flingRServer(errchan, &lFlingR, padRchan)
		}
		if rFlingR != "0" {
			go flingerR(&rFlingR, clcmdchan, &frg, lassoes, newcR, tid)
		}
		// block until any of the listeners reports a fatal error
		return <-errchan
	}
	e := app.Run(os.Args)
	log.Printf("tcs terminated, error: %v", e)
}
// clientServer listens for local client connections to be tunneled and
// starts, for each accepted connection, the payload reader and the
// client->tunnel handler.
// Fatal setup errors are reported on errchan and the goroutine exits;
// previously the missing return meant AcceptTCP was called on a nil
// listener after a listen failure.
func clientServer(errchan chan<- error, addr *string, fchan chan<- []byte,
	retries bool, rtmap map[int64]chan *ShotRtr, schan chan<- bool,
	addchan chan<- *Client, rmchan chan<- interface{}, frg *Frags, fec Fec, conns int,
	tichan [2]chan bool, tochan [2]chan bool, tid time.Duration, tod time.Duration) {
	addrTCP, err := net.ResolveTCPAddr("tcp", *addr)
	if err != nil {
		errchan <- err
		return
	}
	ln, err := dscp.ListenTCPWithTOS(addrTCP, frg.tos)
	if err != nil {
		errchan <- err
		return
	}
	for {
		pcchan := make(chan Payload, 3*conns) // the payloads channel for each connection
		conn, err := ln.AcceptTCP()
		if err != nil {
			log.Println(err)
			continue
		}
		go tunnelPayloadsReader(pcchan, conn, frg, fec, tid, tod)
		go handleClientToTunnel(conn, fchan, retries, rtmap, addchan, rmchan,
			schan, pcchan, frg, tichan, tochan, tid, tod)
	}
}
// flingServer listens for peer connections carrying inbound data
// traffic and hands each accepted connection to the tunnel-to-tunnel
// handler.
// Fatal setup errors are reported on errchan and the goroutine exits;
// previously the missing return meant AcceptTCP was called on a nil
// listener after a listen failure.
func flingServer(errchan chan<- error, addr *string, rfchan chan<- *Shot, padchan chan []byte,
	frg *Frags, conns int, tid time.Duration, dup *string, prctl *string) {
	// the channel to avoid conn saturation
	fschan := make(chan bool, conns)
	// a dumb byte array for late writes
	stuff := make([]byte, 10)
	addrTCP, err := net.ResolveTCPAddr("tcp", *addr)
	if err != nil {
		errchan <- err
		return
	}
	ln, err := dscp.ListenTCPWithTOS(addrTCP, frg.tos)
	if err != nil {
		errchan <- err
		return
	}
	for {
		conn, err := ln.AcceptTCP()
		if err != nil {
			log.Println(err)
			continue
		}
		// NOTE(review): avlc is updated from several goroutines without
		// synchronization — racy; confirm whether an atomic is needed
		*(frg.avlc)++
		// log.Printf("handling a received fling, %v", *(frg.avlc))
		go handleTunnelToTunnel(prctl, conn, rfchan, padchan, frg, conns,
			fschan, tid, defDup(dup), stuff)
	}
}
func defDup(dup *string) bool {
switch *dup {
case "both", "lasso", "fling":
return true
default:
return false
}
}
// lassoServer listens for incoming lasso connections from clients and
// uses each one to push reverse data traffic.
// Fatal setup errors are reported on errchan and the goroutine exits;
// previously the missing return meant AcceptTCP was called on a nil
// listener after a listen failure.
func lassoServer(errchan chan<- error, addr *string, padchan chan []byte,
	rfchan chan *Shot, frg *Frags, tid time.Duration, dup *string) {
	addrTCP, err := net.ResolveTCPAddr("tcp", *addr)
	if err != nil {
		errchan <- err
		return
	}
	ln, err := dscp.ListenTCPWithTOS(addrTCP, frg.tos)
	if err != nil {
		errchan <- err
		return
	}
	// duplex applies to lassos only for the "both" and "lasso" modes
	// (intentionally narrower than defDup, which also accepts "fling")
	var bdup bool
	switch *dup {
	case "both", "lasso":
		bdup = true
	default:
		bdup = false
	}
	for {
		conn, err := ln.AcceptTCP()
		if err != nil {
			log.Println(err)
			continue
		}
		// put the connection in the lassos channel
		go handleLasso(padchan, conn, rfchan, frg, tid, bdup)
	}
}
// syncServer listens for sync command connections from the remote peer
// and forwards the decoded commands to cchan.
// Fatal setup errors are reported on errchan and the goroutine exits;
// previously the missing return meant AcceptTCP was called on a nil
// listener after a listen failure.
func syncServer(errchan chan<- error, addr *string, cchan chan<- *ClientCmd) {
	addrTCP, err := net.ResolveTCPAddr("tcp", *addr)
	if err != nil {
		errchan <- err
		return
	}
	tos := byte(46) // EF, same DSCP as the data connections
	ln, err := dscp.ListenTCPWithTOS(addrTCP, tos)
	if err != nil {
		errchan <- err
		return
	}
	for {
		conn, err := ln.AcceptTCP()
		if err != nil {
			log.Println(err)
			continue
		}
		go handleSyncConnection(conn, cchan)
	}
}
// dialTCP dials addr with TOS 46 (EF). On failure it logs the error,
// backs off one second and returns nil — callers must tolerate a nil
// Conn being handed back.
func dialTCP(addr *net.TCPAddr) Conn {
	conn, err := dscp.DialTCPWithTOS(nil, addr, 46)
	if err == nil {
		return conn
	}
	log.Printf("failed dialing connection: %v", err)
	time.Sleep(time.Second)
	return nil
}
// rampConns sleeps for the start delay t, then asynchronously dials st
// connections to addrTCP and queues each of them on cq.
func rampConns(addrTCP *net.TCPAddr, st int, cq chan<- Conn, t time.Duration) {
	time.Sleep(t)
	for i := 0; i < st; i++ {
		go func() {
			cq <- dialTCP(addrTCP)
		}()
	}
}
// connQueue keeps the connection pool cq filled with connections to
// addr: one is dialed immediately, st more are ramped up after the
// start delay, and afterwards one new connection is dialed for every
// request received on newc.
// avlc optionally counts the locally available connections and MAY BE
// NIL (lassoer passes nil); the previous unconditional *avlc++ panicked
// in that case.
// NOTE(review): the counter is updated from multiple goroutines without
// synchronization — racy; confirm whether an atomic is needed.
func connQueue(addr *string, cq chan<- Conn, newc <-chan bool, st int, start time.Duration, avlc *int) {
	addrTCP, _ := net.ResolveTCPAddr("tcp", *addr)
	// start with just one connection
	cq <- dialTCP(addrTCP)
	if avlc != nil {
		*avlc++
	}
	// start the rest of the connections after startup
	go rampConns(addrTCP, st, cq, start)
	for range newc {
		go func() {
			cq <- dialTCP(addrTCP)
			if avlc != nil {
				*avlc++ // a new conn is avl
			}
		}()
	}
}
// getConn returns a pooled connection from cq when one is immediately
// available; otherwise it dials addr synchronously, retrying until a
// connection is established.
// NOTE(review): the retry loop has no backoff, so repeated dial
// failures spin hot.
func getConn(cq chan Conn, addr *net.TCPAddr, newc chan<- bool) (c Conn) {
	select {
	case c = <-cq:
	default:
		// newc <- true // since no conn is avail queue a new one and dial on another
		for e := *new(error); c == nil || e != nil; c, e = dscp.DialTCPWithTOS(nil, addr, 46) {
			// log failed dials; the previous `c != nil` guard suppressed
			// the error message on exactly the iterations that failed
			if e != nil {
				log.Printf("getConn, dial error %v", e)
			}
		}
	}
	return
}
// flinger drives outgoing payloads ("flings") toward the remote peer at
// rFling, optionally pairing each fling with a lasso read when duplex
// mode is enabled.
func flinger(fchan chan []byte, rFling *string, cq chan Conn, newc chan bool,
	st int, cachan chan<- *Shot, frg *Frags, tid time.Duration, tod time.Duration, dup *string, prctl *string) {
	addrTCP, _ := net.ResolveTCPAddr("tcp", *rFling)
	// set up connections pool
	go connQueue(rFling, cq, newc, st, time.Duration(frg.after), frg.avlc)
	stuff := make([]byte, 10) // dumb payload for writes that only keep the conn moving
	// flingAndLasso pairs one write (fling) and one read (lasso) on the
	// same connection; rw coordinates which side closes the conn
	flingAndLasso := func(f []byte, c Conn, rw *abool.AtomicBool,
		newcf chan<- bool, newcl chan<- bool) {
		go fling(f, fchan, c, newcf, frg, true, rw, stuff, nil, nil)
		go lasso(prctl, cachan, c, newcl, frg, tid, true, rw, nil)
	}
	burst := false // not burst for now
	// start flinging
	if defDup(dup) {
		for {
			rw := abool.New()
			if burst { // in burst mode the payloads can request more connections
				select { // either there are shots to fling to we initiate both the fling and lasso
				case f := <-fchan:
					// shoot and throw force getting a conn
					log.Printf("1")
					go flingAndLasso(f, getConn(cq, addrTCP, newc), rw, newc, nil)
				default: // or there are no flings so we dumb shoot
					select {
					case c := <-cq: // get a conn but don't stall for connections in case a shot to fling is queued
						// shoot and throw with dumb write and the recently fetched conn
						log.Printf("2")
						// if there are no shots to fling the lasso queues new conns
						go flingAndLasso(stuff, c, rw, nil, newc)
					case f := <-fchan: // we got a fling before we could get a conn, so brute get a conn and shoot/throw
						log.Printf("3")
						go flingAndLasso(f, getConn(cq, addrTCP, newc), rw, newc, nil)
					}
				}
			} else { // in non burst mode the connection pool is always fixed
				// NOTE(review): the inner `range cq` never terminates (cq is
				// never closed), so the enclosing for loop body effectively
				// runs once and the initial rw above goes unused here.
				tochan := makeClock()
				go throttle(tochan, tod)
				for c := range cq {
					rw := abool.New()
					pubsub := make(chan bool)
					// only the fling requests new conn
					// if lasso reads before a fling, signals the fling to give up the conn (dumbwrite)
					go fling(nil, fchan, c, newc, frg, true, rw, stuff, pubsub, tochan[0])
					// if the fling shoots before a lasso is read, the lasso wait for the remote hand
					go lasso(prctl, cachan, c, nil, frg, tid, true, rw, pubsub)
				}
			}
		}
	} else {
		// simplex: every payload gets its own connection, write-only
		rw := abool.NewBool(true)
		stuff := make([]byte, 10)
		for f := range fchan {
			go func(f []byte) {
				c := getConn(cq, addrTCP, newc)
				go fling(f, fchan, c, newc, frg, false, rw, stuff, nil, nil)
			}(f)
		}
	}
	// dynamic mode
	// newc <- true
	// for f := range fchan {
	// 	go fling(f, fchan, <-cq, newc, frg, false)
	// 	newc <- true
	// }
}
// fling writes one payload (dst) to the connection and makes sure it is
// received, requeueing the payload on write failure; afterwards it
// closes the connection fully or write-side only, depending on whether
// the paired lasso (duplex mode) has already finished.
// func fling(fchan chan []byte, c Conn, newc chan<- bool, frg *Frags dup bool) {
func fling(dst []byte, fchan chan []byte, c Conn, newc chan<- bool, frg *Frags, dup bool,
	rw *abool.AtomicBool, stuff []byte, pubsub chan bool, tochan <-chan bool) {
	// log.Printf("got a connection for flinging")
	// case dst = <-fchan // get the dst and continue
	// case <-time.After(1000 * time.Millisecond): // timeout, dumb write and close
	// 	log.Printf("no shots for now...closing (clt)")
	// 	c.Write(stuff) // dumb write
	// 	if rw.SetToIf(false, true) { // throw is not done, partial close
	// 		wClose(c, newc)
	// 	} else { // throw done, write close
	// 		cClose(c, newc)
	// 	}
	// 	return
	// }
	if pubsub != nil { // non burst mode case, wait for a payload or give up if signaled
		select {
		case dst = <-fchan: // in non burst mode connections have precedence, which means dst is not prefetched (waiting from the channel)
			n, e := c.Write(dst)
			if e != nil || n == 0 {
				// failed write: requeue the payload for another conn
				qShot(dst, fchan)
			}
			cwClose(c, rw) // close conn
			newc <- true   // req new conn
			*(frg.avlc)--
			// log.Printf("scaled down avlc %v", *(frg.avlc))
			log.Printf("shot flung len: %v to %v", len(dst), c.LocalAddr())
			return
		case <-pubsub: // the lasso signaled to close connection
			c.Write(stuff) // dumbwrite
			cwClose(c, rw) // close conn
			<-tochan       // respect rate limiter
			newc <- true   // req new conn
			*(frg.avlc)--
			// log.Printf("scaled down avlc %v", *(frg.avlc))
			// log.Printf("gave up fling")
			return
		}
	}
	// close helpers: with/without requeueing a replacement connection
	cclo := func() { c.Close() }         // full close
	wclo := func() { (*c).CloseWrite() } // close only write
	qwclo := func() { wClose(c, newc) }  // close only write and queue conn
	qcclo := func() { cClose(c, newc) }  // full close and queue conn
	//log.Printf("writing from %v to %v", t-mss, l)
	n, e := c.Write(dst)
	//log.Printf("wrote %v bytes with the fling", n)
	if e != nil || n == 0 { // writing failed try again with another connection
		// don't queue new connections if the write was dumb
		if difRef(dst, stuff, qcclo, cclo) { // if dst reference is different it is not a dumbwrite, so we requeue the failed write
			qShot(dst, fchan)
		}
	} else if dup { // write is successful, decide to close fully or partially
		if rw.SetToIf(false, true) { // lasso is not done, only close write
			log.Printf("write close the fling conn")
			difRef(dst, stuff, qwclo, wclo)
			// after write is closed start the deadline because
			// the connection might close unexpectedly
			// c.SetReadDeadline(time.Now().Add(2000 * time.Millisecond))
		} else { // lasso is done, full close
			log.Printf("full close the fling conn")
			difRef(dst, stuff, qcclo, cclo)
		}
	} else {
		log.Printf("full close the fling conn")
		difRef(dst, stuff, qcclo, cclo)
	}
	*(frg.avlc)--
	// log.Printf("scaled down avlc %v", *(frg.avlc))
	log.Printf("shot flung len: %v to %v", len(dst), c.LocalAddr())
}
// difRef reports whether sl1 and sl2 are different slices (distinct
// backing data): when they differ it runs truer() and returns true,
// when they are the same slice it runs falser() and returns false.
// Two empty/nil slices are treated as "same".
//
// The original compared the addresses of the two parameter variables
// (&sl1 != &sl2), which is ALWAYS true because parameters are distinct
// locals — falser() was unreachable and dumb writes were requeued as
// real payloads. Compare the backing arrays instead.
func difRef(sl1 []byte, sl2 []byte, truer func(), falser func()) bool {
	same := len(sl1) == len(sl2) && (len(sl1) == 0 || &sl1[0] == &sl2[0])
	if same {
		falser()
		return false
	}
	truer()
	return true
}
// cClose fully closes c and, when a requeue channel is provided,
// requests a replacement connection.
func cClose(c *net.TCPConn, newc chan<- bool) {
	c.Close()
	if newc == nil {
		return
	}
	newc <- true
}
// rClose shuts down the read side of c and, when a requeue channel is
// provided, requests a replacement connection.
func rClose(c *net.TCPConn, newc chan<- bool) {
	c.CloseRead()
	if newc == nil {
		return
	}
	newc <- true
}
// wClose shuts down the write side of c and, when a requeue channel is
// provided, requests a replacement connection.
func wClose(c *net.TCPConn, newc chan<- bool) {
	c.CloseWrite()
	if newc == nil {
		return
	}
	newc <- true
}
// crClose releases the read side of a duplex connection. If the paired
// writer has not finished yet (flag still false) only CloseRead is
// performed and false is returned (partial close); otherwise the
// connection is fully closed and true is returned.
func crClose(c *net.TCPConn, rw *abool.AtomicBool) bool {
	partial := rw.SetToIf(false, true)
	if partial { // writer still pending: keep the write side usable
		c.CloseRead()
		return false
	}
	c.Close() // writer already done: tear everything down
	return true
}
// cwClose releases the write side of a duplex connection. If the paired
// reader has not finished yet (flag still false) only CloseWrite is
// performed and false is returned (partial close); otherwise the
// connection is fully closed and true is returned.
func cwClose(c *net.TCPConn, rw *abool.AtomicBool) bool {
	if !rw.SetToIf(false, true) {
		c.Close() // reader already done: tear everything down
		return true
	}
	c.CloseWrite() // reader still pending: keep the read side usable
	return false
}
// cRead reads from c into pl, storing the byte count in *n. On any
// failure the read side is closed and a replacement connection is
// requested (via rClose). Returns (read, ok):
//
//	clean non-empty read:            true,  true
//	empty read (nil error or EOF):   false, false (conn read-closed, requeued)
//	partial read with an error:      true,  false (conn read-closed, requeued)
//	non-EOF error without data:      false, false (conn read-closed, requeued)
//
// (The old header comment listed a different mapping that did not match
// the code below.)
func cRead(c Conn, pl []byte, n *int, newc chan<- bool) (bool, bool) {
	var e error
	*n, e = c.Read(pl)
	// log.Printf("did a read %v %v", n, e)
	// note: default is evaluated last despite being written first — it
	// fires only when no labelled case matches, i.e. a clean read
	switch {
	default:
		return true, true // we read and no errors were found
	case *n == 0 && (e == nil || e == io.EOF):
		rClose(c, newc)
		return false, false // we didnt read but error is EOF so w/e
	case *n != 0 && e != nil:
		rClose(c, newc)
		return true, false // we read something but there is an error so b/w
	case e != nil && e != io.EOF:
		rClose(c, newc)
		return false, false // the error is not EOF so false in any case
	}
}
// cReadNoQ reads from c into pl without ever requesting replacement
// connections. An empty/EOF read always closes the read side; other
// failures close it only when close is true. Returns (read, ok),
// mirroring cRead.
func cReadNoQ(c Conn, pl []byte, n *int, close bool) (bool, bool) {
	var err error
	*n, err = c.Read(pl)
	// cases are checked in the same order as the original switch
	if *n == 0 && (err == nil || err == io.EOF) {
		go (*c).CloseRead()
		return false, false // nothing read, EOF or spurious empty read
	}
	if *n != 0 && err != nil {
		if close {
			go (*c).CloseRead()
		}
		return true, false // partial data plus an error
	}
	if err != nil && err != io.EOF {
		if close {
			go (*c).CloseRead()
		}
		return false, false // hard error, nothing read
	}
	return true, true // clean read
}
// isConnClosed probes c with a one-second read deadline to decide
// whether the peer has closed the connection: a deadline timeout or a
// successful read means "still open", any other error means "closed".
//
// NOTE(review): the probe now reads into a 1-byte buffer — the previous
// nil buffer made Read return immediately without touching the socket,
// so closure was never detected. This means the probe can consume one
// byte of stream data if the peer sends during the check; confirm
// callers tolerate that. The deadline is cleared before returning so
// later reads on the connection are unaffected (it used to be left set).
func isConnClosed(c Conn) bool {
	b := make([]byte, 1)
	c.SetReadDeadline(time.Now().Add(1 * time.Second))
	_, err := c.Read(b)
	c.SetReadDeadline(time.Time{}) // clear the probe deadline
	if e, ok := err.(net.Error); ok && e.Timeout() {
		return false // no data within the deadline: still open
	}
	if err == nil {
		return false // data arrived: open (one byte consumed)
	}
	return true
}
// lassoer maintains a pool of outbound connections to lassoServer at
// rLasso and hands each dialed connection to a lasso reader, so the
// remote end has connections available for reverse data traffic.
func lassoer(rLasso *string, cachan chan<- *Shot, fchan chan []byte, frg *Frags,
	lassoes int, newc chan bool, tid time.Duration, dup *string, prctl *string) {
	// dedicated connection pool for lassoes (no available-conn counter)
	pool := make(chan Conn, lassoes)
	go connQueue(rLasso, pool, newc, lassoes, time.Duration(frg.after), nil)
	if defDup(dup) {
		// duplex: every connection gets its own fresh read/write flag
		for conn := range pool {
			go lasso(prctl, cachan, conn, newc, frg, tid, true, abool.New(), nil)
		}
		return
	}
	// simplex: a single, pre-set flag is shared by all lassoes
	shared := abool.NewBool(true)
	for conn := range pool {
		go lasso(prctl, cachan, conn, newc, frg, tid, false, shared, nil)
	}
}
// readShotFields reads the three shot header fields (client, ofs, seq)
// from c into reads, requesting a replacement connection on failure.
func readShotFields(c Conn, reads [3][]byte, newc chan<- bool) bool {
	var n int
	for i := range reads {
		_, ok := cRead(c, reads[i], &n, newc)
		if !ok {
			return false
		}
	}
	return true
}
// readShotFieldsNoQ reads the three shot header fields from c without
// requesting replacement connections on failure.
func readShotFieldsNoQ(c Conn, reads [3][]byte) bool {
	var n int
	for i := range reads {
		_, ok := cReadNoQ(c, reads[i], &n, true)
		if !ok {
			return false
		}
	}
	return true
}
// readClientCmdFields reads the two ClientCmd header fields from c,
// requesting a replacement connection and logging on failure.
func readClientCmdFields(c Conn, reads [2][]byte, newc chan<- bool) bool {
	var n int
	for i := range reads {
		if _, ok := cRead(c, reads[i], &n, newc); !ok {
			log.Printf("clientcmd read error for field %v", i)
			return false
		}
	}
	return true
}
// lasso reads a single shot from a previously thrown connection and
// channels it to cachan. On exit the read side (or the whole
// connection, if the paired fling already finished) is closed; when the
// close was only partial the paired fling is signaled through pubsub to
// give up the connection.
//
// pubsub MAY BE NIL (lassoer and the burst-mode flinger pass nil):
// sending on a nil channel blocks forever, which leaked one goroutine
// per lasso, so the signal is now skipped when no channel was provided.
func lasso(prctl *string, cachan chan<- *Shot, c Conn, newc chan<- bool, frg *Frags,
	tid time.Duration, dup bool, rw *abool.AtomicBool, pubsub chan bool) {
	var n int
	shot := makeShot(*prctl, frg.payload)
	// read the shot header fields (client, ofs, seq)
	if !readShotFields(c, [3][]byte{shot.client, shot.ofs, shot.seq}, newc) {
		if !crClose(c, rw) && pubsub != nil {
			pubsub <- true
		}
		return
	}
	if !readShotPayload(c, shot.payload, frg, &n, newc, tid) {
		if !crClose(c, rw) && pubsub != nil {
			pubsub <- true
		}
		return
	}
	shot.ln = uint32(n)
	cachan <- shot
	// partial close: signal the fling to give up the conn; full close:
	// the fling already used up the conn
	if !crClose(c, rw) && pubsub != nil {
		pubsub <- true
	}
}
// waitForMaps blocks until the offset map for client ct exists, backing
// off in growing increments (+10 per attempt, starting from the
// client's stored wait value); it gives up and returns false once the
// stored wait reaches 1500. Note that wait holds a bare count that is
// only turned into a real duration when multiplied by Millisecond.
func waitForMaps(ct int64, clientOfsMap map[int64]map[uint32]*Shot, wait map[int64]time.Duration) bool {
	for {
		if clientOfsMap[ct] != nil {
			return true
		}
		if wait[ct] >= 1500 {
			return false // waited long enough: skip the shot
		}
		time.Sleep(wait[ct] * time.Millisecond)
		wait[ct] += 10
	}
}
func recentStart(ct int64, after map[int64]int64, t int64) bool {
if after[ct] == 0 {
after[ct] = time.Now().UnixNano()
} else {
// if startup period has passed
if (time.Now().UnixNano()-after[ct])/int64(time.Millisecond) > t {
return false
}
}
return true
}
// prepare received shots to be forwarded
func dispatch(clients map[int64]bool, shotsChan <-chan *Shot, connMap map[int64]Conn,
clientOfsMap map[int64]map[uint32]*Shot, cOfsMap map[int64]uint32, fwmap map[int64][2]chan bool,
mtx *sync.Mutex, skip int, frg *Frags) {
failmap := map[int64]int{} // counter for consecutive failed forwarding attempts
bmap := map[int64]int{} // counter for number of buffered shots for each client
wait := map[int64]time.Duration{} // each client init waiting times
after := map[int64]int64{} // countdown after which buffering starts
// fwmtx := &sync.Mutex{}
for {
select {
// mtx.Lock()
case shot := <-shotsChan:
ct := timeBytes(shot.client)
ofs := intBytes(shot.ofs)
// log.Printf("dispatching shot with ofs: %v", ofs)
if !waitForMaps(ct, clientOfsMap, wait) {
continue // skip the shot
}
// only forward on time shots
if ofs >= cOfsMap[ct] {
clientOfsMap[ct][ofs] = shot
bmap[ct]++ // a shot was just added to the queue
// mtx.Lock()
// forward only during startup time or after buffer is filled
if recentStart(ct, after, frg.after) || bmap[ct] >= frg.bs {
cOfsMap, failmap[ct] = forward(ct, cOfsMap, failmap[ct], clientOfsMap,
connMap, bmap, fwmap[ct][0], skip, frg)
}