-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
130 lines (117 loc) · 3.61 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
var Obv = require('obv')
var Drain = require('pull-stream/sinks/drain')
var Once = require('pull-stream/sources/once')
var AtomicFile = require('atomic-file')
var path = require('path')
var jsbloom = require('jsbloom')
/**
 * Report whether `o` has no enumerable properties.
 *
 * Uses `for...in`, so inherited enumerable properties count as content and
 * `null` / `undefined` are treated as empty rather than throwing.
 *
 * @param {*} o - value to inspect
 * @returns {boolean} true when no enumerable key is found
 */
function isEmpty (o) {
  var found = false
  for (var key in o) {
    // one key is enough to know the answer
    found = true
    break
  }
  return !found
}
/**
 * Check whether a value is callable.
 *
 * @param {*} f - value to test
 * @returns {boolean} true when `typeof f` is 'function'
 */
function isFunction (f) {
  var kind = typeof f
  return kind === 'function'
}
/**
 * Check whether a value is a primitive string.
 *
 * @param {*} s - value to test
 * @returns {boolean} true when `typeof s` is 'string' (String objects are not)
 */
function isString (s) {
  var kind = typeof s
  return kind === 'string'
}
var codec = require('./codec')
/**
 * Normalise the `map` option into a function.
 *
 * - a function is returned unchanged
 * - a string becomes a getter that reads that property from its argument
 * - anything else becomes the identity function
 *
 * @param {Function|string|*} map - user supplied mapping
 * @returns {Function} function applied to each record to derive its key
 */
function lookup (map) {
  if (isFunction(map)) {
    return map
  }
  if (isString(map)) {
    return function (record) {
      return record[map]
    }
  }
  return function (item) {
    return item
  }
}
module.exports = function (version, map, opts) {
opts = opts || {}
var items = opts.items || 100e3
var probability = opts.probability || 0.001
if(isFunction(version))
throw new Error('version must be a number')
map = lookup(map)
return function (log, name) { //name is where this view is mounted
var acc, since = Obv(), ts = 0
var value = Obv(), _value, writing = false, state, int
function write () {
var _ts = Date.now()
if(state && since.value === log.since.value && _ts > ts + 60*1000 && !writing) {
clearTimeout(int)
int = setTimeout(function () {
ts = _ts; writing = true
state.set({
seq: since.value,
version: version,
items: items,
probability: probability,
value: bloom
}, function () {
writing = false
})
}, 200)
}
}
//depending on the function, the reduction may not change on every update.
//but currently, we still need to rewrite the file to reflect that.
//(or accept that we'll have to reprocess some items)
//might be good to have a cheap way to update the seq. maybe put it in the filename,
//so filenames monotonically increase, instead of write to `name~` and then `mv name~ name`
if(log.filename) {
var dir = path.dirname(log.filename)
state = AtomicFile(path.join(dir, name+'.json'), codec)
state.get(function (err, data) {
if(err || isEmpty(data)) {
bloom = jsbloom.filter(opts.items || 100e3, opts.probability || 0.001)
since.set(-1)
}
else if( //if any settings have changed, reinitialize the filter.
data.version !== version
|| data.items !== opts.items
|| data.probabilty !== opts.probability
) {
bloom = jsbloom.filter(opts.items || 100e3, opts.probability || 0.001)
since.set(-1) //overwrite old data.
}
else {
bloom = data.value
since.set(data.seq)
}
})
}
else
since.set(-1)
return {
since: since,
value: value,
methods: {has: 'sync'},
//has checks immediately, but if you want to wait
//use db[name].ready(function () { db[name].has(key) })
//ready is added by flumedb
has: function (key) {
return bloom.checkEntry(key)
},
createSink: function (cb) {
return Drain(function (data) {
var key = map(data.value, data.seq)
if(key) bloom.addEntry(key)
since.set(data.seq)
write()
}, cb)
},
destroy: function (cb) {
bloom = null; since.set(-1);
if(state) state.set(null, cb)
else cb()
},
close: function (cb) {
clearTimeout(int)
if(!since.value) return cb()
//force a write.
state.set({
seq: since.value,
version: version,
items: opts.items,
probability: opts.probability,
value: bloom
}, cb)
}
}
}
}