forked from recws-org/recws
-
Notifications
You must be signed in to change notification settings - Fork 0
/
recws.go
235 lines (195 loc) · 5.37 KB
/
recws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Package recws provides websocket client based on gorilla/websocket
// that will automatically reconnect if the connection is dropped.
package recws
import (
"errors"
"log"
"math/rand"
"net/http"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/jpillora/backoff"
)
// ErrNotConnected is returned by ReadMessage, WriteMessage and WriteJSON
// when they are called while the connection is not established.
var ErrNotConnected = errors.New("websocket: not connected")
// The RecConn type represents a Reconnecting WebSocket connection.
//
// Set the exported fields before calling Dial; Dial replaces every
// interval/timeout field that is still zero with its documented default.
type RecConn struct {
// RecIntvlMin specifies the initial reconnecting interval,
// defaults to 2 seconds.
RecIntvlMin time.Duration
// RecIntvlMax specifies the maximum reconnecting interval,
// defaults to 30 seconds.
RecIntvlMax time.Duration
// RecIntvlFactor specifies the rate of increase of the reconnection
// interval, defaults to 1.5.
RecIntvlFactor float64
// HandshakeTimeout specifies the duration for the handshake to complete,
// defaults to 2 seconds. Dial also blocks for this long before returning.
HandshakeTimeout time.Duration
// NonVerbose suppresses connecting/reconnecting log messages.
NonVerbose bool
// mu is held by Close, connect and the getter methods while they touch
// Conn, httpResp, dialErr and isConnected.
mu sync.Mutex
// url is the target address recorded by Dial and used for every
// connection attempt.
url string
// reqHeader is the header passed to the dialer on every connection attempt.
reqHeader http.Header
// httpResp is the HTTP response of the most recent handshake attempt.
httpResp *http.Response
// dialErr is the error of the most recent dial attempt, nil after a success.
dialErr error
// isConnected is set by connect according to the outcome of the last dial
// and cleared by Close.
isConnected bool
// dialer performs the handshakes; it is assigned in Dial.
dialer *websocket.Dialer
// Conn is the current underlying connection; connect replaces it on every
// dial attempt. It is embedded, so its methods are promoted unless RecConn
// declares a method of the same name.
*websocket.Conn
}
// closeAndRecconect drops the current connection through Close and then
// starts connect in its own goroutine, so the caller is never blocked by
// the reconnect loop.
func (rc *RecConn) closeAndRecconect() {
	rc.Close()
	go rc.connect()
}
// Close shuts down the underlying network connection, if there is one,
// without sending or waiting for a close frame, and marks the RecConn as
// not connected. The error reported by the underlying Close is discarded.
func (rc *RecConn) Close() {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	if conn := rc.Conn; conn != nil {
		conn.Close()
	}
	rc.isConnected = false
}
// ReadMessage is a helper method for getting a reader
// using NextReader and reading from that reader to a buffer.
//
// If the connection is closed ErrNotConnected is returned. If the read itself
// fails, the error is returned and a reconnect is started in the background.
func (rc *RecConn) ReadMessage() (messageType int, message []byte, err error) {
	// Take the connection and the connected flag in one critical section.
	// Checking the flag first and dereferencing rc.Conn afterwards leaves a
	// window in which a concurrent Close/reconnect can swap the pointer
	// (possibly to nil after a failed dial) underneath this call.
	rc.mu.Lock()
	conn, connected := rc.Conn, rc.isConnected
	rc.mu.Unlock()

	if !connected || conn == nil {
		return 0, nil, ErrNotConnected
	}

	messageType, message, err = conn.ReadMessage()
	if err != nil {
		rc.closeAndRecconect()
	}
	return messageType, message, err
}
// WriteMessage is a helper method for getting a writer using NextWriter,
// writing the message and closing the writer.
//
// If the connection is closed ErrNotConnected is returned. If the write itself
// fails, the error is returned and a reconnect is started in the background.
func (rc *RecConn) WriteMessage(messageType int, data []byte) error {
	// Take the connection and the connected flag in one critical section.
	// Checking the flag first and dereferencing rc.Conn afterwards leaves a
	// window in which a concurrent Close/reconnect can swap the pointer
	// (possibly to nil after a failed dial) underneath this call.
	rc.mu.Lock()
	conn, connected := rc.Conn, rc.isConnected
	rc.mu.Unlock()

	if !connected || conn == nil {
		return ErrNotConnected
	}

	err := conn.WriteMessage(messageType, data)
	if err != nil {
		rc.closeAndRecconect()
	}
	return err
}
// WriteJSON writes the JSON encoding of v to the connection.
//
// See the documentation for encoding/json Marshal for details about the
// conversion of Go values to JSON.
//
// If the connection is closed ErrNotConnected is returned. Any other error
// from the underlying WriteJSON is returned and a reconnect is started in
// the background.
func (rc *RecConn) WriteJSON(v interface{}) error {
	// Take the connection and the connected flag in one critical section.
	// Checking the flag first and dereferencing rc.Conn afterwards leaves a
	// window in which a concurrent Close/reconnect can swap the pointer
	// (possibly to nil after a failed dial) underneath this call.
	rc.mu.Lock()
	conn, connected := rc.Conn, rc.isConnected
	rc.mu.Unlock()

	if !connected || conn == nil {
		return ErrNotConnected
	}

	err := conn.WriteJSON(v)
	if err != nil {
		rc.closeAndRecconect()
	}
	return err
}
// Dial creates a new client connection.
// The URL url specifies the host and request URI. Use requestHeader to specify
// the origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies
// (Cookie). Use GetHTTPResponse() method for the response.Header to get
// the selected subprotocol (Sec-WebSocket-Protocol) and cookies (Set-Cookie).
//
// Dial terminates the process through log.Fatal when urlStr is empty, cannot
// be parsed, does not use the ws or wss scheme, or carries user info.
//
// The connection is established in a background goroutine that keeps
// retrying with backoff. Dial itself only blocks for HandshakeTimeout and
// does not report the outcome; check IsConnected or GetDialError afterwards.
func (rc *RecConn) Dial(urlStr string, reqHeader http.Header) {
	if urlStr == "" {
		log.Fatal("Dial: Url cannot be empty")
	}
	u, err := url.Parse(urlStr)
	if err != nil {
		log.Fatal("Url:", err)
	}
	if u.Scheme != "ws" && u.Scheme != "wss" {
		log.Fatal("Url: websocket URIs must start with ws or wss scheme")
	}
	if u.User != nil {
		log.Fatal("Url: user name and password are not allowed in websocket URIs")
	}

	rc.url = urlStr
	// Remember the header so that connect sends it on the first and on every
	// later handshake; without this the caller's header is silently dropped.
	rc.reqHeader = reqHeader

	// Fill in defaults for everything the caller left at its zero value.
	if rc.RecIntvlMin == 0 {
		rc.RecIntvlMin = 2 * time.Second
	}
	if rc.RecIntvlMax == 0 {
		rc.RecIntvlMax = 30 * time.Second
	}
	if rc.RecIntvlFactor == 0 {
		rc.RecIntvlFactor = 1.5
	}
	if rc.HandshakeTimeout == 0 {
		rc.HandshakeTimeout = 2 * time.Second
	}

	// Work on a private copy of the default dialer: websocket.DefaultDialer is
	// a package-level pointer, so setting the timeout on it directly would
	// change the handshake timeout for every other user in the process.
	dialer := *websocket.DefaultDialer
	dialer.HandshakeTimeout = rc.HandshakeTimeout
	rc.dialer = &dialer

	go rc.connect()

	// wait on first attempt
	time.Sleep(rc.HandshakeTimeout)
}
// connect dials rc.url over and over, sleeping a backoff interval between
// attempts, until a connection is established. The outcome of every attempt
// (connection, handshake response, error, connected flag) is published on rc
// under rc.mu. Progress is logged unless NonVerbose is set.
//
// Several connect loops can be alive at once, for example when a read and a
// write fail at the same time and each triggers a reconnect. A loop that finds
// the RecConn already connected discards its own result and exits, so that it
// neither overwrites (and leaks) the live connection nor resets the state
// after a failed dial of its own.
func (rc *RecConn) connect() {
	b := &backoff.Backoff{
		Min:    rc.RecIntvlMin,
		Max:    rc.RecIntvlMax,
		Factor: rc.RecIntvlFactor,
		Jitter: true,
	}

	// Seeds the global math/rand source, presumably for the backoff jitter.
	// NOTE(review): rand.Seed is deprecated since Go 1.20 and unnecessary
	// there; kept for older toolchains.
	rand.Seed(time.Now().UTC().UnixNano())

	for {
		nextItvl := b.Duration()

		wsConn, httpResp, err := rc.dialer.Dial(rc.url, rc.reqHeader)

		rc.mu.Lock()
		if rc.isConnected {
			// Another loop won the race while this dial was in flight.
			rc.mu.Unlock()
			if err == nil {
				wsConn.Close()
			}
			return
		}
		rc.Conn = wsConn
		rc.dialErr = err
		rc.isConnected = err == nil
		rc.httpResp = httpResp
		rc.mu.Unlock()

		if err == nil {
			if !rc.NonVerbose {
				log.Printf("Dial: connection was successfully established with %s\n", rc.url)
			}
			return
		}

		if !rc.NonVerbose {
			log.Println(err)
			// A time.Duration already prints with its unit (e.g. "2.5s").
			log.Println("Dial: will try again in", nextItvl)
		}
		time.Sleep(nextItvl)
	}
}
// GetHTTPResponse returns the HTTP response recorded for the most recent
// handshake attempt. This is useful when the WebSocket handshake fails,
// so that callers can handle redirects, authentication, etc.
func (rc *RecConn) GetHTTPResponse() *http.Response {
	rc.mu.Lock()
	resp := rc.httpResp
	rc.mu.Unlock()
	return resp
}
// GetDialError returns the error recorded for the most recent dial attempt;
// it is nil when that attempt succeeded.
func (rc *RecConn) GetDialError() error {
	rc.mu.Lock()
	lastErr := rc.dialErr
	rc.mu.Unlock()
	return lastErr
}
// IsConnected reports the WebSocket connection state: true once a dial has
// succeeded and until Close is called.
func (rc *RecConn) IsConnected() bool {
	rc.mu.Lock()
	connected := rc.isConnected
	rc.mu.Unlock()
	return connected
}