-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
110 lines (95 loc) · 3.24 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
var pull = require('pull-stream')
var Write = require('pull-write')
var Obv = require('obv')
module.exports = function (version, map) {
var since = Obv()
var env = typeof window == 'object' ? window : self;
return function (log, name) {
var db
function initialize () {
var req = env.indexedDB.open(log.filename+'/'+name, 1)
req.onsuccess = function (ev) {
console.log('opened', name, ev)
db = ev.target.result
db.transaction(['meta'],'readonly').objectStore('meta')
.get('meta')
.onsuccess = function (ev) {
console.log('loaded', name, ev.target.result)
if(!ev.target.result) since.set(-1)
else if(ev.target.result.version === version) {
since.set(ev.target.result.since)
} else {
var tx = db.transaction(['meta', 'data'])
tx.deleteDatabase('meta')
tx.deleteDatabase('data')
tx.onversionchange = initialize
}
}
}
req.onupgradeneeded = function (ev) {
console.log('upgrade needed')
db = ev.target.result
db.createObjectStore('meta')
db.createObjectStore('data')
}
req.onerror = function (ev) {
console.log('ERROR')
throw new Error('could not load indexdb:'+dir)
}
}
initialize()
since(function (v) {
console.log("UPDATE sINCE", v)
})
return {
since: since,
methods: {get: 'async', read: 'source'},
createSink: function (cb) {
return Write(function (batch, cb) {
var tx = db.transaction(['meta', 'data'], 'readwrite')
tx.onabort = tx.onerror = function (err) { cb(err || error) }
function onError (_err) {
error = _err
tx.abort()
}
tx.oncomplete = function () {
console.log("WRITTEN", batch)
since.set(batch.meta.since)
cb(null, batch.meta.since)
}
tx.objectStore('meta').put(batch.meta, 'meta').onerror = onError
var dataStore = tx.objectStore('data')
batch.data.forEach(function (data) {
console.log('batch', data)
dataStore.put(data.value, data.key).onerror = onError
})
}, function (batch, data) {
console.log("Reduce", batch, data)
if(!batch) batch = {meta: {version: version, since: data.seq}, data: []}
var index = map(data.value, data.seq)
if(index)
batch.data.push()
var indexed = map(data.value, data.seq)
batch.data = batch.data.concat(indexed.map(function (key) { return { key: key, value: data.seq}}))
//batch[.value.since = Math.max(batch[0].value.since, data.seq)
batch.meta.since = data.seq
return batch
})
},
get: function (key, cb) {
var tx = db.transaction(['data'], 'readonly')
var req = tx.objectStore('data').get(key)
console.log('get', key)
req.onsuccess = function (ev) {
log.get(ev.target.result, cb)
}
req.onerror = function () {
cb(new Error('key not found:'+key))
}
},
read: function () {
throw new Error('not yet implemented')
}
}
}
}