Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init: fast query DB #229

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"os"
"path/filepath"

"golang.org/x/exp/slices"

// #nosec G702
"github.com/prometheus/client_golang/prometheus"

"github.com/cosmos/cosmos-sdk/store/streaming"
authsims "github.com/cosmos/cosmos-sdk/x/auth/simulation"

wasmkeeper "github.com/CosmWasm/wasmd/x/wasm/keeper"
fastquery "github.com/terra-money/core/v2/app/fast_query"
"github.com/terra-money/core/v2/app/keepers"
"github.com/terra-money/core/v2/app/post"
"github.com/terra-money/core/v2/app/rpc"
Expand Down Expand Up @@ -242,11 +246,28 @@ func NewTerraApp(
app.mm.RegisterInvariants(&app.Keepers.CrisisKeeper)
app.mm.RegisterServices(app.configurator)

// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(app.BaseApp, appOpts, app.appCodec, app.Logger(), app.keys); err != nil {
panic(err)
}

// initialize stores
app.MountKVStores(app.keys)
app.MountTransientStores(app.tkeys)
app.MountMemoryStores(app.memKeys)

// when fastquery streamer is enabled in the config,
// setup the fastquery feature and serve the data
// from the fastquery.
// TODO: move checking if streaming service is enabled to a helper function
streamers := cast.ToStringSlice(appOpts.Get("store.streamers"))
if slices.Contains(streamers, "fastquery") {
err := app.SetupFastQueryDB(appOpts, homePath)
if err != nil {
panic(err)
}
}

// register upgrade
app.RegisterUpgradeHandlers()
app.RegisterUpgradeStores()
Expand Down Expand Up @@ -568,3 +589,25 @@ func (app *TerraApp) GetWasmOpts(appOpts servertypes.AppOptions) []wasmkeeper.Op

return wasmOpts
}

func (app *TerraApp) SetupFastQueryDB(appOpts servertypes.AppOptions, homePath string) error {
// Create the path for fastquerydb
dir := filepath.Join(homePath, "data")

// Create fast query serice
fastQueryService, err := fastquery.NewFastQueryService(dir, app.Logger(), app.keys)
if err != nil {
return err
}

// Create the streaming service
streamingservice := fastquery.NewStreamingService(fastQueryService, app.keys)

// Assign the streaming service to the app and
// the query multi store so the users query the
// data in a faster way.
app.SetStreamingService(streamingservice)
app.SetQueryMultiStore(fastQueryService.Store)

return nil
}
84 changes: 84 additions & 0 deletions app/fast_query/db/driver/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package driver

import (
"fmt"

tmdb "github.com/cometbft/cometbft-db"
"github.com/terra-money/core/v2/app/fast_query/db/height_driver"
)

var _ height_driver.HeightEnabledBatch = (*DriverBatch)(nil)
var _ HasRollbackBatch = (*DriverBatch)(nil)

type DriverBatch struct {
height int64
batch *RollbackableBatch
mode int
}

func (b *DriverBatch) keyBytesWithHeight(key []byte) []byte {
return append(prefixDataWithHeightKey(key), serializeHeight(b.mode, b.height)...)
}

func NewLevelDBBatch(atHeight int64, dbDriver *DBDriver) *DriverBatch {
return &DriverBatch{
height: atHeight,
batch: NewRollbackableBatch(dbDriver.session),
mode: dbDriver.mode,
}
}

func (b *DriverBatch) Set(key, value []byte) error {
newKey := b.keyBytesWithHeight(key)

// make fixed size byte slice for performance
buf := make([]byte, 0, len(value)+1)
buf = append(buf, byte(0)) // 0 => not deleted
buf = append(buf, value...)

if err := b.batch.Set(prefixCurrentDataKey(key), buf[1:]); err != nil {
return err
}
if err := b.batch.Set(prefixKeysForIteratorKey(key), []byte{}); err != nil {
return err
}
return b.batch.Set(newKey, buf)
}

func (b *DriverBatch) Delete(key []byte) error {
newKey := b.keyBytesWithHeight(key)

buf := []byte{1}

if err := b.batch.Delete(prefixCurrentDataKey(key)); err != nil {
return err
}
if err := b.batch.Set(prefixKeysForIteratorKey(key), buf); err != nil {
return err
}
return b.batch.Set(newKey, buf)
}

func (b *DriverBatch) Write() error {
return b.batch.Write()
}

func (b *DriverBatch) WriteSync() error {
return b.batch.WriteSync()
}

func (b *DriverBatch) Close() error {
return b.batch.Close()
}

func (b *DriverBatch) RollbackBatch() tmdb.Batch {
b.Metric()
return b.batch.RollbackBatch
}

func (b *DriverBatch) Metric() {
fmt.Printf("[rollback-batch] rollback batch for height %d's record length %d\n",
b.height,
b.batch.RecordCount,
)
}
155 changes: 155 additions & 0 deletions app/fast_query/db/driver/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package driver

