-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathinterface_connection.go
132 lines (114 loc) · 3.34 KB
/
interface_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package stableinterfaces
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
)
var (
ErrConnectionClosed = errors.New("connection closed")
)
const (
managerSide interfaceConnectionSide = iota
interfaceSide
)
type (
interfaceConnectionSide int
// InterfaceConnection is a persistent duplex connection to an interface.
InterfaceConnection struct {
ID string
side interfaceConnectionSide
// OnClose is called when the connection is closed from the interface-side. This is non-blocking.
onClose []func()
onCloseMu *sync.Mutex
// Optimistic shuttingDown
closed *atomic.Bool
// OnRecv is called when the interface send a message to the connection.
// This function is executed non-blocking (launched in a goroutine)
OnRecv func(payload any)
sendChan, recvChan, closedChan chan any
}
connectionPair struct {
ID string
InterfaceSide, ManagerSide InterfaceConnection
// Need channels to prevent panic on sending to closed channel, but not blocking
// because connection listener is already in goroutine
closedChan chan any
}
)
func launchConnectionPairListener(cp *connectionPair) {
for {
select {
case received := <-cp.ManagerSide.recvChan:
if cp.ManagerSide.OnRecv != nil {
go cp.ManagerSide.OnRecv(received)
}
case received := <-cp.InterfaceSide.recvChan:
if cp.InterfaceSide.OnRecv != nil {
go cp.InterfaceSide.OnRecv(received)
}
case <-cp.closedChan:
// Mark both sides closed
go cp.InterfaceSide.close()
go cp.ManagerSide.close()
return
}
}
}
func (cp *connectionPair) Close(ctx context.Context) error {
// They're the same `closed`
if !cp.InterfaceSide.closed.CompareAndSwap(false, true) {
return ErrConnectionClosed
}
select {
case cp.closedChan <- nil:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Close closes the connection
func (ic *InterfaceConnection) Close() error {
if !ic.closed.CompareAndSwap(false, true) {
return ErrConnectionClosed
}
// Close the loop
ic.closedChan <- nil
return nil
}
func (ic *InterfaceConnection) close() {
ic.closed.Store(true) // in case we haven't already done this (e.g. invoked from outside)
ic.onCloseMu.Lock()
defer ic.onCloseMu.Unlock()
for _, onClose := range ic.onClose {
onClose()
}
}
// Send sends a message to a Stable Interface instance for processing via it's OnRecv handler (if it exists).
// The context is only for queueing sends to the channel, not actual OnRecv processing, as sending is blocking.
// So best to keep this relatively short and launch goroutines in OnRecv.
// In the event that the channel is closes after we've checked (and it was found open), the ctx will time out with
// the error fmt.Errorf("%w: %w", context.DeadlineExceeded, ErrConnectionClosed).
// if there are queued sends when Close() is called, some may make it through due to Go's select.
func (ic *InterfaceConnection) Send(ctx context.Context, payload any) error {
if ic.closed.Load() {
return ErrConnectionClosed
}
select {
case ic.sendChan <- payload:
break
case <-ctx.Done():
// check if we closed between check and send
if ic.closed.Load() {
return fmt.Errorf("%w: %w", ctx.Err(), ErrConnectionClosed)
}
return ctx.Err()
}
return nil
}
func (ic *InterfaceConnection) AddOnCloseListener(f func()) {
ic.onCloseMu.Lock()
defer ic.onCloseMu.Unlock()
ic.onClose = append(ic.onClose, f)
}