-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroadcast.go
62 lines (53 loc) · 1.44 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package broadcast
import "sync"
// Broadcaster fans published messages out to a set of subscriber callbacks.
//
// Messages handed to Publish are queued and delivered by the loop run by
// Start. Every method is safe to call from multiple goroutines. Create
// instances with NewBroadcaster; the zero value is not usable.
type Broadcaster struct {
	// cond wakes the Start loop when there is work or a stop request.
	// cond.L is the lock that guards every other field of the struct.
	cond *sync.Cond

	// subscribers maps a caller-chosen id to the callback for that id.
	subscribers map[interface{}]func(interface{})

	// pending holds published messages that have not been dispatched yet,
	// in publication order. A queue (rather than a single slot) keeps
	// back-to-back publishes from overwriting each other.
	pending []interface{}

	// running is true while a Start loop should keep dispatching.
	running bool
}

// NewBroadcaster returns an empty Broadcaster, ready for Subscribe, Publish
// and Start.
func NewBroadcaster() *Broadcaster {
	return &Broadcaster{
		// Only exclusive locking is ever used through cond.L, so a plain
		// Mutex is sufficient.
		cond:        sync.NewCond(&sync.Mutex{}),
		subscribers: map[interface{}]func(interface{}){},
	}
}

// Subscribe registers f as the callback for id, replacing any callback
// previously registered under the same id.
//
// id is used as a map key and therefore must be a comparable value; a
// non-comparable id (for example a slice) panics. f is invoked once per
// message, each time in a fresh goroutine, so it must tolerate concurrent
// and out-of-order calls.
func (b *Broadcaster) Subscribe(id interface{}, f func(input interface{})) {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()
	b.subscribers[id] = f
}

// Unsubscribe removes the callback registered under id. It is a no-op when
// id is not subscribed. Callbacks already spawned for earlier messages may
// still be running when Unsubscribe returns.
func (b *Broadcaster) Unsubscribe(id interface{}) {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()
	delete(b.subscribers, id)
}

// Publish queues message for delivery to every subscriber and returns
// without waiting for the callbacks to run.
//
// Messages published while no Start loop is running are buffered and are
// delivered once Start runs.
func (b *Broadcaster) Publish(message interface{}) {
	b.cond.L.Lock()
	b.pending = append(b.pending, message)
	b.cond.L.Unlock()

	b.cond.Broadcast()
}

// Start runs the dispatch loop and blocks until Stop is called; callers
// normally run it in its own goroutine.
//
// For every queued message, each subscriber registered at dispatch time is
// invoked in a new goroutine. Messages still queued when Stop is observed
// are left in the queue and are delivered by a later call to Start.
func (b *Broadcaster) Start() {
	b.cond.L.Lock()
	defer b.cond.L.Unlock()

	b.running = true
	for {
		// Wait releases the lock while parked and re-acquires it before
		// returning, so the checks below always see consistent state.
		for b.running && len(b.pending) == 0 {
			b.cond.Wait()
		}
		if !b.running {
			return
		}

		batch := b.pending
		b.pending = nil

		// The lock is held while goroutines are spawned (spawning does not
		// block), so the subscriber map cannot change mid-iteration. The
		// callbacks themselves run without the lock.
		for _, msg := range batch {
			for _, f := range b.subscribers {
				if f == nil {
					// A nil callback would panic inside the new goroutine.
					continue
				}
				go f(msg)
			}
		}
	}
}

// Stop asks the Start loop to return and wakes it if it is idle.
//
// Stop does not wait for the loop or for in-flight callbacks to finish.
// Calling Stop before Start has begun running has no lasting effect,
// because Start marks the broadcaster as running when it begins.
func (b *Broadcaster) Stop() {
	b.cond.L.Lock()
	b.running = false
	b.cond.L.Unlock()

	b.cond.Broadcast()
}