import (
"fmt"
"math"

dbm "github.com/cometbft/cometbft-db"
"github.com/terra-money/core/v2/app/fast_query/db/height_driver"
"github.com/terra-money/core/v2/app/fast_query/db/utils"
)

type DBDriver struct {
session *dbm.GoLevelDB
mode int
}

func NewDBDriver(dir string) (*DBDriver, error) {
ldb, err := dbm.NewGoLevelDB(DBName, dir)
if err != nil {
return nil, err
}

return &DBDriver{
session: ldb,
mode: DriverModeKeySuffixDesc,
}, nil
}

func (dbDriver *DBDriver) newInnerIterator(requestHeight int64, pdb *dbm.PrefixDB) (dbm.Iterator, error) {
if dbDriver.mode == DriverModeKeySuffixAsc {
heightEnd := utils.UintToBigEndian(uint64(requestHeight + 1))
return pdb.ReverseIterator(nil, heightEnd)
} else {
heightStart := utils.UintToBigEndian(math.MaxUint64 - uint64(requestHeight))
return pdb.Iterator(heightStart, nil)
}
}

func (dbDriver *DBDriver) Get(maxHeight int64, key []byte) ([]byte, error) {
if maxHeight == 0 {
return dbDriver.session.Get(prefixCurrentDataKey(key))
}
var requestHeight = height_driver.Height(maxHeight).CurrentOrLatest().ToInt64()
var requestHeightMin = height_driver.Height(0).CurrentOrNever().ToInt64()

// check if requestHeightMin is
if requestHeightMin > requestHeight {
return nil, fmt.Errorf("invalid height")
}

pdb := dbm.NewPrefixDB(dbDriver.session, prefixDataWithHeightKey(key))

iter, _ := dbDriver.newInnerIterator(requestHeight, pdb)
defer iter.Close()

// in [email protected], key not found is NOT an error
if !iter.Valid() {
return nil, nil
}

value := iter.Value()
deleted := value[0]
if deleted == 1 {
return nil, nil
} else {
if len(value) > 1 {
return value[1:], nil
}
return []byte{}, nil
}
}

func (dbDriver *DBDriver) Has(maxHeight int64, key []byte) (bool, error) {
if maxHeight == 0 {
return dbDriver.session.Has(prefixCurrentDataKey(key))
}
var requestHeight = height_driver.Height(maxHeight).CurrentOrLatest().ToInt64()
var requestHeightMin = height_driver.Height(0).CurrentOrNever().ToInt64()

// check if requestHeightMin is
if requestHeightMin > requestHeight {
return false, fmt.Errorf("invalid height")
}

pdb := dbm.NewPrefixDB(dbDriver.session, prefixDataWithHeightKey(key))

iter, _ := dbDriver.newInnerIterator(requestHeight, pdb)
defer iter.Close()

// in [email protected], key not found is NOT an error
if !iter.Valid() {
return false, nil
}

deleted := iter.Value()[0]

if deleted == 1 {
return false, nil
} else {
return true, nil
}
}

func (dbDriver *DBDriver) Set(atHeight int64, key, value []byte) error {
// should never reach here, all should be batched in tiered+hld
panic("should never reach here")
}

func (dbDriver *DBDriver) SetSync(atHeight int64, key, value []byte) error {
// should never reach here, all should be batched in tiered+hld
panic("should never reach here")
}

func (dbDriver *DBDriver) Delete(atHeight int64, key []byte) error {
// should never reach here, all should be batched in tiered+hld
panic("should never reach here")
}

func (dbDriver *DBDriver) DeleteSync(atHeight int64, key []byte) error {
return dbDriver.Delete(atHeight, key)
}

func (dbDriver *DBDriver) Iterator(maxHeight int64, start, end []byte) (height_driver.HeightEnabledIterator, error) {
if maxHeight == 0 {
pdb := dbm.NewPrefixDB(dbDriver.session, cCurrentDataPrefix)
return pdb.Iterator(start, end)
}
return NewLevelDBIterator(dbDriver, maxHeight, start, end)
}

func (dbDriver *DBDriver) ReverseIterator(maxHeight int64, start, end []byte) (height_driver.HeightEnabledIterator, error) {
if maxHeight == 0 {
pdb := dbm.NewPrefixDB(dbDriver.session, cCurrentDataPrefix)
return pdb.ReverseIterator(start, end)
}
return NewLevelDBReverseIterator(dbDriver, maxHeight, start, end)
}

func (dbDriver *DBDriver) Close() error {
dbDriver.session.Close()
return nil
}

func (dbDriver *DBDriver) NewBatch(atHeight int64) height_driver.HeightEnabledBatch {
return NewLevelDBBatch(atHeight, dbDriver)
}

// TODO: Implement me
func (dbDriver *DBDriver) Print() error {
return nil
}

func (dbDriver *DBDriver) Stats() map[string]string {
return nil
}
Loading
Loading