-
Notifications
You must be signed in to change notification settings - Fork 138
/
reconnect_test.go
123 lines (103 loc) · 3.17 KB
/
reconnect_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package amqp091_test
import (
"context"
"fmt"
"os"
amqp "github.com/rabbitmq/amqp091-go"
)
// Every connection should declare the topology they expect
func setup(url, queue string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
if _, err := ch.QueueDeclare(queue, false, true, false, false, nil); err != nil {
return nil, nil, err
}
return conn, ch, nil
}
func consume(url, queue string) (*amqp.Connection, <-chan amqp.Delivery, error) {
conn, ch, err := setup(url, queue)
if err != nil {
return nil, nil, err
}
// Indicate we only want 1 message to acknowledge at a time.
if err := ch.Qos(1, 0, false); err != nil {
return nil, nil, err
}
// Exclusive consumer
deliveries, err := ch.Consume(queue, "", false, true, false, false, nil)
return conn, deliveries, err
}
func ExampleConnection_reconnect() {
if url := os.Getenv("AMQP_URL"); url != "" {
queue := "example.reconnect"
// The connection/channel for publishing to interleave the ingress messages
// between reconnects, shares the same topology as the consumer. If we rather
// sent all messages up front, the first consumer would receive every message.
// We would rather show how the messages are not lost between reconnects.
con, pub, err := setup(url, queue)
if err != nil {
fmt.Println("err publisher setup:", err)
return
}
defer con.Close()
// Purge the queue from the publisher side to establish initial state
if _, err := pub.QueuePurge(queue, false); err != nil {
fmt.Println("err purge:", err)
return
}
// Reconnect simulation, should be for { ... } in production
for i := 1; i <= 3; i++ {
fmt.Println("connect")
conn, deliveries, err := consume(url, queue)
if err != nil {
fmt.Println("err consume:", err)
return
}
// Simulate a producer on a different connection showing that consumers
// continue where they were left off after each reconnect.
if err := pub.PublishWithContext(context.TODO(), "", queue, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%d", i)),
}); err != nil {
fmt.Println("err publish:", err)
return
}
// Simulates a consumer that when the range finishes, will setup a new
// session and begin ranging over the deliveries again.
for msg := range deliveries {
fmt.Println(string(msg.Body))
if e := msg.Ack(false); e != nil {
fmt.Println("ack error: ", e)
}
// Simulate an error like a server restart, loss of route or operator
// intervention that results in the connection terminating
go conn.Close()
}
}
} else {
// pass with expected output when not running in an integration
// environment.
fmt.Println("connect")
fmt.Println("1")
fmt.Println("connect")
fmt.Println("2")
fmt.Println("connect")
fmt.Println("3")
}
// Output:
// connect
// 1
// connect
// 2
// connect
// 3
}