-
Notifications
You must be signed in to change notification settings - Fork 0
/
broker.go
203 lines (176 loc) · 5.2 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Package pubsub provides a generic and concurrency-safe, topic-based publish/subscribe library for in-process communication.
package pubsub
import (
"context"
"sync"
)
// Message is the envelope a subscriber receives from the broker: it pairs
// a published value with the topic it was published on.
type Message[T, P any] struct {
	// Topic identifies the topic the message was published on.
	Topic T
	// Payload carries the value supplied by the publisher.
	Payload P
}
// Broker represents a message broker.
//
// The Broker is the core component of the pub/sub library.
// It manages the registration of subscribers and handles the publishing
// of messages to specific topics.
//
// T is the topic type; it is used as a map key and therefore must be
// comparable. P is the type of the payload carried by each [Message].
//
// The Broker supports concurrent operations: every method acquires mu
// before touching subs.
//
// Create a Broker with [NewBroker]. The zero value has a nil subs map,
// which the subscribe methods write to, so it is not ready for use.
// Because it embeds a mutex, a Broker must not be copied after first use.
type Broker[T comparable, P any] struct {
// mu guards subs. Read-only accessors and the publish methods take the
// read lock; subscribing and unsubscribing take the write lock.
mu sync.RWMutex
// subs maps each topic to the slice of subscription channels that
// receive the messages published on that topic.
subs map[T][]chan Message[T, P]
}
// NewBroker returns a new, empty message [Broker] that is ready for use.
//
// The subscription map is allocated up front so that topics can be
// registered on the returned broker immediately.
func NewBroker[T comparable, P any]() *Broker[T, P] {
	broker := new(Broker[T, P])
	broker.subs = map[T][]chan Message[T, P]{}
	return broker
}
// Topics returns a slice of all the topics registered on the [Broker].
//
// A nil slice is returned if there are no topics.
//
// The returned slice is freshly allocated, so the caller may modify it
// without affecting the broker.
//
// Note: The order of the topics is not guaranteed.
func (b *Broker[T, P]) Topics() []T {
	b.mu.RLock()
	defer b.mu.RUnlock()

	n := len(b.subs)
	if n == 0 {
		// Keep the documented contract: no topics means a nil slice.
		return nil
	}

	// The final length is known, so allocate once instead of growing the
	// slice repeatedly through append.
	topics := make([]T, 0, n)
	// The iteration order over maps is not guaranteed.
	for topic := range b.subs {
		topics = append(topics, topic)
	}
	return topics
}
// NumTopics reports how many topics are currently registered on the [Broker].
func (b *Broker[T, P]) NumTopics() int {
	b.mu.RLock()
	count := len(b.subs)
	b.mu.RUnlock()

	return count
}
// Subscribers reports how many subscriptions exist on the specified topic.
//
// Zero is returned for a topic that is not registered on the [Broker].
func (b *Broker[T, P]) Subscribers(topic T) int {
	b.mu.RLock()
	defer b.mu.RUnlock()

	if chans, ok := b.subs[topic]; ok {
		return len(chans)
	}
	return 0
}
// Subscribe registers a new subscription on each of the specified topics
// and returns the channel on which their messages are delivered.
//
// The subscription channel is unbuffered (capacity = 0); it is created by
// [Broker.SubscribeWithCapacity].
func (b *Broker[T, P]) Subscribe(topics ...T) <-chan Message[T, P] {
	const unbuffered = 0
	return b.SubscribeWithCapacity(unbuffered, topics...)
}
// SubscribeWithCapacity creates a subscription for the specified topics with
// the specified capacity.
//
// The capacity specifies the subscription channel's buffer capacity; it is
// passed directly to make, which panics on a negative value.
//
// A single channel is shared by all the given topics. A topic that appears
// more than once in topics is registered only once, so each published
// message is delivered to the subscription a single time.
//
// If no topics are given, the returned channel is not registered on any
// topic.
func (b *Broker[T, P]) SubscribeWithCapacity(capacity int, topics ...T) <-chan Message[T, P] {
	// Allocate before taking the lock to keep the critical section short.
	sub := make(chan Message[T, P], capacity)

	b.mu.Lock()
	defer b.mu.Unlock()

	for _, topic := range topics {
		subscribers := b.subs[topic]
		// sub is brand new and the write lock is held, so if it is already
		// registered on this topic it was appended earlier in this very
		// loop and must be the last element: the topic was passed twice.
		if n := len(subscribers); n > 0 && subscribers[n-1] == sub {
			continue
		}
		b.subs[topic] = append(subscribers, sub)
	}
	return sub
}
// Unsubscribe detaches the subscription from the specified topics.
//
// When no topics are given, the subscription is detached from every topic
// registered on the [Broker].
//
// The channel is not closed; it simply stops receiving messages.
//
// Note: Naming the topics avoids scanning every registered topic and can
// therefore be more efficient.
func (b *Broker[T, P]) Unsubscribe(sub <-chan Message[T, P], topics ...T) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(topics) == 0 {
		// No topics named: sweep the whole registry.
		for registered := range b.subs {
			b.removeSubscription(sub, registered)
		}
		return
	}

	// Only the named topics are touched.
	for _, named := range topics {
		b.removeSubscription(sub, named)
	}
}
// removeSubscription removes a subscription channel from a topic.
//
// Every occurrence of sub in the topic's subscriber list is removed. The
// topic itself is deleted when no subscriptions remain. Nothing happens if
// the topic is unknown or sub is not subscribed to it.
//
// The caller must hold b.mu for writing.
func (b *Broker[T, P]) removeSubscription(sub <-chan Message[T, P], topic T) {
	subscribers := b.subs[topic]

	// Filter in place, reusing the backing array. The write position never
	// overtakes the read position, so no unread element is overwritten.
	kept := subscribers[:0]
	for _, s := range subscribers {
		if s != sub {
			kept = append(kept, s)
		}
	}

	if len(kept) == len(subscribers) {
		// sub was not subscribed to this topic (or the topic is unknown).
		return
	}

	if len(kept) == 0 {
		// That was the last subscription: drop the topic entirely.
		delete(b.subs, topic)
		return
	}

	// Clear the vacated tail so the backing array does not keep the removed
	// channel reachable.
	for i := len(kept); i < len(subscribers); i++ {
		subscribers[i] = nil
	}
	b.subs[topic] = kept
}
// Publish publishes a [Message] to the topic with the specified payload.
//
// The message is sent concurrently to the subscribers, ensuring that a slow
// consumer won't affect the other subscribers.
//
// This method will block and wait for all the subscriptions to receive
// the message or until the context is canceled.
//
// The value of [context.Context.Err] will be returned.
//
// A nil return value indicates that all the subscribers received the message.
//
// If there are no subscribers to the topic, the message will be discarded.
//
// The broker's read lock is held until Publish returns, so calls that need
// the write lock (subscribing and unsubscribing) wait for a blocked Publish
// to finish or for its context to be canceled.
func (b *Broker[T, P]) Publish(ctx context.Context, topic T, payload P) error {
	b.mu.RLock()
	defer b.mu.RUnlock()

	subs := b.subs[topic]
	msg := Message[T, P]{Topic: topic, Payload: payload}

	switch len(subs) {
	case 0:
		// No subscribers: the message is discarded.
	case 1:
		// A single subscriber is served inline; no goroutine is needed.
		select {
		case <-ctx.Done():
		case subs[0] <- msg:
		}
	default:
		var wg sync.WaitGroup
		wg.Add(len(subs))
		for _, sub := range subs {
			// Hand the channel over as an argument rather than capturing the
			// loop variable: before Go 1.22 that variable is shared by all
			// iterations, which would make the goroutines race on it and
			// send to the wrong subscriber.
			go func(ch chan<- Message[T, P]) {
				defer wg.Done()
				select {
				case <-ctx.Done():
				case ch <- msg:
				}
			}(sub)
		}
		// Every goroutine ends on delivery or cancellation, so none outlive
		// this call.
		wg.Wait()
	}

	return ctx.Err()
}
// TryPublish offers a message with the specified payload to every
// subscription of the topic without ever blocking.
//
// Subscriptions are visited one after another. A subscription whose channel
// cannot accept the message immediately (its buffer is full, or it is
// unbuffered and no receiver is waiting) is skipped.
//
// Note: Use the [Broker.Publish] method for guaranteed delivery.
func (b *Broker[T, P]) TryPublish(topic T, payload P) {
	msg := Message[T, P]{Topic: topic, Payload: payload}

	b.mu.RLock()
	defer b.mu.RUnlock()

	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
			// Delivered.
		default:
			// Not ready to receive: skip this subscription.
		}
	}
}