-
-
Notifications
You must be signed in to change notification settings - Fork 93
/
options.go
395 lines (341 loc) · 12 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
package centrifuge
import "time"
// PublishOption is a type to represent various Publish options.
// It is a function that mutates the PublishOptions it receives; constructors
// such as WithHistory and WithTags return values of this type.
type PublishOption func(*PublishOptions)
// WithHistory tells Broker to save the message to the history stream with the
// provided size and ttl.
//
// metaTTL is optional: when at least one value is passed, the first one is
// stored in PublishOptions.HistoryMetaTTL; any further values are ignored.
// When it is omitted, HistoryMetaTTL is left untouched.
func WithHistory(size int, ttl time.Duration, metaTTL ...time.Duration) PublishOption {
	return func(o *PublishOptions) {
		o.HistorySize, o.HistoryTTL = size, ttl
		if len(metaTTL) == 0 {
			// No override supplied, keep whatever HistoryMetaTTL already holds.
			return
		}
		o.HistoryMetaTTL = metaTTL[0]
	}
}
// WithIdempotencyKey passes the idempotency key of the publication to Broker
// by storing it in PublishOptions.IdempotencyKey.
// See PublishOptions.IdempotencyKey.
func WithIdempotencyKey(key string) PublishOption {
	return func(o *PublishOptions) {
		o.IdempotencyKey = key
	}
}
// WithDelta tells Broker whether to use delta streaming by setting
// PublishOptions.UseDelta to enabled.
func WithDelta(enabled bool) PublishOption {
	return func(o *PublishOptions) {
		o.UseDelta = enabled
	}
}
// WithIdempotentResultTTL sets the expiration time for results of idempotent
// publications (PublishOptions.IdempotentResultTTL).
// See PublishOptions.IdempotentResultTTL for more description and defaults.
func WithIdempotentResultTTL(ttl time.Duration) PublishOption {
	return func(o *PublishOptions) {
		o.IdempotentResultTTL = ttl
	}
}
// WithClientInfo adds ClientInfo to Publication by storing info in
// PublishOptions.ClientInfo. The pointer is stored as is, without copying.
func WithClientInfo(info *ClientInfo) PublishOption {
	return func(o *PublishOptions) {
		o.ClientInfo = info
	}
}
// WithTags allows setting Publication.Tags. The map is stored in
// PublishOptions.Tags as is, without copying.
func WithTags(meta map[string]string) PublishOption {
	return func(o *PublishOptions) {
		o.Tags = meta
	}
}
// SubscribeOptions define per-subscription options. Values are typically
// assembled by applying SubscribeOption functions (WithExpireAt,
// WithRecovery, ...) to an instance of this struct.
type SubscribeOptions struct {
// ExpireAt defines time in future when subscription should expire,
// zero value means no expiration.
ExpireAt int64
// ChannelInfo defines custom channel information, zero value means no channel information.
ChannelInfo []byte
// EmitPresence turns on participating in channel presence - i.e. client
// subscription will emit presence updates to PresenceManager and will be visible
// in a channel presence result.
EmitPresence bool
// EmitJoinLeave turns on emitting Join and Leave events from the subscribing client.
// See also PushJoinLeave if you want current client to receive join/leave messages.
EmitJoinLeave bool
// PushJoinLeave turns on receiving channel Join and Leave events by the client.
// Subscriptions which emit join/leave events should have EmitJoinLeave on.
PushJoinLeave bool
// EnablePositioning turns on position tracking. When it is on, the client will
// additionally sync its position inside a stream to prevent publication loss.
// The loss can happen due to at most once guarantees of PUB/SUB model. Make sure
// you are enabling EnablePositioning in channels that maintain Publication history
// stream. When EnablePositioning is on Centrifuge will include StreamPosition
// information to subscribe response - for a client to be able to manually track
// its position inside a stream.
EnablePositioning bool
// EnableRecovery turns on automatic recovery for a channel. In this case
// client will try to recover missed messages upon resubscribe to a channel
// after reconnect to a server. This option also enables client position
// tracking inside a stream (i.e. enabling EnableRecovery will automatically
// enable EnablePositioning option) to prevent occasional publication loss.
// Make sure you are using EnableRecovery in channels that maintain Publication
// history stream.
EnableRecovery bool
// RecoveryMode is by default RecoveryModeStream (the zero value), but can
// also be RecoveryModeCache.
RecoveryMode RecoveryMode
// Data to send to a client with Subscribe Push.
Data []byte
// RecoverSince will try to subscribe a client and recover from a certain StreamPosition.
RecoverSince *StreamPosition
// HistoryMetaTTL allows to override default (set in Config.HistoryMetaTTL) history
// meta information expiration time.
HistoryMetaTTL time.Duration
// AllowedDeltaTypes is a whitelist of DeltaType subscribers can negotiate. At this point Centrifuge
// only supports DeltaTypeFossil. If zero value – clients won't be able to negotiate delta encoding
// within a channel and will receive full data in publications.
// Delta encoding is an EXPERIMENTAL feature and may be changed.
AllowedDeltaTypes []DeltaType
// clientID to subscribe. Unexported: set through WithSubscribeClient.
clientID string
// sessionID to subscribe. Unexported: set through WithSubscribeSession.
sessionID string
// Source is a way to mark the source of Subscription - i.e. where it comes from. May be useful
// for inspection of a connection during its lifetime.
Source uint8
}
// SubscribeOption is a type to represent various Subscribe options.
// It is a function that mutates the SubscribeOptions it receives.
type SubscribeOption func(*SubscribeOptions)
// WithExpireAt allows setting SubscribeOptions.ExpireAt - the time in the
// future when the subscription should expire. Zero means no expiration.
func WithExpireAt(expireAt int64) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.ExpireAt = expireAt
	}
}
// WithChannelInfo allows setting SubscribeOptions.ChannelInfo - custom channel
// information. The slice is stored as is, without copying.
func WithChannelInfo(chanInfo []byte) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.ChannelInfo = chanInfo
	}
}
// WithEmitPresence allows setting SubscribeOptions.EmitPresence, which turns
// on participating in channel presence.
func WithEmitPresence(enabled bool) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.EmitPresence = enabled
	}
}
// WithEmitJoinLeave allows setting SubscribeOptions.EmitJoinLeave, which turns
// on emitting Join and Leave events from the subscribing client.
func WithEmitJoinLeave(enabled bool) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.EmitJoinLeave = enabled
	}
}
// WithPushJoinLeave allows setting SubscribeOptions.PushJoinLeave, which turns
// on receiving channel Join and Leave events by the client.
func WithPushJoinLeave(enabled bool) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.PushJoinLeave = enabled
	}
}
// WithPositioning allows setting SubscribeOptions.EnablePositioning.
// See SubscribeOptions.EnablePositioning for the description.
func WithPositioning(enabled bool) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.EnablePositioning = enabled
	}
}
// WithRecovery allows setting SubscribeOptions.EnableRecovery.
// See SubscribeOptions.EnableRecovery for the description.
func WithRecovery(enabled bool) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.EnableRecovery = enabled
	}
}
// RecoveryMode is the type of SubscribeOptions.RecoveryMode.
type RecoveryMode uint8

