This repository has been archived by the owner on Dec 4, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 535
/
Copy pathevent_subscription.go
80 lines (65 loc) · 1.96 KB
/
event_subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package txpool
import (
"github.com/0xPolygon/polygon-edge/txpool/proto"
)
// subscriptionID is an int32-based identifier type.
// NOTE(review): presumably identifies an eventSubscription; it is not used
// in the visible part of this file - confirm against callers.
type subscriptionID int32
// eventSubscription holds the state of a single event subscriber:
// the event types it accepts, the channels used to drive its worker loop
// (runLoop), and the store in which events wait before being delivered.
type eventSubscription struct {
// eventTypes is the list of subscribed event types;
// eventSupported checks incoming event types against this list
eventTypes []proto.EventType
// outputCh is the update channel for the subscriber.
// runLoop sends events on it and closes it when the loop exits
outputCh chan *proto.TxPoolEvent
// doneCh is the channel for handling stop signals.
// It is closed by close(), which makes runLoop return
doneCh chan struct{}
// notifyCh is the channel for receiving event requests.
// pushEvent signals on it (without blocking) and runLoop waits on it
notifyCh chan struct{}
// eventStore is used for temporary concurrent event storage,
// required in order to preserve the chronological order of events.
// pushEvent pushes into it and runLoop pops from it
eventStore *eventQueue
}
// eventSupported reports whether eventType is present in the
// subscription's eventTypes list.
func (es *eventSubscription) eventSupported(eventType proto.EventType) bool {
	// Linear scan; stop at the first match
	for i := range es.eventTypes {
		if es.eventTypes[i] == eventType {
			return true
		}
	}

	return false
}
// close stops the event subscription by closing doneCh, which is the
// stop signal runLoop listens for.
// Closing an already closed channel panics, so this must be called at most once.
func (es *eventSubscription) close() {
	done := es.doneCh
	close(done)
}
// runLoop is the subscription's worker loop. It waits for either a stop
// signal on doneCh or a notification on notifyCh. On each notification it
// drains the event store, forwarding every event to outputCh in the order
// the store returns them. outputCh is closed when the loop exits.
func (es *eventSubscription) runLoop() {
	defer close(es.outputCh)

	for {
		select {
		case <-es.doneCh:
			// Stop signal received
			return
		case <-es.notifyCh:
			// New events may be available; pop until the store reports empty (nil)
			for next := es.eventStore.pop(); next != nil; next = es.eventStore.pop() {
				select {
				case <-es.doneCh:
					// Stop signal received while waiting for the subscriber to read
					return
				case es.outputCh <- next:
					// Event handed over to the subscriber
				}
			}
		}
	}
}
// pushEvent hands the event to the subscription for processing.
// Events whose type is not supported by the subscription are ignored.
// The notification to the worker loop is sent without blocking. [NON-BLOCKING]
func (es *eventSubscription) pushEvent(event *proto.TxPoolEvent) {
	if !es.eventSupported(event.Type) {
		return
	}

	// Store the event first, so the order of events can be preserved
	es.eventStore.push(event)

	// Wake up the worker loop
	select {
	case es.notifyCh <- struct{}{}:
	default:
		// The send could not proceed immediately;
		// skip the signal instead of blocking the caller
	}
}