This repository has been archived by the owner on Dec 4, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 535
/
Copy pathevent_manager.go
107 lines (88 loc) · 2.74 KB
/
event_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package txpool
import (
"fmt"
"sync"
"sync/atomic"
"github.com/0xPolygon/polygon-edge/txpool/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/google/uuid"
"github.com/hashicorp/go-hclog"
)
type eventManager struct {
subscriptions map[subscriptionID]*eventSubscription
subscriptionsLock sync.RWMutex
numSubscriptions int64
logger hclog.Logger
}
func newEventManager(logger hclog.Logger) *eventManager {
return &eventManager{
logger: logger.Named("event-manager"),
subscriptions: make(map[subscriptionID]*eventSubscription),
numSubscriptions: 0,
}
}
type subscribeResult struct {
subscriptionID subscriptionID
subscriptionChannel chan *proto.TxPoolEvent
}
// subscribe registers a new listener for TxPool events
func (em *eventManager) subscribe(eventTypes []proto.EventType) *subscribeResult {
em.subscriptionsLock.Lock()
defer em.subscriptionsLock.Unlock()
id := uuid.New().ID()
subscription := &eventSubscription{
eventTypes: eventTypes,
outputCh: make(chan *proto.TxPoolEvent),
doneCh: make(chan struct{}),
notifyCh: make(chan struct{}, 10),
eventStore: &eventQueue{
events: make([]*proto.TxPoolEvent, 0),
},
}
em.subscriptions[subscriptionID(id)] = subscription
go subscription.runLoop()
em.logger.Info(fmt.Sprintf("Added new subscription %d", id))
atomic.AddInt64(&em.numSubscriptions, 1)
return &subscribeResult{
subscriptionID: subscriptionID(id),
subscriptionChannel: subscription.outputCh,
}
}
// cancelSubscription stops a subscription for TxPool events
func (em *eventManager) cancelSubscription(id subscriptionID) {
em.subscriptionsLock.Lock()
defer em.subscriptionsLock.Unlock()
if subscription, ok := em.subscriptions[id]; ok {
subscription.close()
em.logger.Info(fmt.Sprintf("Canceled subscription %d", id))
delete(em.subscriptions, id)
atomic.AddInt64(&em.numSubscriptions, -1)
}
}
// Close stops the event manager, effectively cancelling all subscriptions
func (em *eventManager) Close() {
em.subscriptionsLock.Lock()
defer em.subscriptionsLock.Unlock()
for _, subscription := range em.subscriptions {
subscription.close()
}
atomic.StoreInt64(&em.numSubscriptions, 0)
}
// signalEvent is a helper method for alerting listeners of a new TxPool event
func (em *eventManager) signalEvent(eventType proto.EventType, txHashes ...types.Hash) {
if atomic.LoadInt64(&em.numSubscriptions) < 1 {
// No reason to lock the subscriptions map
// if no subscriptions exist
return
}
em.subscriptionsLock.RLock()
defer em.subscriptionsLock.RUnlock()
for _, txHash := range txHashes {
for _, subscription := range em.subscriptions {
subscription.pushEvent(&proto.TxPoolEvent{
Type: eventType,
TxHash: txHash.String(),
})
}
}
}