-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathclient.go
282 lines (248 loc) · 6.15 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
package amqp
import (
"context"
"errors"
"time"
"github.com/devimteam/amqp/conn"
"github.com/devimteam/amqp/logger"
"github.com/streadway/amqp"
)
var (
// This error occurs when message was delivered, but it has too low or too high priority.
NotAllowedPriority = errors.New("not allowed priority")
// DeliveryChannelWasClosedError is an information error, that logs to info logger when delivery channel was closed.
DeliveryChannelWasClosedError = errors.New("delivery channel was closed")
// Durable or non-auto-delete queues with empty names will survive when all consumers have finished using it, but no one can connect to it back.
QueueDeclareWarning = errors.New("declaring durable or non-auto-delete queue with empty name")
)
// Event represents amqp.Delivery with attached context and data
type Event struct {
// Converted and ready to use pointer to entity of reply type.
Data interface{}
// Event's context.
// Contains context.Background by default and setups with DeliveryBefore option.
Context context.Context
amqp.Delivery
}
// Done is a shortcut for Ack(false) without returning error
func (e Event) Done() {
_ = e.Ack(false)
}
type Client struct {
exchanges []Exchange
queues []Queue
bindings []Binding
conn *conn.Connection
observer *observer
logger logger.Logger
done func()
ctx context.Context
}
func NewClient(connector conn.Connector, decls ...Declaration) (Client, error) {
cl := Client{}
cl.constructorBefore(decls...)
ctx, done := context.WithCancel(context.Background())
cl.done = done
cl.ctx = ctx
var err error
if cl.conn, err = connector(); err != nil {
return cl, err
}
cl.observer = newObserver(ctx, cl.conn, Max(1)) // We need only one channel to declare all queues and exchanges.
cl.declare()
go func() {
for <-cl.conn.NotifyClose(); ; <-cl.conn.NotifyClose() {
select {
case <-ctx.Done():
return
default:
if err := cl.conn.NotifyConnected(time.Minute); err != nil {
_ = cl.logger.Log(err)
continue
}
cl.declare()
}
}
}()
return cl, nil
}
func (c *Client) constructorBefore(decls ...Declaration) {
c.logger = logger.NoopLogger
withDeclarations(c, decls...)
}
func withDeclarations(cl *Client, opts ...Declaration) {
for i := range opts {
opts[i].declare(cl)
}
}
type Declaration interface {
declare(*Client)
}
func (c *Client) declare() {
ch := c.observer.channel()
for _, exchange := range c.exchanges {
if err := ch.declareExchange(exchange); err != nil {
_ = c.logger.Log(err)
}
}
for _, queue := range c.queues {
if warn := checkQueue(queue); warn != nil {
_ = c.logger.Log(warn)
}
if _, err := ch.declareQueue(queue); err != nil {
_ = c.logger.Log(err)
}
}
for _, binding := range c.bindings {
if err := ch.bind(binding); err != nil {
_ = c.logger.Log(err)
}
}
c.observer.release(ch)
}
func checkQueue(queue Queue) error {
if queue.Name == "" && (queue.Durable || !queue.AutoDelete) {
return QueueDeclareWarning
}
return nil
}
func (c *Client) Subscriber(opts ...SubscriberOption) *Subscriber {
var connection *conn.Connection
connection = c.conn
return newSubscriber(c.ctx, connection, opts...)
}
func (c *Client) Publisher(opts ...PublisherOption) *Publisher {
var connection *conn.Connection
connection = c.conn
return newPublisher(c.ctx, connection, opts...)
}
func (c *Client) Stop() {
c.done()
}
type Exchange struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Args amqp.Table
}
func (e Exchange) declare(c *Client) {
if e.Name == "" {
panic("exchange name can't be empty")
}
c.exchanges = append(c.exchanges, e)
}
// TemporaryExchange is a common way to create temporary exchange with given name.
func TemporaryExchange(name string) Exchange {
return Exchange{
Name: name,
Kind: "fanout",
Durable: false,
AutoDelete: true,
Internal: false,
NoWait: false,
}
}
// LongExchange is a common way to declare exchange with given name.
func LongExchange(name string) Exchange {
return Exchange{
Name: name,
Kind: "fanout",
Durable: true,
AutoDelete: false,
}
}
type Exchanges []Exchange
func (e Exchanges) declare(c *Client) {
for i := range e {
if e[i].Name == "" {
panic("exchange name can't be empty")
}
c.exchanges = append(c.exchanges, e[i])
}
}
// PersistentExchanges allow you to declare a bunch of exchanges with given names.
func PersistentExchanges(names ...string) (e Exchanges) {
for i := range names {
e = append(e, LongExchange(names[i]))
}
return
}
type Queue struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Args amqp.Table
}
// LongQueue is a common way to declare queue with given name.
func LongQueue(name string) Queue {
return Queue{
Name: name,
Durable: true,
AutoDelete: false,
}
}
func (q Queue) declare(c *Client) {
if q.Name == "" {
panic("do not declare queue with empty name")
}
c.queues = append(c.queues, q)
}
type Queues []Queue
func (q Queues) declare(c *Client) {
for i := range q {
if q[i].Name == "" {
panic("queue name can't be empty")
}
c.queues = append(c.queues, q[i])
}
}
// PersistentQueues allow you to declare a bunch of queues with given names.
func PersistentQueues(names ...string) (e Queues) {
for i := range names {
e = append(e, LongQueue(names[i]))
}
return
}
// Binding is used for bind exchange and queue.
type Binding struct {
Exchange string
Queue string
Key string
NoWait bool
Args amqp.Table
}
func (b Binding) declare(c *Client) {
if b.Queue == "" || b.Exchange == "" {
panic("empty exchange or queue name")
}
c.bindings = append(c.bindings, b)
}
type Consumer struct {
Consumer string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
Args amqp.Table
LimitCount int
LimitSize int
}
// Publish is used for AMQP Publish parameters.
type Publish struct {
Key string
Mandatory bool
Immediate bool
Priority uint8
}
// WithLogger set logger for client, which will report declaration problems and so on.
type WithLogger struct {
logger.Logger
}
func (b WithLogger) declare(c *Client) {
c.logger = b.Logger
}