-
Notifications
You must be signed in to change notification settings - Fork 0
/
deltas.go
104 lines (96 loc) · 3.32 KB
/
deltas.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"time"
"github.com/patrickmn/go-cache"
"github.com/thecsw/katya/log"
"github.com/thecsw/katya/storage"
)
const (
// globalDeltaCacheKey is a small map key used in delta cache for globals
globalDeltaCacheKey = "g"
// deltaUpdateInterval is how frequently should we update deltas
deltaUpdateInterval = time.Minute
)
var (
// globalNumWordsDelta caches yet-to-be-updated deltas in global word count
globalNumWordsDelta = cache.New(cache.NoExpiration, cache.NoExpiration)
// globalNumSentencesDelta caches yet-to-be-updated deltas in global sentences count
globalNumSentencesDelta = cache.New(cache.NoExpiration, cache.NoExpiration)
// sourcesNumWordsDelta caches yet-to-be-updated deltas in sources' word count
sourcesNumWordsDelta = cache.New(cache.NoExpiration, cache.NoExpiration)
// sourcesNumSentencesDelta caches yet-to-be-updated deltas in sources' sentences count
sourcesNumSentencesDelta = cache.New(cache.NoExpiration, cache.NoExpiration)
)
// updateGlobalWordSentencesDeltas updates global deltas of num_words and num_sentences
func updateGlobalWordSentencesDeltas() {
// whether we should print an update message at the end or not
actuallyUpdated := false
//l("Starting updating the global words/sentences count")
// Update the word count
wordDelta, _ := globalNumWordsDelta.Get(globalDeltaCacheKey)
if wordDelta.(uint) != 0 {
if err := storage.UpdateGlobalWordNum(wordDelta.(uint)); err != nil {
log.Error("failed updating global word count", err, nil)
return
}
actuallyUpdated = true
}
// Update the sentences count
sentDelta, _ := globalNumSentencesDelta.Get(globalDeltaCacheKey)
if sentDelta.(uint) != 0 {
if err := storage.UpdateGlobalSentNum(sentDelta.(uint)); err != nil {
log.Error("failed updating global sentences count", err, nil)
return
}
actuallyUpdated = true
}
// Drain the cache
globalNumWordsDelta.Set(globalDeltaCacheKey, uint(0), cache.NoExpiration)
globalNumSentencesDelta.Set(globalDeltaCacheKey, uint(0), cache.NoExpiration)
// Log the info
if actuallyUpdated {
log.Info("Successfully updated the global words/sentences count")
}
}
// updateGlobalWordSentencesDeltas updates sources' deltas of num_words and num_sentences
func updateSourcesWordSentencesDeltas() {
// whether we should print an update message at the end or not
actuallyUpdated := false
//l("Starting to update sources' words/sentences count")
// Update the word count
wordItems := sourcesNumWordsDelta.Items()
for k, v := range wordItems {
delta := v.Object.(uint)
if delta == 0 {
continue
}
if err := storage.UpdateSourceWordNum(k, delta); err != nil {
log.Error("failed updating source word count", err, log.Params{
"source": k,
})
continue
}
actuallyUpdated = true
sourcesNumWordsDelta.Set(k, uint(0), cache.NoExpiration)
}
// Update the sentences count
sentItems := sourcesNumSentencesDelta.Items()
for k, v := range sentItems {
delta := v.Object.(uint)
if delta == 0 {
continue
}
if err := storage.UpdateSourceSentNum(k, delta); err != nil {
log.Error("failed updating source sentences count", err, log.Params{
"source": k,
})
continue
}
actuallyUpdated = true
sourcesNumSentencesDelta.Set(k, uint(0), cache.NoExpiration)
}
// Log the info
if actuallyUpdated {
log.Info("Successfully update sources' words/sentences count")
}
}