-
Notifications
You must be signed in to change notification settings - Fork 0
/
examples_test.go
189 lines (151 loc) · 4.96 KB
/
examples_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package pubsub_test
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/mdawar/pubsub"
)
func Example() {
// A broker with topic and message payload of type string.
broker := pubsub.NewBroker[string, string]()
// Topic to publish the message to.
topic := "example"
// Number of subscribers to the topic.
subCount := 100
var wg sync.WaitGroup
// Run consumer goroutines subscribed to the same topic.
for range subCount {
wg.Add(1)
go func() {
defer wg.Done()
// Subscribe to topic.
sub := broker.Subscribe(topic)
// Wait for message.
msg := <-sub
// Message fields.
_ = msg.Topic
_ = msg.Payload
}()
}
// Helper function to wait for all consumer goroutines to subscribe.
// This is just for the example and not needed in production code.
waitUntil(time.Second, func() bool {
return broker.Subscribers(topic) == subCount
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Publish a message concurrently to the topic.
// This call will wait for all subscribers to receive the message or
// until the context is canceled (e.g. on timeout).
err := broker.Publish(ctx, topic, "Message to deliver")
if err != nil {
// In this case 1 or more subscribers did not receive the message.
switch {
case errors.Is(err, context.Canceled):
fmt.Println("Publishing was canceled")
case errors.Is(err, context.DeadlineExceeded):
fmt.Println("Publishing timed out")
}
}
wg.Wait()
fmt.Println(err)
// Output: <nil>
}
func ExampleBroker() {
// A broker with topic and message payload of type string.
// The payload can of any type.
pubsub.NewBroker[string, string]()
// A broker with integer topics.
pubsub.NewBroker[int, string]()
}
func ExampleBroker_Subscribe() {
broker := pubsub.NewBroker[string, string]()
// Create a subscription to a single topic.
sub1 := broker.Subscribe("events")
_ = sub1
// Create a subscription to multiple topics.
sub2 := broker.Subscribe("events", "actions", "errors")
_ = sub2
}
func ExampleBroker_SubscribeWithCapacity() {
broker := pubsub.NewBroker[string, string]()
// Create a subscription to a single topic with a specific channel capacity.
sub1 := broker.SubscribeWithCapacity(10, "events")
_ = sub1
// Create a subscription to multiple topics with a specific channel capacity.
sub2 := broker.SubscribeWithCapacity(10, "events", "actions", "errors")
_ = sub2
}
func ExampleBroker_Unsubscribe() {
broker := pubsub.NewBroker[string, string]()
sub := broker.Subscribe("events", "actions", "errors")
// Unsubscribe from a single topic.
broker.Unsubscribe(sub, "events")
// Unsubscribe from all topics.
// The channel will not be closed, it will only stop receiving messages.
broker.Unsubscribe(sub)
}
func ExampleBroker_Publish() {
broker := pubsub.NewBroker[string, string]()
// Publish a message to the topic concurrently.
//
// This call will wait for all the subscribers to receive the message
// or the context to be canceled.
//
// If there are no subscribers the message will be discarded.
err := broker.Publish(context.TODO(), "events", "Message payload to deliver")
// A nil error is expected if the context is not canceled.
fmt.Println(err == nil)
// Output: true
}
func ExampleBroker_Publish_timeout() {
broker := pubsub.NewBroker[string, string]()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Publish a message concurrently with a timeout of 1 second.
//
// Subscribers that are ready will receive the message, the others will be given
// up to the timeout value to receive the message.
//
// A slow consumer will not affect the other subscribers; the timeout applies
// individually to each subscriber.
err := broker.Publish(ctx, "events", "Message payload to deliver")
if err != nil {
// In this case 1 or more subscribers did not receive the message.
switch {
case errors.Is(err, context.Canceled):
fmt.Println("Publishing was canceled")
case errors.Is(err, context.DeadlineExceeded):
fmt.Println("Publishing timed out")
}
}
}
func ExampleBroker_Subscribers() {
broker := pubsub.NewBroker[string, string]()
topic := "example"
for range 10 {
broker.Subscribe(topic)
}
// The number of subscribers to this topic.
subscribers := broker.Subscribers(topic)
fmt.Println(subscribers)
// Output: 10
}
func ExampleBroker_TryPublish() {
broker := pubsub.NewBroker[string, string]()
topic := "example"
// A subscription that will not receive the message.
// The channel is unbuffered and will not be ready when the message is published.
broker.Subscribe(topic)
// A buffered subscription that will receive the message.
bufferedSub := broker.SubscribeWithCapacity(1, topic)
// This method will send the message to the subscribers that are ready
// to receive it (channel buffer not full) and the others will be skipped.
broker.TryPublish(topic, "abc")
// Receive the message on the buffered subscription.
msg := <-bufferedSub
fmt.Println(msg.Payload)
// Output: abc
}