-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfeedstore.go
127 lines (99 loc) · 2.24 KB
/
feedstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package orbitdb
import (
"encoding/json"
"fmt"
"github.com/keks/go-ipfs-colog"
"github.com/keks/go-orbitdb/handler"
)
// ErrMalformedEntry is the sentinel error for a colog Entry that does not have
// the expected format.
//
// NOTE(review): nothing in this file returns it directly; presumably it is
// produced by eventCast — confirm against its definition.
var ErrMalformedEntry = fmt.Errorf("read malformed colog entry")
// feedIndex is the index used by FeedStore. It has the same underlying
// structure as eventIndex (the methods below use its lock l and its added set
// of entry hashes) but, being a distinct defined type, carries its own method
// set, which adds delete handling on top of add handling.
type feedIndex eventIndex
// handleAdd is the handler for add operations: it records the hash of entry e
// in the set of added entries.
//
// The entry is first run through eventCast; if that fails the error is
// returned and the index is left unchanged.
func (idx *feedIndex) handleAdd(e *colog.Entry) error {
	if _, err := eventCast(e); err != nil {
		return err
	}

	idx.l.Lock()
	defer idx.l.Unlock()

	idx.added[e.Hash] = struct{}{}
	return nil
}
// handleDel is the handler for delete operations: it removes the hash named
// by entry e from the set of added entries.
//
// The hash to remove is not e.Hash itself but the string carried in the
// entry's event data. If eventCast fails the error is returned and the index
// is left unchanged. Removing a hash that is not in the set is a no-op.
func (idx *feedIndex) handleDel(e *colog.Entry) error {
	payload, err := eventCast(e)
	if err != nil {
		return err
	}

	// The delete entry's data holds the hash of the entry being deleted.
	target := colog.Hash(payload.Event().GetString())

	idx.l.Lock()
	defer idx.l.Unlock()

	delete(idx.added, target)
	return nil
}
// has reports whether hash is currently in the set of added entries. The
// lookup is performed while holding the index lock.
func (idx *feedIndex) has(hash colog.Hash) bool {
	idx.l.Lock()
	_, found := idx.added[hash]
	idx.l.Unlock()

	return found
}
// FeedStore is similar to an EventStore but also allows deleting events.
//
// db is the OrbitDB that Add and Delete write their payloads to and whose
// colog Query reads from. idx is the index of entry hashes that have been
// added and not deleted; Query uses it to skip every other entry.
type FeedStore struct {
db *OrbitDB
idx *feedIndex
}
// NewFeedStore returns a FeedStore for the given OrbitDB.
//
// It creates an empty index, registers the index's add and delete handlers on
// a new handler mux, and passes that mux to db.Notify in a new goroutine.
func NewFeedStore(db *OrbitDB) *FeedStore {
	index := &feedIndex{
		added: make(map[colog.Hash]struct{}),
	}

	mux := handler.NewMux()
	mux.AddHandler(OpAdd, index.handleAdd)
	mux.AddHandler(OpDel, index.handleDel)

	// Notify runs in the background; this constructor keeps no handle to stop
	// or wait for that goroutine.
	go db.Notify(mux)

	return &FeedStore{
		db:  db,
		idx: index,
	}
}
// Delete deletes the event at the given hash.
//
// It adds an OpDel payload to the database whose data is the JSON encoding of
// hash, and returns the resulting entry. It does not itself check whether
// hash refers to an existing event. An error from JSON encoding or from the
// database is returned as is.
func (fs *FeedStore) Delete(hash colog.Hash) (*colog.Entry, error) {
	encoded, err := json.Marshal(hash)
	if err != nil {
		return nil, err
	}

	return fs.db.Add(&eventPayload{
		Op:   OpDel,
		Data: encoded,
	})
}
// Add adds a new Event.
//
// data is JSON-encoded and added to the database as the data of an OpAdd
// payload; the resulting entry is returned. An error from JSON encoding or
// from the database is returned as is.
func (fs *FeedStore) Add(data interface{}) (*colog.Entry, error) {
	encoded, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}

	return fs.db.Add(&eventPayload{
		Op:   OpAdd,
		Data: encoded,
	})
}
// Query queries the events using a given query qry. It omits deleted events.
//
// The returned EventResult pulls entries from the underlying colog query and
// skips every entry whose hash is not in the index of added entries. Each
// call yields the next remaining entry as an Event. Any error from the
// underlying result or from eventCast is returned together with a zero Event.
func (fs *FeedStore) Query(qry colog.Query) EventResult {
	next := fs.db.colog.Query(qry)

	return func() (Event, error) {
		for {
			entry, err := next()
			if err != nil {
				return Event{}, err
			}

			// Entries not in the index are skipped without being decoded.
			if !fs.idx.has(entry.Hash) {
				continue
			}

			payload, err := eventCast(entry)
			if err != nil {
				return Event{}, err
			}
			return payload.Event(), nil
		}
	}
}