From f410b5813b2a229dbf1104652e064a3d88aff0d7 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Tue, 8 Oct 2024 18:43:20 -0500 Subject: [PATCH 1/2] add new database package Add the Lexi DB package, which wraps badger DB to provide a simplified API, to capture the sometimes tedious mechanics that we've repeated in various places, and to add utilities for indexing data for quick retrieval of filtered data. This change is just the addition of the package. I will demonstrate its use in tatanka separately, and also show how this can replace our client/asset tx DB implementations. --- dex/lexi/datum.go | 86 ++++++++++++ dex/lexi/db_test.go | 216 ++++++++++++++++++++++++++++++ dex/lexi/dbid.go | 39 ++++++ dex/lexi/index.go | 303 ++++++++++++++++++++++++++++++++++++++++++ dex/lexi/keyprefix.go | 61 +++++++++ dex/lexi/lexi.go | 238 +++++++++++++++++++++++++++++++++ dex/lexi/log.go | 42 ++++++ dex/lexi/table.go | 191 ++++++++++++++++++++++++++ 8 files changed, 1176 insertions(+) create mode 100644 dex/lexi/datum.go create mode 100644 dex/lexi/db_test.go create mode 100644 dex/lexi/dbid.go create mode 100644 dex/lexi/index.go create mode 100644 dex/lexi/keyprefix.go create mode 100644 dex/lexi/lexi.go create mode 100644 dex/lexi/log.go create mode 100644 dex/lexi/table.go diff --git a/dex/lexi/datum.go b/dex/lexi/datum.go new file mode 100644 index 0000000000..1c84fbfde4 --- /dev/null +++ b/dex/lexi/datum.go @@ -0,0 +1,86 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "bytes" + "fmt" + + "github.com/decred/dcrd/wire" +) + +// datum is a value in the key-value database, along with information about +// its index entries. 
+type datum struct { + version byte + indexes [][]byte + v []byte +} + +func (d *datum) bytes() ([]byte, error) { + if d.version != 0 { + return nil, fmt.Errorf("unknown datum version %d", d.version) + } + bLen := 1 + len(d.v) + wire.VarIntSerializeSize(uint64(len(d.v))) + wire.VarIntSerializeSize(uint64(len(d.indexes))) + for _, ib := range d.indexes { + bLen += len(ib) + wire.VarIntSerializeSize(uint64(len(ib))) + } + b := bytes.NewBuffer(make([]byte, 0, bLen)) + if err := b.WriteByte(d.version); err != nil { + return nil, fmt.Errorf("error writing version: %w", err) + } + if err := wire.WriteVarInt(b, 0, uint64(len(d.indexes))); err != nil { + return nil, fmt.Errorf("error writing index count var int: %w", err) + } + for _, ib := range d.indexes { + if err := wire.WriteVarInt(b, 0, uint64(len(ib))); err != nil { + return nil, fmt.Errorf("error writing index var int: %w", err) + } + if _, err := b.Write(ib); err != nil { + return nil, fmt.Errorf("error writing index value: %w", err) + } + } + if err := wire.WriteVarInt(b, 0, uint64(len(d.v))); err != nil { + return nil, fmt.Errorf("error writing value var int: %w", err) + } + if _, err := b.Write(d.v); err != nil { + return nil, fmt.Errorf("error writing value: %w", err) + } + return b.Bytes(), nil +} + +func decodeDatum(blob []byte) (*datum, error) { + if len(blob) < 4 { + return nil, fmt.Errorf("datum blob length cannot be < 4. 
got %d", len(blob)) + } + d := &datum{version: blob[0]} + if d.version != 0 { + return nil, fmt.Errorf("unknown datum blob version %d", d.version) + } + b := bytes.NewBuffer(blob[1:]) + nIndexes, err := wire.ReadVarInt(b, 0) + if err != nil { + return nil, fmt.Errorf("error reading number of indexes: %w", err) + } + d.indexes = make([][]byte, nIndexes) + for i := 0; i < int(nIndexes); i++ { + indexLen, err := wire.ReadVarInt(b, 0) + if err != nil { + return nil, fmt.Errorf("error reading index length: %w", err) + } + d.indexes[i] = make([]byte, indexLen) + if _, err := b.Read(d.indexes[i]); err != nil { + return nil, fmt.Errorf("error reading index: %w", err) + } + } + valueLen, err := wire.ReadVarInt(b, 0) + if err != nil { + return nil, fmt.Errorf("erro reading value var int: %w", err) + } + d.v = make([]byte, valueLen) + if _, err := b.Read(d.v); err != nil { + return nil, fmt.Errorf("error reading value: %w", err) + } + return d, nil +} diff --git a/dex/lexi/db_test.go b/dex/lexi/db_test.go new file mode 100644 index 0000000000..f2614d17f1 --- /dev/null +++ b/dex/lexi/db_test.go @@ -0,0 +1,216 @@ +package lexi + +import ( + "bytes" + "encoding" + "os" + "path/filepath" + "strings" + "testing" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" +) + +func newTestDB(t *testing.T) (*DB, func()) { + tmpDir, err := os.MkdirTemp("", "") + if err != nil { + t.Fatalf("error making temp dir: %v", err) + } + db, err := New(&Config{ + Path: filepath.Join(tmpDir, "test.db"), + Log: dex.StdOutLogger("T", dex.LevelInfo), + }) + if err != nil { + t.Fatalf("error constructing db: %v", err) + } + return db, func() { os.RemoveAll(tmpDir) } +} + +func TestPrefixes(t *testing.T) { + db, shutdown := newTestDB(t) + defer shutdown() + + pfix, err := db.prefixForName("1") + if err != nil { + t.Fatalf("error getting prefix 1: %v", err) + } + if pfix != firstAvailablePrefix { + t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix) + } + + pfix, err = 
db.prefixForName("2") + if err != nil { + t.Fatalf("error getting prefix 2: %v", err) + } + if secondPfix := incrementPrefix(firstAvailablePrefix); pfix != secondPfix { + t.Fatalf("expected prefix %s, got %s", secondPfix, pfix) + } + + // Make sure requests for the same table name return the already-registered + // prefix. + pfix, err = db.prefixForName("1") + if err != nil { + t.Fatalf("error getting prefix 1 again: %v", err) + } + if pfix != firstAvailablePrefix { + t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix) + } +} + +type tValue struct { + k, v, idx []byte +} + +func (v *tValue) MarshalBinary() ([]byte, error) { + return v.v, nil +} + +func valueIndex(k, v encoding.BinaryMarshaler) ([]byte, error) { + return v.(*tValue).idx, nil +} + +func TestIndex(t *testing.T) { + db, shutdown := newTestDB(t) + defer shutdown() + + tbl, err := db.Table("T") + if err != nil { + t.Fatalf("Error creating table: %v", err) + } + + idx, err := tbl.AddIndex("I", valueIndex) + if err != nil { + t.Fatalf("Error adding index: %v", err) + } + + const nVs = 100 + vs := make([]*tValue, nVs) + for i := 0; i < nVs; i++ { + k := append(encode.RandomBytes(5), byte(i)) + v := &tValue{k: []byte{byte(i)}, v: encode.RandomBytes(10), idx: []byte{byte(i)}} + vs[i] = v + if err := tbl.Set(B(k), v); err != nil { + t.Fatalf("Error setting table entry: %v", err) + } + } + + // Iterate forwards. 
+ var i int + idx.Iterate(nil, func(it *Iter) error { + v := vs[i] + it.V(func(vB []byte) error { + if !bytes.Equal(vB, v.v) { + t.Fatalf("Wrong bytes for forward iteration index %d", i) + } + return nil + }) + i++ + return nil + }) + if i != nVs { + t.Fatalf("Expected to iterate %d items but only did %d", nVs, i) + } + + // Iterate backwards + i = nVs + idx.Iterate(nil, func(it *Iter) error { + i-- + v := vs[i] + return it.V(func(vB []byte) error { + if !bytes.Equal(vB, v.v) { + t.Fatalf("Wrong bytes for reverse iteration index %d", i) + } + return nil + }) + }, WithReverse()) + if i != 0 { + t.Fatalf("Expected to iterate back to zero but only got to %d", i) + } + + // Iterate forward and delete the first half. + i = 0 + if err := idx.Iterate(nil, func(it *Iter) error { + if i < 50 { + i++ + return it.Delete() + } + return ErrEndIteration + }, WithUpdate()); err != nil { + t.Fatalf("Error iterating forward to delete entries: %v", err) + } + if i != 50 { + t.Fatalf("Expected to iterate forward to 50, but only got to %d", i) + } + + idx.Iterate(nil, func(it *Iter) error { + return it.V(func(vB []byte) error { + if !bytes.Equal(vB, vs[50].v) { + t.Fatal("Wrong first iteration item after deletion") + } + return ErrEndIteration + }) + }) + + // Seek a specific item. 
+ i = 75 + idx.Iterate(nil, func(it *Iter) error { + if i == 75 { + i-- + return it.V(func(vB []byte) error { + if !bytes.Equal(vB, vs[75].v) { + t.Fatal("first item wasn't 25") + } + return nil + }) + } else if i == 74 { + return ErrEndIteration + } + t.Fatal("reached an unexpected value") + return nil + }, WithSeek(vs[75].idx), WithReverse()) + if i != 74 { + t.Fatal("never reached 74") + } +} + +func TestDatum(t *testing.T) { + testEncodeDecode := func(tag string, d *datum) { + t.Helper() + b, err := d.bytes() + if err != nil { + t.Fatalf("%s: error encoding simple datum: %v", tag, err) + } + reD, err := decodeDatum(b) + if err != nil { + t.Fatalf("%s: error decoding simple datum: %v", tag, err) + } + if !bytes.Equal(reD.v, d.v) { + t.Fatalf("%s: decoding datum value incorrect. %x != %x", tag, reD.v, d.v) + } + if d.version != 0 { + t.Fatalf("%s: wrong datum version. expected %d, got %d", tag, d.version, reD.version) + } + if len(d.indexes) != len(reD.indexes) { + t.Fatalf("%s: wrong number of indexes. wanted %d, got %d", tag, len(d.indexes), reD.indexes) + } + for i, idx := range d.indexes { + if !bytes.Equal(idx, reD.indexes[i]) { + t.Fatalf("%s: Wrong index # %d", tag, i) + } + } + } + + d := &datum{version: 1, v: []byte{0x01}} + if _, err := d.bytes(); err == nil || !strings.Contains(err.Error(), "unknown datum version") { + t.Fatalf("Wrong error for unknown datum version: %v", err) + } + d.version = 0 + + testEncodeDecode("simple", d) + + d = &datum{v: encode.RandomBytes(300)} + d.indexes = append(d.indexes, encode.RandomBytes(5)) + d.indexes = append(d.indexes, encode.RandomBytes(300)) + testEncodeDecode("complex", d) +} diff --git a/dex/lexi/dbid.go b/dex/lexi/dbid.go new file mode 100644 index 0000000000..a99d012dfa --- /dev/null +++ b/dex/lexi/dbid.go @@ -0,0 +1,39 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. 
+ +package lexi + +import ( + "encoding" + "encoding/hex" +) + +// DBIDSize is the size of the DBID. It is 8 bytes to match the size of a +// byte-encoded uint64. +const DBIDSize = 8 + +// DBID is a unique ID mapped to a datum's key. Keys can be any length, but to +// prevent long keys from being echoed in all the indexes, every key is +// translated to a DBID for internal use. +type DBID [DBIDSize]byte + +var ( + _ encoding.BinaryMarshaler = DBID{} + + lastDBID = DBID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} +) + +// MarshalBinary satisfies encoding.BinaryMarshaler for the DBID. +func (dbID DBID) MarshalBinary() ([]byte, error) { + return dbID[:], nil +} + +// String encodes the DBID as a 16-character hexadecimal string. +func (dbID DBID) String() string { + return hex.EncodeToString(dbID[:]) +} + +func newDBIDFromBytes(b []byte) (dbID DBID) { + copy(dbID[:], b) + return dbID +} diff --git a/dex/lexi/index.go b/dex/lexi/index.go new file mode 100644 index 0000000000..ed79d363ae --- /dev/null +++ b/dex/lexi/index.go @@ -0,0 +1,303 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding" + "encoding/binary" + "errors" + "fmt" + "time" + + "decred.org/dcrdex/dex" + "github.com/dgraph-io/badger" +) + +const ( + // ErrEndIteration can be returned from the function passed to Iterate + // to end iteration. No error will be returned from Iterate. + ErrEndIteration = dex.ErrorKind("end iteration") + // ErrDeleteEntry can be returned from the function passed to Iterate to + // trigger deletion of the datum, all of its index entries, and its key-id + // entries. + ErrDeleteEntry = dex.ErrorKind("delete entry") +) + +// Index is just a lexicographically-ordered list of byte slices. An Index is +// associated with a Table, and a datum inserted into a table can put entries +// into the Index. 
The Index can be iterated to view sorted data in the table. +type Index struct { + *DB + name string + table *Table + prefix keyPrefix + f func(k, v encoding.BinaryMarshaler) ([]byte, error) + defaultIterationOptions iteratorOpts +} + +// AddIndex adds an index to a Table. Once an Index is added, every datum +// inserted Set in the Table will generate an entry in the Index too. +func (t *Table) AddIndex(name string, f func(k, v encoding.BinaryMarshaler) ([]byte, error)) (*Index, error) { + p, err := t.prefixForName(t.name + "__idx__" + name) + if err != nil { + return nil, err + } + idx := &Index{ + DB: t.DB, + name: name, + table: t, + prefix: p, + f: f, + } + t.indexes = append(t.indexes, idx) + return idx, nil +} + +func (idx *Index) add(txn *badger.Txn, k, v encoding.BinaryMarshaler, dbID DBID) ([]byte, error) { + idxB, err := idx.f(k, v) + if err != nil { + return nil, fmt.Errorf("error getting index value: %w", err) + } + b := prefixedKey(idx.prefix, append(idxB, dbID[:]...)) + if err := txn.Set(b, nil); err != nil { + return nil, fmt.Errorf("error writing index entry: %w", err) + } + return b, nil +} + +type iteratorOpts struct { + update bool + reverse bool + seek []byte +} + +// IterationOption is a knob to change on Iterate runs on an Index. +type IterationOption func(opts *iteratorOpts) + +// WithUpdate must be used if the caller intends to make modifications during +// iteration, such as deleting elements. +func WithUpdate() IterationOption { + return func(opts *iteratorOpts) { + opts.update = true + } +} + +// WithReverse sets the direction of iteration to reverse-lexicographical. +func WithReverse() IterationOption { + return func(opts *iteratorOpts) { + opts.reverse = true + } +} + +// WithForward sets the direction of iteration to lexicographical. +func WithForward() IterationOption { + return func(opts *iteratorOpts) { + opts.reverse = false + } +} + +// WithSeek starts iteration at the specified prefix. 
+func WithSeek(prefix []byte) IterationOption { + return func(opts *iteratorOpts) { + opts.seek = prefix + } +} + +// UseDefaultIterationOptions sets default options for Iterate. +func (idx *Index) UseDefaultIterationOptions(optss ...IterationOption) { + for i := range optss { + optss[i](&idx.defaultIterationOptions) + } +} + +// Iter is an entry in the Index. The caller can use Iter to access and delete +// data associated with the index entry and it's datum. +type Iter struct { + idx *Index + item *badger.Item + txn *badger.Txn + dbID DBID + d *datum +} + +// V gives access to the datum bytes. The byte slice passed to f is only valid +// for the duration of the function call. The caller should make a copy if they +// intend to use the bytes outside of the scope of f. +func (i *Iter) V(f func(vB []byte) error) error { + d, err := i.datum() + if err != nil { + return err + } + return f(d.v) +} + +// K is the key for the datum. +func (i *Iter) K() (k []byte, _ error) { + item, err := i.txn.Get(prefixedKey(idToKeyPrefix, i.dbID[:])) + if err != nil { + return + } + return k, item.Value(func(kB []byte) error { + k = kB + return nil + }) +} + +// Entry is the actual index entry. These are the bytes returned by the +// generator passed to AddIndex. +func (i *Iter) Entry(f func(idxB []byte) error) error { + k := i.item.Key() + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("index entry too small. length = %d", len(k)) + } + return f(k[prefixSize : len(k)-DBIDSize]) +} + +func (i *Iter) datum() (_ *datum, err error) { + if i.d != nil { + return i.d, nil + } + k := i.item.Key() + if len(k) < prefixSize+DBIDSize { + return nil, fmt.Errorf("invalid index entry length %d", len(k)) + } + dbID := newDBIDFromBytes(k[len(k)-DBIDSize:]) + i.d, err = i.idx.table.get(i.txn, dbID) + return i.d, err +} + +// Delete deletes the indexed datum and any associated index entries. 
+func (i *Iter) Delete() error { + d, err := i.datum() + if err != nil { + return err + } + return i.idx.table.deleteDatum(i.txn, i.dbID, d) +} + +// IndexBucket is any one of a number of common types whose binary encoding is +// straight-forward. An IndexBucket restricts Iterate to the entries in the +// index that have the bytes decoded from the IndexBucket as the prefix. +type IndexBucket interface{} + +func parseIndexBucket(i IndexBucket) (b []byte, err error) { + switch it := i.(type) { + case []byte: + b = it + case uint32: + b = make([]byte, 4) + binary.BigEndian.PutUint32(b[:], it) + case time.Time: + b = make([]byte, 8) + binary.BigEndian.PutUint64(b[:], uint64(it.UnixMilli())) + case nil: + default: + err = fmt.Errorf("unknown IndexBucket type %T", it) + } + return +} + +// Iterate iterates the index, providing access to the index entry, datum, and +// datum key via the Iter. +func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { + prefix, err := parseIndexBucket(prefixI) + if err != nil { + return err + } + io := idx.defaultIterationOptions + for i := range iterOpts { + iterOpts[i](&io) + } + iterFunc := iteratePrefix + if io.reverse { + iterFunc = reverseIteratePrefix + } + viewUpdate := idx.View + if io.update { + viewUpdate = idx.Update + } + var seek []byte + if len(io.seek) > 0 { + seek = prefixedKey(idx.prefix, io.seek) + } + return viewUpdate(func(txn *badger.Txn) error { + return iterFunc(txn, prefixedKey(idx.prefix, prefix), seek, func(iter *badger.Iterator) error { + item := iter.Item() + k := item.Key() + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("invalid index entry length %d", len(k)) + } + return f(&Iter{ + idx: idx, + item: iter.Item(), + txn: txn, + dbID: newDBIDFromBytes(k[len(k)-DBIDSize:]), + }) + }) + }) +} + +type badgerIterationOption func(opts *badger.IteratorOptions) + +func withPrefetchSize(n int) badgerIterationOption { + return func(opts *badger.IteratorOptions) { + 
opts.PrefetchSize = n + } +} + +func iteratePrefix(txn *badger.Txn, prefix, seek []byte, f func(iter *badger.Iterator) error, iterOpts ...badgerIterationOption) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = prefix + for i := range iterOpts { + iterOpts[i](&opts) + } + iter := txn.NewIterator(opts) + defer iter.Close() + + if len(seek) == 0 { + iter.Rewind() + } else { + iter.Seek(seek) + } + + for ; iter.ValidForPrefix(prefix); iter.Next() { + if err := f(iter); err != nil { + if errors.Is(err, ErrEndIteration) { + return nil + } + return err + } + } + return nil +} + +func reverseIteratePrefix(txn *badger.Txn, prefix, seek []byte, f func(iter *badger.Iterator) error, iterOpts ...badgerIterationOption) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = prefix + opts.Reverse = true + for i := range iterOpts { + iterOpts[i](&opts) + } + iter := txn.NewIterator(opts) + defer iter.Close() + + if len(seek) == 0 { + var p keyPrefix + copy(p[:], prefix) + nextPrefix := incrementPrefix(p) + seek = nextPrefix[:] + } else { + seek = append(seek, lastDBID[:]...) + } + + for iter.Seek(seek); iter.ValidForPrefix(prefix); iter.Next() { + if err := f(iter); err != nil { + if errors.Is(err, ErrEndIteration) { + return nil + } + return err + } + } + return nil +} diff --git a/dex/lexi/keyprefix.go b/dex/lexi/keyprefix.go new file mode 100644 index 0000000000..ff7c85967b --- /dev/null +++ b/dex/lexi/keyprefix.go @@ -0,0 +1,61 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding/binary" + "encoding/hex" + + "github.com/dgraph-io/badger" +) + +const prefixSize = 2 + +// keyPrefix is a prefix for a key in the badger DB. Every table and index has +// a unique keyPrefix. This enables sorting and iteration of data. 
+type keyPrefix [prefixSize]byte + +func (p keyPrefix) String() string { + return hex.EncodeToString(p[:]) +} + +// NO RAW BADGER KEYS CAN BE LENGTH 2. IT CAN'T JUST BE A PREFIX, OR ELSE +// REVERSE ITERATION OF INDEXES FAILS + +var ( + // reserved prefixes + prefixToNamePrefix = keyPrefix{0x00, 0x00} + nameToPrefixPrefix = keyPrefix{0x00, 0x01} + primarySequencePrefix = keyPrefix{0x00, 0x02} + keyToIDPrefix = keyPrefix{0x00, 0x03} + idToKeyPrefix = keyPrefix{0x00, 0x04} + + firstAvailablePrefix = keyPrefix{0x01, 0x00} +) + +func incrementPrefix(prefix keyPrefix) (p keyPrefix) { + v := binary.BigEndian.Uint16(prefix[:]) + binary.BigEndian.PutUint16(p[:], v+1) + return p +} + +func bytesToPrefix(b []byte) (p keyPrefix) { + copy(p[:], b) + return +} + +func lastKeyForPrefix(txn *badger.Txn, p keyPrefix) (k []byte) { + reverseIteratePrefix(txn, p[:], nil, func(iter *badger.Iterator) error { + k = iter.Item().Key()[prefixSize:] + return ErrEndIteration + }, withPrefetchSize(1)) + return +} + +func prefixedKey(p keyPrefix, k []byte) []byte { + pk := make([]byte, prefixSize+len(k)) + copy(pk, p[:]) + copy(pk[prefixSize:], k) + return pk +} diff --git a/dex/lexi/lexi.go b/dex/lexi/lexi.go new file mode 100644 index 0000000000..99a56d7cc3 --- /dev/null +++ b/dex/lexi/lexi.go @@ -0,0 +1,238 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "context" + "encoding" + "encoding/binary" + "errors" + "fmt" + "sync" + "time" + + "decred.org/dcrdex/dex" + "github.com/dgraph-io/badger" +) + +// ErrKeyNotFound is an alias for badger.ErrKeyNotFound so that the caller +// doesn't have to import badger to use the semantics. Either error will satisfy +// errors.Is the same. 
+var ErrKeyNotFound = badger.ErrKeyNotFound + +func convertError(err error) error { + switch { + case errors.Is(err, badger.ErrKeyNotFound): + return ErrKeyNotFound + } + return nil +} + +// DB is the Lexi DB. The Lexi DB wraps a badger key-value database and provides +// the ability to add indexed data. +type DB struct { + *badger.DB + log dex.Logger + idSeq *badger.Sequence + wg sync.WaitGroup + updateWG sync.WaitGroup +} + +// Config is the configuration settings for the Lexi DB. +type Config struct { + Path string + Log dex.Logger +} + +// New constructs a new Lexi DB. +func New(cfg *Config) (*DB, error) { + opts := badger.DefaultOptions(cfg.Path).WithLogger(&badgerLoggerWrapper{cfg.Log.SubLogger("BADG")}) + var err error + bdb, err := badger.Open(opts) + if err == badger.ErrTruncateNeeded { + // Probably a Windows thing. + // https://github.com/dgraph-io/badger/issues/744 + cfg.Log.Warnf("Error opening badger db: %v", err) + // Try again with value log truncation enabled. + opts.Truncate = true + cfg.Log.Warnf("Attempting to reopen badger DB with the Truncate option set...") + bdb, err = badger.Open(opts) + } + if err != nil { + return nil, err + } + idSeq, err := bdb.GetSequence(prefixedKey(primarySequencePrefix, []byte{0x00}), 1000) + if err != nil { + return nil, fmt.Errorf("error getting constructing primary sequence: %w", err) + } + + return &DB{ + DB: bdb, + log: cfg.Log, + idSeq: idSeq, + }, nil +} + +// Connect starts the DB, and creates goroutines to perform shutdown when the +// context is canceled. 
+func (db *DB) Connect(ctx context.Context) (*sync.WaitGroup, error) { + db.wg.Add(1) + go func() { + defer db.wg.Done() + <-ctx.Done() + if err := db.idSeq.Release(); err != nil { + db.log.Errorf("Error releasing sequence: %v", err) + } + }() + + db.wg.Add(1) + go func() { + defer db.wg.Done() + defer db.Close() + defer db.updateWG.Wait() + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := db.RunValueLogGC(0.5) + if err != nil && !errors.Is(err, badger.ErrNoRewrite) { + db.log.Errorf("garbage collection error: %v", err) + } + case <-ctx.Done(): + return + } + } + }() + + return &db.wg, nil +} + +// Update: badger can return an ErrConflict if a read and write happen +// concurrently. This bugs the hell out of me, because I though that if a +// database was ACID-compliant, this was impossible, but I guess not. Either +// way, the solution is to try again. +func (db *DB) Update(f func(txn *badger.Txn) error) (err error) { + db.updateWG.Add(1) + defer db.updateWG.Done() + + const maxRetries = 10 + sleepTime := 5 * time.Millisecond + + for i := 0; i < maxRetries; i++ { + if err = db.DB.Update(f); err == nil || !errors.Is(err, badger.ErrConflict) { + return err + } + sleepTime *= 2 + time.Sleep(sleepTime) + } + + return err +} + +// prefixForName returns a unique prefix for the provided name and logs the +// relationship in the DB. Repeated calls to prefixForName with the same name +// will return the same prefix, including through restarts. 
+func (db *DB) prefixForName(name string) (prefix keyPrefix, _ error) { + nameKey := prefixedKey(nameToPrefixPrefix, []byte(name)) + return prefix, db.Update(func(txn *badger.Txn) error { + it, err := txn.Get(nameKey) + if err == nil { + return it.Value(func(b []byte) error { + prefix = bytesToPrefix(b) + return nil + }) + } + if !errors.Is(err, badger.ErrKeyNotFound) { + return fmt.Errorf("error getting name: %w", err) + } + lastPrefix := lastKeyForPrefix(txn, prefixToNamePrefix) + if len(lastPrefix) == 0 { + prefix = firstAvailablePrefix + } else { + prefix = incrementPrefix(bytesToPrefix(lastPrefix)) + } + if err := txn.Set(prefixedKey(nameToPrefixPrefix, []byte(name)), prefix[:]); err != nil { + return fmt.Errorf("error setting prefix for table name: %w", err) + } + if err := txn.Set(prefixedKey(prefixToNamePrefix, prefix[:]), []byte(name)); err != nil { + return fmt.Errorf("error setting table name for prefix: %w", err) + } + return nil + }) +} + +func (db *DB) nextID() (dbID DBID, _ error) { + i, err := db.idSeq.Next() + if err != nil { + return dbID, err + } + binary.BigEndian.PutUint64(dbID[:], i) + return +} + +// KeyID returns the DBID for the key. This is the same DBID that will be used +// internally for the key when datum is inserted into a Table with Set. This +// method is provided as a tool to keep database index entries short. 
+func (db *DB) KeyID(kB []byte) (dbID DBID, err error) { + err = db.View(func(txn *badger.Txn) error { + dbID, err = db.keyID(txn, kB) + return err + }) + return +} + +func (db *DB) keyID(txn *badger.Txn, kB []byte) (dbID DBID, err error) { + item, err := txn.Get(prefixedKey(keyToIDPrefix, kB)) + if err == nil { + err = item.Value(func(v []byte) error { + copy(dbID[:], v) + return nil + }) + return + } + if errors.Is(err, ErrKeyNotFound) { + if dbID, err = db.nextID(); err != nil { + return + } + if err = txn.Set(prefixedKey(keyToIDPrefix, kB), dbID[:]); err != nil { + err = fmt.Errorf("error mapping key to ID: %w", err) + } else if err = txn.Set(prefixedKey(idToKeyPrefix, dbID[:]), kB); err != nil { + err = fmt.Errorf("error mapping ID to key: %w", err) + } + } + return +} + +// deleteDBID deletes the id-to-key mapping and the key-to-id mapping for the +// DBID. +func (db *DB) deleteDBID(txn *badger.Txn, dbID DBID) error { + idK := prefixedKey(idToKeyPrefix, dbID[:]) + item, err := txn.Get(idK) + if err != nil { + return convertError(err) + } + if err := item.Value(func(kB []byte) error { + if err := txn.Delete(prefixedKey(keyToIDPrefix, kB)); err != nil { + return fmt.Errorf("error deleting key to ID mapping: %w", err) + } + return nil + }); err != nil { + return err + } + if err := txn.Delete(idK); err != nil { + return fmt.Errorf("error deleting ID to key mapping: %w", err) + } + return nil +} + +// B is a byte slice that implements encoding.BinaryMarshaler. +type B []byte + +var _ encoding.BinaryMarshaler = B{} + +// MarshalBinary implements encoding.BinaryMarshaler for the B. 
+func (b B) MarshalBinary() ([]byte, error) { + return b, nil +} diff --git a/dex/lexi/log.go b/dex/lexi/log.go new file mode 100644 index 0000000000..414aa25843 --- /dev/null +++ b/dex/lexi/log.go @@ -0,0 +1,42 @@ +package lexi + +import ( + "decred.org/dcrdex/dex" + "github.com/dgraph-io/badger" +) + +// badgerLoggerWrapper wraps dex.Logger and translates Warnf to Warningf to +// satisfy badger.Logger. It also lowers the log level of Infof to Debugf +// and Debugf to Tracef. +type badgerLoggerWrapper struct { + dex.Logger +} + +var _ badger.Logger = (*badgerLoggerWrapper)(nil) + +// Debugf -> dex.Logger.Tracef +func (log *badgerLoggerWrapper) Debugf(s string, a ...interface{}) { + log.Tracef(s, a...) +} + +func (log *badgerLoggerWrapper) Debug(a ...interface{}) { + log.Trace(a...) +} + +// Infof -> dex.Logger.Debugf +func (log *badgerLoggerWrapper) Infof(s string, a ...interface{}) { + log.Debugf(s, a...) +} + +func (log *badgerLoggerWrapper) Info(a ...interface{}) { + log.Debug(a...) +} + +// Warningf -> dex.Logger.Warnf +func (log *badgerLoggerWrapper) Warningf(s string, a ...interface{}) { + log.Warnf(s, a...) +} + +func (log *badgerLoggerWrapper) Warning(a ...interface{}) { + log.Warn(a...) +} diff --git a/dex/lexi/table.go b/dex/lexi/table.go new file mode 100644 index 0000000000..742178d055 --- /dev/null +++ b/dex/lexi/table.go @@ -0,0 +1,191 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding" + "errors" + "fmt" + + "github.com/dgraph-io/badger" +) + +// Table is a prefixed section of the k-v DB. A Table can have indexes, such +// that data inserted into the Table will generates index entries for use in +// lookup and iteration. +type Table struct { + *DB + name string + prefix keyPrefix + indexes []*Index + defaultSetOptions setOpts +} + +// Table constructs a new table in the DB. 
+func (db *DB) Table(name string) (*Table, error) { + p, err := db.prefixForName(name) + if err != nil { + return nil, err + } + return &Table{ + DB: db, + name: name, + prefix: p, + }, nil +} + +// UseDefaultSetOptions sets default options for Set. +func (t *Table) UseDefaultSetOptions(setOpts ...SetOption) { + for i := range setOpts { + setOpts[i](&t.defaultSetOptions) + } +} + +// Get retrieves a value from the Table. +func (t *Table) Get(k encoding.BinaryMarshaler, v encoding.BinaryUnmarshaler) error { + kB, err := k.MarshalBinary() + if err != nil { + return fmt.Errorf("error marshaling key: %w", err) + } + return t.View(func(txn *badger.Txn) error { + dbID, err := t.keyID(txn, kB) + if err != nil { + return convertError(err) + } + d, err := t.get(txn, dbID) + if err != nil { + return err + } + return v.UnmarshalBinary(d.v) + }) +} + +// func (t *Table) GetDBID(dbID DBID, v encoding.BinaryUnmarshaler) error { +// return t.View(func(txn *badger.Txn) error { +// d, err := t.get(txn, dbID) +// if err != nil { +// return err +// } +// return v.UnmarshalBinary(d.v) +// }) +// } + +func (t *Table) get(txn *badger.Txn, dbID DBID) (d *datum, err error) { + item, err := txn.Get(prefixedKey(t.prefix, dbID[:])) + if err != nil { + return nil, convertError(err) + } + err = item.Value(func(dB []byte) error { + d, err = decodeDatum(dB) + if err != nil { + return fmt.Errorf("error decoding datum: %w", err) + } + return nil + }) + return +} + +type setOpts struct { + replace bool +} + +// SetOptions is an knob to control how items are inserted into the table with +// Set. +type SetOption func(opts *setOpts) + +// WithReplace allows replacing pre-existing values when calling Set. +func WithReplace() SetOption { + return func(opts *setOpts) { + opts.replace = true + } +} + +// Set inserts a new value for the key, and creates index entries. 
+func (t *Table) Set(k, v encoding.BinaryMarshaler, setOpts ...SetOption) error { + kB, err := k.MarshalBinary() + if err != nil { + return fmt.Errorf("error marshaling key: %w", err) + } + // zero length keys are not allowed because it screws up the reverse + // iteration scheme. + if len(kB) == 0 { + return errors.New("no zero-length keys allowed") + } + vB, err := v.MarshalBinary() + if err != nil { + return fmt.Errorf("error marshaling value: %w", err) + } + opts := t.defaultSetOptions + for i := range setOpts { + setOpts[i](&opts) + } + d := &datum{v: vB, indexes: make([][]byte, len(t.indexes))} + return t.Update(func(txn *badger.Txn) error { + dbID, err := t.keyID(txn, kB) + if err != nil { + return convertError(err) + } + // See if an entry already exists + oldDatum, err := t.get(txn, dbID) + if !errors.Is(err, ErrKeyNotFound) { + if err != nil { + return fmt.Errorf("error looking for existing entry: %w", err) + } + // We found an old entry + if !opts.replace { + return errors.New("attempted to replace an entry without specifying WithReplace") + } + // Delete any old indexes + for _, k := range oldDatum.indexes { + if err := txn.Delete(k); err != nil { + return fmt.Errorf("error deleting replaced datum's index entry; %w", err) + } + } + } + for i, idx := range t.indexes { + if d.indexes[i], err = idx.add(txn, k, v, dbID); err != nil { + return fmt.Errorf("error adding entry to index %q: %w", idx.name, err) + } + } + dB, err := d.bytes() + if err != nil { + return fmt.Errorf("error encoding datum: %w", err) + } + return txn.Set(prefixedKey(t.prefix, dbID[:]), dB) + }) +} + +// Delete deletes the data associated with the key, including any index entries +// and the id<->key mappings. 
+func (t *Table) Delete(kB []byte) error { + return t.Update(func(txn *badger.Txn) error { + dbID, err := t.keyID(txn, kB) + if err != nil { + return convertError(err) + } + item, err := txn.Get(dbID[:]) + if err != nil { + return convertError(err) + } + return item.Value(func(dB []byte) error { + d, err := decodeDatum(dB) + if err != nil { + return fmt.Errorf("error decoding datum: %w", err) + } + return t.deleteDatum(txn, dbID, d) + }) + }) +} + +func (t *Table) deleteDatum(txn *badger.Txn, dbID DBID, d *datum) error { + for _, k := range d.indexes { + if err := txn.Delete(k); err != nil { + return fmt.Errorf("error deleting index entry; %w", err) + } + } + if err := txn.Delete(prefixedKey(t.prefix, dbID[:])); err != nil { + return fmt.Errorf("error deleting table entry: %w", err) + } + return t.deleteDBID(txn, dbID) +} From 4b750b42e3f793b305bad7716898cb880459519b Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Fri, 25 Oct 2024 08:39:25 -0500 Subject: [PATCH 2/2] fix some typos --- client/asset/eth/txdb.go | 6 +++--- dex/lexi/datum.go | 5 +++++ dex/lexi/index.go | 4 ++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/client/asset/eth/txdb.go b/client/asset/eth/txdb.go index a9bc6d1f73..60c06d0345 100644 --- a/client/asset/eth/txdb.go +++ b/client/asset/eth/txdb.go @@ -404,9 +404,9 @@ func (db *badgerTxDB) getTxs(n int, refID *common.Hash, past bool, tokenID *uint // getPendingTxs returns a map of nonce to extendedWalletTx for all // pending transactions. func (db *badgerTxDB) getPendingTxs() ([]*extendedWalletTx, error) { - // We will be iterating backwards from the most recent nonce. - // If we find numConfirmedTxsToCheck consecutive confirmed transactions, - // we can stop iterating. + // We will be iterating backwards from the most recent nonce. If we find + // numConfirmedTxsToCheck consecutive confirmed transactions, we can stop + // iterating. 
const numConfirmedTxsToCheck = 20 txs := make([]*extendedWalletTx, 0, 4) diff --git a/dex/lexi/datum.go b/dex/lexi/datum.go index 1c84fbfde4..54cbfdf388 100644 --- a/dex/lexi/datum.go +++ b/dex/lexi/datum.go @@ -22,6 +22,11 @@ func (d *datum) bytes() ([]byte, error) { if d.version != 0 { return nil, fmt.Errorf("unknown datum version %d", d.version) } + + // encoded datum length is 1 byte for version, 1 varint to say how many + // indexes there are then for each index, a varint to specify the size of + // the index entry followed by the entry itself, then a varint to specify + // the size of the value blob followed by the value blob itself. bLen := 1 + len(d.v) + wire.VarIntSerializeSize(uint64(len(d.v))) + wire.VarIntSerializeSize(uint64(len(d.indexes))) for _, ib := range d.indexes { bLen += len(ib) + wire.VarIntSerializeSize(uint64(len(ib))) diff --git a/dex/lexi/index.go b/dex/lexi/index.go index ed79d363ae..cdbbb8b515 100644 --- a/dex/lexi/index.go +++ b/dex/lexi/index.go @@ -37,7 +37,7 @@ type Index struct { } // AddIndex adds an index to a Table. Once an Index is added, every datum -// inserted Set in the Table will generate an entry in the Index too. +// Set in the Table will generate an entry in the Index too. func (t *Table) AddIndex(name string, f func(k, v encoding.BinaryMarshaler) ([]byte, error)) (*Index, error) { p, err := t.prefixForName(t.name + "__idx__" + name) if err != nil { @@ -72,7 +72,7 @@ type iteratorOpts struct { seek []byte } -// IterationOption is a knob to change on Iterate runs on an Index. +// IterationOption is a knob to change how Iterate runs on an Index. type IterationOption func(opts *iteratorOpts) // WithUpdate must be used if the caller intends to make modifications during