Skip to content

Commit

Permalink
vochain/indexer: add support for backups
Browse files Browse the repository at this point in the history
Add a new SaveBackup method, which writes a backup to a file on disk,
and a new RestoreBackup method, which restores the database from such a file.
Note that restoring can only happen shortly after calling New;
otherwise, any indexing or queries happening at the same time
could easily lead to bad data, as they would race to lock the database.

This only implements backing up and restoring from a file on disk.
Future changes will integrate this with the rest of the codebase,
such as taking regular backups or listing the stored backups.

Updates #1062.
  • Loading branch information
mvdan authored and p4u committed Dec 11, 2023
1 parent 551829d commit 01b13f8
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 12 deletions.
88 changes: 76 additions & 12 deletions vochain/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"embed"
"encoding/hex"
"fmt"
"io"
"math/big"
"os"
"path/filepath"
Expand Down Expand Up @@ -62,6 +63,7 @@ type Indexer struct {
// TODO: try using blockTx directly, after some more refactors?
votePool map[string]map[string]*state.Vote

dbPath string
readOnlyDB *sql.DB
readWriteDB *sql.DB

Expand Down Expand Up @@ -97,6 +99,10 @@ type Indexer struct {
type Options struct {
	// DataDir is the directory where the indexer keeps its data.
	// New creates it if needed and places the "db.sqlite" database file inside.
	DataDir string

	// ExpectBackupRestore should be set to true if a call to Indexer.RestoreBackup
	// will be made shortly after New is called, and before any indexing or queries happen.
	// When set, New does not initialize the database; RestoreBackup does so instead.
	ExpectBackupRestore bool

	// NOTE(review): presumably turns off live results handling; its use is not visible here, confirm.
	IgnoreLiveResults bool
}

Expand All @@ -122,25 +128,42 @@ func New(app *vochain.BaseApplication, opts Options) (*Indexer, error) {
if err := os.MkdirAll(opts.DataDir, os.ModePerm); err != nil {
return nil, err
}
dbPath := filepath.Join(opts.DataDir, "db.sqlite")
idx.dbPath = filepath.Join(opts.DataDir, "db.sqlite")

// If we are expecting a restore shortly after, that will initialize the DB.
if !opts.ExpectBackupRestore {
idx.startDB()
}

// Subscribe to events
idx.App.State.AddEventListener(idx)

return idx, nil
}

func (idx *Indexer) startDB() error {
if idx.readWriteDB != nil {
panic("Indexer.startDB called twice")
}

var err error

// sqlite doesn't support multiple concurrent writers.
// For that reason, readWriteDB is limited to one open connection.
// Per https://github.com/mattn/go-sqlite3/issues/1022#issuecomment-1067353980,
// we use WAL to allow multiple concurrent readers at the same time.
idx.readWriteDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal", dbPath))
idx.readWriteDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal", idx.dbPath))
if err != nil {
return nil, err
return err
}
idx.readWriteDB.SetMaxOpenConns(1)
idx.readWriteDB.SetMaxIdleConns(2)
idx.readWriteDB.SetConnMaxIdleTime(10 * time.Minute)
idx.readWriteDB.SetConnMaxLifetime(time.Hour)

idx.readOnlyDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=ro&_journal_mode=wal", dbPath))
idx.readOnlyDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=ro&_journal_mode=wal", idx.dbPath))
if err != nil {
return nil, err
return err
}
// Increasing these numbers can allow for more queries to run concurrently,
// but it also increases the memory used by sqlite and our connection pool.
Expand All @@ -151,27 +174,42 @@ func New(app *vochain.BaseApplication, opts Options) (*Indexer, error) {
idx.readOnlyDB.SetConnMaxLifetime(time.Hour)

if err := goose.SetDialect("sqlite3"); err != nil {
return nil, err
return err
}
goose.SetLogger(log.GooseLogger())
goose.SetBaseFS(embedMigrations)
if err := goose.Up(idx.readWriteDB, "migrations"); err != nil {
return nil, fmt.Errorf("goose up: %w", err)
return fmt.Errorf("goose up: %w", err)
}

idx.readOnlyQuery, err = indexerdb.Prepare(context.TODO(), idx.readOnlyDB)
if err != nil {
return nil, err
return err
}
idx.blockQueries, err = indexerdb.Prepare(context.TODO(), idx.readWriteDB)
if err != nil {
panic(err)
return err
}
return nil
}

// Subscribe to events
idx.App.State.AddEventListener(idx)
// copyFile copies the contents of the file at src into the file at dst.
// dst is created if it does not exist and truncated if it does.
// It returns the first error encountered while opening, copying, or closing.
func copyFile(dst, src string) error {
	srcf, err := os.Open(src)
	if err != nil {
		return err
	}
	defer srcf.Close()

	// For now, we don't care about permissions
	dstf, err := os.Create(dst)
	if err != nil {
		return err
	}
	_, err = io.Copy(dstf, srcf)
	// Always close dstf, but only surface the close error if the copy itself
	// succeeded, so that the earlier and more relevant error is not masked.
	if err2 := dstf.Close(); err == nil {
		err = err2
	}
	return err
}

func (idx *Indexer) Close() error {
Expand All @@ -184,6 +222,32 @@ func (idx *Indexer) Close() error {
return nil
}

// RestoreBackup restores the database from a backup created via SaveBackup.
// Note that this must be called with ExpectBackupRestore set to true,
// and before any indexing or queries happen.
//
// It panics if the database was already initialized. Otherwise, the file at
// path is copied over the indexer's database file and the database is started.
// The ctx parameter is currently unused.
func (idx *Indexer) RestoreBackup(ctx context.Context, path string) error {
	if idx.readWriteDB != nil {
		panic("Indexer.RestoreBackup called after the database was initialized")
	}
	// The database is opened in WAL mode, so an earlier run may have left
	// "-wal" and "-shm" sidecar files next to it. Those belong to the old
	// database file; remove them so they are not paired with the restored one.
	for _, suffix := range []string{"-wal", "-shm"} {
		if err := os.Remove(idx.dbPath + suffix); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("could not restore indexer backup: %w", err)
		}
	}
	if err := copyFile(idx.dbPath, path); err != nil {
		return fmt.Errorf("could not restore indexer backup: %w", err)
	}
	return idx.startDB()
}

// SaveBackup writes a backup of the database to the file at path.
// Writes to the database may be blocked while the backup is in progress,
// and an error may be returned if a file already exists at path.
//
// For sqlite, the backup is produced with "VACUUM INTO",
// which means the resulting file is itself a database.
func (idx *Indexer) SaveBackup(ctx context.Context, path string) error {
	if _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, path); err != nil {
		return err
	}
	return nil
}

// blockTxQueries assumes that lockPool is locked.
func (idx *Indexer) blockTxQueries() *indexerdb.Queries {
if idx.blockMu.TryLock() {
Expand Down
75 changes: 75 additions & 0 deletions vochain/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package indexer

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
stdlog "log"
"math/big"
"path/filepath"
"testing"

qt "github.com/frankban/quicktest"
Expand Down Expand Up @@ -43,6 +45,79 @@ func newTestIndexer(tb testing.TB, app *vochain.BaseApplication) *Indexer {
return idx
}

// TestBackup checks that SaveBackup captures the votes indexed so far,
// that a brand new indexer starts out empty, and that RestoreBackup
// brings back exactly the votes which were present when the backup was taken.
func TestBackup(t *testing.T) {
	app := vochain.TestBaseApplication(t)

	idx, err := New(app, Options{DataDir: t.TempDir()})
	qt.Assert(t, err, qt.IsNil)

	// wantTotalVotes reads idx when called, so it follows the reassignments below.
	wantTotalVotes := func(want uint64) {
		total, countErr := idx.CountTotalVotes()
		qt.Assert(t, countErr, qt.IsNil)
		qt.Assert(t, total, qt.Equals, want)
	}

	vp, err := state.NewVotePackage([]int{1, 1, 1}).Encode()
	qt.Assert(t, err, qt.IsNil)

	// A new indexer has no votes.
	wantTotalVotes(0)

	// Register the process that all votes in this test belong to.
	pid := util.RandomBytes(32)
	process := &models.Process{
		ProcessId:     pid,
		EnvelopeType:  &models.EnvelopeType{EncryptedVotes: false},
		Status:        models.ProcessStatus_READY,
		Mode:          &models.ProcessMode{AutoStart: true},
		BlockCount:    10,
		MaxCensusSize: 1000,
		VoteOptions: &models.ProcessVoteOptions{
			MaxCount:     5,
			MaxValue:     1,
			MaxTotalCost: 3,
			CostExponent: 1,
		},
	}
	err = app.State.AddProcess(process)
	qt.Assert(t, err, qt.IsNil)

	// addVotes casts count votes with random nullifiers and commits them in a new block.
	addVotes := func(count int) {
		for remaining := count; remaining > 0; remaining-- {
			vote := &state.Vote{ProcessID: pid, VotePackage: vp, Nullifier: util.RandomBytes(32)}
			qt.Assert(t, app.State.AddVote(vote), qt.IsNil)
		}
		app.AdvanceTestBlock()
	}

	// Add 10 votes and check they are counted.
	addVotes(10)
	wantTotalVotes(10)

	// Back up the database.
	backupPath := filepath.Join(t.TempDir(), "backup")
	err = idx.SaveBackup(context.TODO(), backupPath)
	qt.Assert(t, err, qt.IsNil)

	// Add another 5 votes which aren't in the backup.
	addVotes(5)
	wantTotalVotes(15)

	// Starting a new database without the backup should see zero votes.
	idx.Close()
	idx, err = New(app, Options{DataDir: t.TempDir()})
	qt.Assert(t, err, qt.IsNil)
	wantTotalVotes(0)

	// Starting a new database with the backup should see the votes from the backup.
	idx.Close()
	idx, err = New(app, Options{DataDir: t.TempDir(), ExpectBackupRestore: true})
	qt.Assert(t, err, qt.IsNil)
	err = idx.RestoreBackup(context.TODO(), backupPath)
	qt.Assert(t, err, qt.IsNil)
	wantTotalVotes(10)

	// Close the last indexer.
	idx.Close()
}

func TestEntityList(t *testing.T) {
for _, count := range []int{2, 100, 155} {
t.Run(fmt.Sprintf("count=%03d", count), func(t *testing.T) {
Expand Down

0 comments on commit 01b13f8

Please sign in to comment.