-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
155 lines (123 loc) · 2.53 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package main
import (
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// NOTE: this program was observed to fail when run under the race detector
// (`go run -race main.go`). The run aborts with:
//
//	race: limit on 8128 simultaneously alive goroutines is exceeded, dying
//	exit status 66
// maxPoolSize is the capacity of the semaphore channel that main uses to
// limit how many worker goroutines hold a slot at the same time.
const maxPoolSize = 20
// OccurenceCounter counts how many times each uint32 key has been seen.
//
// Every exported method takes mux before touching v, so one counter can be
// shared between goroutines. The zero value is ready to use: Inc allocates the
// map on first use. The struct contains a lock, so it must not be copied after
// first use; all methods have pointer receivers.
type OccurenceCounter struct {
	v   map[uint32]int // per-key counts; may be nil until the first Inc
	mux sync.RWMutex   // guards v
}

// Inc adds one to the count stored for key.
func (c *OccurenceCounter) Inc(key uint32) {
	c.mux.Lock()
	defer c.mux.Unlock()
	if c.v == nil {
		// Writing to a nil map panics, so a zero-value counter gets its map here.
		c.v = make(map[uint32]int)
	}
	c.v[key]++
}

// Value returns the current count for key, or 0 if key has not been counted.
func (c *OccurenceCounter) Value(key uint32) int {
	c.mux.RLock()
	defer c.mux.RUnlock()
	return c.v[key]
}

// Save reports the sum of all counts and resets the counter to empty.
// The write to Postgres is only simulated: the total is printed to stdout.
func (c *OccurenceCounter) Save() {
	// Read the total and reset the map inside one critical section so that no
	// increment can land between the two steps. The unexported agg and clear
	// helpers are used because sync.RWMutex is not re-entrant: calling Total
	// or Clear while holding the lock would deadlock.
	c.mux.Lock()
	total := c.agg()
	c.clear()
	c.mux.Unlock()

	// Simulates the save into PG. This runs after the lock is released so that
	// concurrent Inc calls are not stalled while the output is written.
	fmt.Println("Save into Postgres total:", total)
}

// Total returns the sum of the counts of all keys.
func (c *OccurenceCounter) Total() int {
	c.mux.RLock()
	defer c.mux.RUnlock()
	return c.agg()
}

// agg sums every count in v. The caller must hold mux (read or write).
func (c *OccurenceCounter) agg() int {
	total := 0
	for _, n := range c.v {
		total += n
	}
	return total
}

// Clear discards all counts.
func (c *OccurenceCounter) Clear() {
	c.mux.Lock()
	defer c.mux.Unlock()
	c.clear()
}

// clear replaces v with a fresh empty map. The caller must hold mux for writing.
func (c *OccurenceCounter) clear() {
	c.v = make(map[uint32]int)
}
// CountOccurance records one occurrence of i in the counter m by delegating
// to m.Inc.
func CountOccurance(i uint32, m *OccurenceCounter) {
	m.Inc(i)
}
// main runs the demo until SIGINT or SIGTERM arrives.
//
// Two long-lived goroutines cooperate:
//   - the flusher calls Save on the counter every two seconds and once more
//     during shutdown;
//   - the producer keeps starting short-lived workers, each of which records
//     one random key, with a semaphore channel of capacity maxPoolSize
//     limiting how many workers hold a slot at once.
//
// Shutdown order: signal -> producer waits for its workers -> producer closes
// stopFlusher -> flusher does a final Save and closes flushed -> main prints
// the final total and exits.
func main() {
	// Buffered so that a signal delivered while nobody is receiving is kept.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	counter := &OccurenceCounter{v: make(map[uint32]int)}
	flushTick := time.NewTicker(2 * time.Second)
	stopFlusher := make(chan struct{}) // closed by the producer once all workers are done
	flushed := make(chan int)          // closed by the flusher after its final Save

	// Periodically write to Postgres.
	flusher := func() {
		for {
			select {
			case <-stopFlusher:
				flushTick.Stop()
				// Attempt one final save, then report that it is completed.
				counter.Save()
				close(flushed)
				return
			case <-flushTick.C:
				counter.Save()
			}
		}
	}
	go flusher()

	// Semaphore: a worker occupies one slot from before it starts until its
	// task has been recorded.
	slots := make(chan int, maxPoolSize)
	producer := func() {
		var inflight sync.WaitGroup
		for {
			// Non-blocking check for a termination signal.
			select {
			case <-sigs:
				// Let every started worker finish before asking the flusher
				// to tear down, so the final Save sees all increments.
				inflight.Wait()
				close(stopFlusher)
				return
			default:
			}

			// No signal yet: enqueue one more worker. The send blocks while
			// all slots are taken.
			inflight.Add(1)
			slots <- 1
			go func() {
				defer inflight.Done()
				enqueueTask(&inflight, counter)
				<-slots
			}()
		}
	}
	go producer()

	// Wait for the final flush from the flusher's teardown.
	<-flushed

	// The final Save ran after all workers finished and reset the counter, so
	// this is expected to print zero.
	fmt.Println("Final total: ", counter.Total())
	fmt.Println("Shutdown success...")
}
// enqueueTask performs one unit of work: it draws a pseudo-random key in
// [0, 100) and records a single occurrence of it in oc.
//
// wg is accepted but not used here; the caller does its own WaitGroup
// bookkeeping around the call.
func enqueueTask(wg *sync.WaitGroup, oc *OccurenceCounter) {
	CountOccurance(uint32(rand.Intn(100)), oc)
}