const (
	// RecoveryModeStream has value 0, i.e. it is the zero value of
	// RecoveryMode and therefore the default.
	RecoveryModeStream RecoveryMode = iota
	// RecoveryModeCache has value 1 and is the alternative to
	// RecoveryModeStream.
	RecoveryModeCache
)
// WithRecoveryMode allows setting SubscribeOptions.RecoveryMode to either
// RecoveryModeStream or RecoveryModeCache.
func WithRecoveryMode(mode RecoveryMode) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.RecoveryMode = mode
	}
}
// WithSubscribeClient allows setting the ID of the client that should be
// subscribed. This option is not used when Client.Subscribe is called.
func WithSubscribeClient(clientID string) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.clientID = clientID
	}
}
// WithSubscribeSession allows setting the ID of the session that should be
// subscribed. This option is not used when Client.Subscribe is called.
func WithSubscribeSession(sessionID string) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.sessionID = sessionID
	}
}
// WithSubscribeData allows setting custom data to send with the subscribe
// push (SubscribeOptions.Data). The slice is stored as is, without copying.
func WithSubscribeData(data []byte) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.Data = data
	}
}
// WithRecoverSince allows setting SubscribeOptions.RecoverSince - the
// StreamPosition to recover from when subscribing a client.
func WithRecoverSince(since *StreamPosition) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.RecoverSince = since
	}
}
// WithSubscribeSource allows setting SubscribeOptions.Source - a marker of
// where the subscription comes from.
func WithSubscribeSource(source uint8) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.Source = source
	}
}
// WithSubscribeHistoryMetaTTL allows setting SubscribeOptions.HistoryMetaTTL,
// the override for the history meta information expiration time.
func WithSubscribeHistoryMetaTTL(metaTTL time.Duration) SubscribeOption {
	return func(o *SubscribeOptions) {
		o.HistoryMetaTTL = metaTTL
	}
}
// RefreshOptions holds the settings of a refresh operation. Values are
// assembled by applying RefreshOption functions (WithRefreshClient,
// WithRefreshExpireAt, ...) to an instance of this struct.
type RefreshOptions struct {
// Expired can close connection with expired reason.
Expired bool
// ExpireAt defines time in future when the connection should expire,
// zero value means no expiration.
// NOTE(review): this comment previously said "subscription"; WithRefreshExpireAt
// documents the value as unix seconds of connection expiration - confirm.
ExpireAt int64
// Info defines custom information, zero value means no information.
// NOTE(review): this comment previously said "channel information";
// WithRefreshInfo describes the value as connection info - confirm.
Info []byte
// clientID to refresh. Unexported: set through WithRefreshClient.
clientID string
// sessionID to refresh. Unexported: set through WithRefreshSession.
sessionID string
}
// RefreshOption is a type to represent various Refresh options.
// It is a function that mutates the RefreshOptions it receives.
type RefreshOption func(options *RefreshOptions)
// WithRefreshClient limits the refresh to the specified client ID only.
func WithRefreshClient(clientID string) RefreshOption {
	return func(o *RefreshOptions) {
		o.clientID = clientID
	}
}
// WithRefreshSession limits the refresh to the specified session ID only.
func WithRefreshSession(sessionID string) RefreshOption {
	return func(o *RefreshOptions) {
		o.sessionID = sessionID
	}
}
// WithRefreshExpired sets the RefreshOptions.Expired flag - when it is set the
// connection will be closed with DisconnectExpired.
func WithRefreshExpired(expired bool) RefreshOption {
	return func(o *RefreshOptions) {
		o.Expired = expired
	}
}
// WithRefreshExpireAt sets RefreshOptions.ExpireAt - the unix seconds in the
// future when the connection should expire.
// Zero value means no expiration.
func WithRefreshExpireAt(expireAt int64) RefreshOption {
	return func(o *RefreshOptions) {
		o.ExpireAt = expireAt
	}
}
// WithRefreshInfo overrides connection info by setting RefreshOptions.Info.
// The slice is stored as is, without copying.
func WithRefreshInfo(info []byte) RefreshOption {
	return func(o *RefreshOptions) {
		o.Info = info
	}
}
// UnsubscribeOptions holds the settings of an unsubscribe operation. All
// fields are unexported and can only be set through UnsubscribeOption
// functions (WithUnsubscribeClient, WithUnsubscribeSession,
// WithCustomUnsubscribe).
type UnsubscribeOptions struct {
// clientID to unsubscribe.
clientID string
// sessionID to unsubscribe.
sessionID string
// custom unsubscribe object; nil unless WithCustomUnsubscribe was applied.
unsubscribe *Unsubscribe
}
// UnsubscribeOption is a type to represent various Unsubscribe options.
// It is a function that mutates the UnsubscribeOptions it receives.
type UnsubscribeOption func(options *UnsubscribeOptions)
// WithUnsubscribeClient allows setting the ID of the client that should be
// unsubscribed. This option is not used when Client.Unsubscribe is called.
func WithUnsubscribeClient(clientID string) UnsubscribeOption {
	return func(o *UnsubscribeOptions) {
		o.clientID = clientID
	}
}
// WithUnsubscribeSession allows setting the ID of the session that should be
// unsubscribed. This option is not used when Client.Unsubscribe is called.
func WithUnsubscribeSession(sessionID string) UnsubscribeOption {
	return func(o *UnsubscribeOptions) {
		o.sessionID = sessionID
	}
}
// WithCustomUnsubscribe allows setting a custom Unsubscribe.
//
// The value is received by copy; the options struct stores a pointer to that
// copy, so every application of the returned option shares the same pointer.
func WithCustomUnsubscribe(unsubscribe Unsubscribe) UnsubscribeOption {
	return func(o *UnsubscribeOptions) {
		o.unsubscribe = &unsubscribe
	}
}
// DisconnectOptions define some fields to alter behaviour of Disconnect operation.
// Values are assembled by applying DisconnectOption functions to an instance
// of this struct.
type DisconnectOptions struct {
// Disconnect represents custom disconnect to use.
// By default, DisconnectForceNoReconnect will be used.
Disconnect *Disconnect
// ClientWhitelist contains client IDs to keep.
ClientWhitelist []string
// clientID to disconnect. Unexported: set through WithDisconnectClient.
clientID string
// sessionID to disconnect. Unexported: set through WithDisconnectSession.
sessionID string
}
// DisconnectOption is a type to represent various Disconnect options.
// It is a function that mutates the DisconnectOptions it receives.
type DisconnectOption func(options *DisconnectOptions)
// WithCustomDisconnect allows setting a custom Disconnect
// (DisconnectOptions.Disconnect).
//
// The value is received by copy; the options struct stores a pointer to that
// copy, so every application of the returned option shares the same pointer.
func WithCustomDisconnect(disconnect Disconnect) DisconnectOption {
	return func(o *DisconnectOptions) {
		o.Disconnect = &disconnect
	}
}
// WithDisconnectClient allows setting the ID of the client to disconnect.
func WithDisconnectClient(clientID string) DisconnectOption {
	return func(o *DisconnectOptions) {
		o.clientID = clientID
	}
}
// WithDisconnectSession allows setting the ID of the session to disconnect.
func WithDisconnectSession(sessionID string) DisconnectOption {
	return func(o *DisconnectOptions) {
		o.sessionID = sessionID
	}
}
// WithDisconnectClientWhitelist allows setting DisconnectOptions.ClientWhitelist
// - the client IDs to keep. The slice is stored as is, without copying.
func WithDisconnectClientWhitelist(whitelist []string) DisconnectOption {
	return func(o *DisconnectOptions) {
		o.ClientWhitelist = whitelist
	}
}
// HistoryOption is a type to represent various History options.
// It is a function that mutates the HistoryOptions it receives.
type HistoryOption func(options *HistoryOptions)
// NoLimit defines that limit should not be applied.
// NOTE(review): presumably meant to be passed to WithLimit; the code that
// interprets the value is outside this file - confirm.
const NoLimit = -1
// WithLimit allows setting the Limit field of HistoryOptions.Filter.
// Other Filter fields are left untouched.
func WithLimit(limit int) HistoryOption {
	return func(o *HistoryOptions) {
		o.Filter.Limit = limit
	}
}
// WithSince allows setting the Since field of HistoryOptions.Filter.
// Other Filter fields are left untouched.
func WithSince(sp *StreamPosition) HistoryOption {
	return func(o *HistoryOptions) {
		o.Filter.Since = sp
	}
}
// WithReverse allows setting the Reverse field of HistoryOptions.Filter.
// Other Filter fields are left untouched.
func WithReverse(reverse bool) HistoryOption {
	return func(o *HistoryOptions) {
		o.Filter.Reverse = reverse
	}
}
// WithHistoryFilter allows setting HistoryOptions.Filter as a whole.
//
// The entire Filter is replaced, so any Filter field assigned earlier (for
// example by WithLimit, WithSince or WithReverse) is overwritten when this
// option is applied after them.
func WithHistoryFilter(filter HistoryFilter) HistoryOption {
	return func(o *HistoryOptions) {
		o.Filter = filter
	}
}
// WithHistoryMetaTTL allows setting HistoryOptions.MetaTTL.
func WithHistoryMetaTTL(metaTTL time.Duration) HistoryOption {
	return func(o *HistoryOptions) {
		o.MetaTTL = metaTTL
	}
}