-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
113 lines (89 loc) · 2.63 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
'use strict'
var LRU = require('hashlru')
var Obv = require('obv')
var path = require('path')
var createKV = require('flume-kv/level')
var pull = require('pull-stream')
function isString (s) { return 'string' === typeof s }
var isArray = Array.isArray
module.exports = function (version, map) {
return function (log, name) {
var kv = createKV(path.join(path.dirname(log.filename), name))
var cache = LRU(1000)
function get(key, cb) {
var v
//take value from the cache, or from the writing data.
//can't trust the cache because if there are lots of reads something may have been purged!
if((v = cache.get(key)) != null || (v = writing[key]) != null || (v = _writing[key]) != null)
cb(null, v)
else if(cbs[key])
cbs[key].push(cb)
else {
cbs[key] = [cb]
kv.get(key, function (err, v) {
if(!err) cache.set(key, v)
while(cbs.length)
cbs.shift()(err, v)
})
}
}
var since = Obv(), writing = {}, keys = 0
function add(key, value) {
if(!writing[key])
keys ++
cache.set(key, writing[key] = value)
}
kv.load(function (err, v) {
since.set(err ? -1 : v)
})
//id like to modularize stuff like this, but it always seems
//to have special things relevant to the specific case.
var write = (function () {
var ts = 0, isWriting = false, timer
function __write (cb) {
var _writing = writing; keys = 0; writing = {}
kv.batch(_writing, {version: version, since: since.value}, cb)
}
function _write () {
isWriting = true
var _since = since.value
__write(function () {
isWriting = false
//if we have new data, maybe write again?
if(since.value > _since) write()
})
}
return function () {
if(isWriting) return
var _ts = Date.now()
if(keys > 500)
_write()
else if (
since.value === log.since.value &&
_ts > ts + 60*1000
) {
clearTimeout(timer)
timer = setTimeout(_write, 200)
}
}
})()
return {
methods: {get: 'async'},
get: get,
since: since,
createSink: function (cb) {
return pull(
pull.drain(function (data) {
var indexes = map(data.value, data.seq)
if(isString(indexes))
add(indexes, data.seq)
else if (isArray(indexes))
indexes.forEach(function (key) { add(key, data.seq) })
since.set(data.seq)
write()
})
)
}
}
}
}