diff --git a/vochain/indexer/indexer.go b/vochain/indexer/indexer.go index 59a795a92..74da19a67 100644 --- a/vochain/indexer/indexer.go +++ b/vochain/indexer/indexer.go @@ -6,6 +6,7 @@ import ( "embed" "encoding/hex" "fmt" + "io" "math/big" "os" "path/filepath" @@ -62,6 +63,7 @@ type Indexer struct { // TODO: try using blockTx directly, after some more refactors? votePool map[string]map[string]*state.Vote + dbPath string readOnlyDB *sql.DB readWriteDB *sql.DB @@ -97,6 +99,10 @@ type Indexer struct { type Options struct { DataDir string + // ExpectBackupRestore should be set to true if a call to Indexer.RestoreBackup + // will be made shortly after New is called, and before any indexing or queries happen. + ExpectBackupRestore bool + IgnoreLiveResults bool } @@ -122,25 +128,42 @@ func New(app *vochain.BaseApplication, opts Options) (*Indexer, error) { if err := os.MkdirAll(opts.DataDir, os.ModePerm); err != nil { return nil, err } - dbPath := filepath.Join(opts.DataDir, "db.sqlite") + idx.dbPath = filepath.Join(opts.DataDir, "db.sqlite") + + // If we are expecting a restore shortly after, that will initialize the DB. + if !opts.ExpectBackupRestore { + idx.startDB() + } + + // Subscribe to events + idx.App.State.AddEventListener(idx) + + return idx, nil +} + +func (idx *Indexer) startDB() error { + if idx.readWriteDB != nil { + panic("Indexer.startDB called twice") + } + var err error // sqlite doesn't support multiple concurrent writers. // For that reason, readWriteDB is limited to one open connection. // Per https://github.com/mattn/go-sqlite3/issues/1022#issuecomment-1067353980, // we use WAL to allow multiple concurrent readers at the same time. - idx.readWriteDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal", dbPath)) + idx.readWriteDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal", idx.dbPath)) if err != nil { - return nil, err + return err } idx.readWriteDB.SetMaxOpenConns(1) idx.readWriteDB.SetMaxIdleConns(2) idx.readWriteDB.SetConnMaxIdleTime(10 * time.Minute) idx.readWriteDB.SetConnMaxLifetime(time.Hour) - idx.readOnlyDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=ro&_journal_mode=wal", dbPath)) + idx.readOnlyDB, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=ro&_journal_mode=wal", idx.dbPath)) if err != nil { - return nil, err + return err } // Increasing these numbers can allow for more queries to run concurrently, // but it also increases the memory used by sqlite and our connection pool. @@ -151,27 +174,42 @@ func New(app *vochain.BaseApplication, opts Options) (*Indexer, error) { idx.readOnlyDB.SetConnMaxLifetime(time.Hour) if err := goose.SetDialect("sqlite3"); err != nil { - return nil, err + return err } goose.SetLogger(log.GooseLogger()) goose.SetBaseFS(embedMigrations) if err := goose.Up(idx.readWriteDB, "migrations"); err != nil { - return nil, fmt.Errorf("goose up: %w", err) + return fmt.Errorf("goose up: %w", err) } idx.readOnlyQuery, err = indexerdb.Prepare(context.TODO(), idx.readOnlyDB) if err != nil { - return nil, err + return err } idx.blockQueries, err = indexerdb.Prepare(context.TODO(), idx.readWriteDB) if err != nil { - panic(err) + return err } + return nil +} - // Subscribe to events - idx.App.State.AddEventListener(idx) +func copyFile(dst, src string) error { + srcf, err := os.Open(src) + if err != nil { + return err + } + defer srcf.Close() - return idx, nil + // For now, we don't care about permissions + dstf, err := os.Create(dst) + if err != nil { + return err + } + _, err = io.Copy(dstf, srcf) + if err2 := dstf.Close(); err == nil { + err = err2 + } + return err } func (idx *Indexer) Close() error { @@ -184,6 +222,32 @@ func (idx *Indexer) Close() error { return nil } +// BackupPath restores the database from a backup created via SaveBackup. +// Note that this must be called with ExpectBackupRestore set to true, +// and before any indexing or queries happen. +func (idx *Indexer) RestoreBackup(ctx context.Context, path string) error { + if idx.readWriteDB != nil { + panic("Indexer.RestoreBackup called after the database was initialized") + } + if err := copyFile(idx.dbPath, path); err != nil { + return fmt.Errorf("could not restore indexer backup: %w", err) + } + if err := idx.startDB(); err != nil { + return err + } + return nil +} + +// SaveBackup backs up the database to a file on disk. +// Note that writes to the database may be blocked until the backup finishes, +// and an error may occur if a file at path already exists. +// +// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database. +func (idx *Indexer) SaveBackup(ctx context.Context, path string) error { + _, err := idx.readOnlyDB.ExecContext(ctx, `VACUUM INTO ?`, path) + return err +} + // blockTxQueries assumes that lockPool is locked. func (idx *Indexer) blockTxQueries() *indexerdb.Queries { if idx.blockMu.TryLock() { diff --git a/vochain/indexer/indexer_test.go b/vochain/indexer/indexer_test.go index ba283dee8..1ac559cef 100644 --- a/vochain/indexer/indexer_test.go +++ b/vochain/indexer/indexer_test.go @@ -2,11 +2,13 @@ package indexer import ( "bytes" + "context" "encoding/hex" "fmt" "io" stdlog "log" "math/big" + "path/filepath" "testing" qt "github.com/frankban/quicktest" @@ -43,6 +45,79 @@ func newTestIndexer(tb testing.TB, app *vochain.BaseApplication) *Indexer { return idx } +func TestBackup(t *testing.T) { + app := vochain.TestBaseApplication(t) + + idx, err := New(app, Options{DataDir: t.TempDir()}) + qt.Assert(t, err, qt.IsNil) + + wantTotalVotes := func(want uint64) { + got, err := idx.CountTotalVotes() + qt.Assert(t, err, qt.IsNil) + qt.Assert(t, got, qt.Equals, want) + } + + vp, err := state.NewVotePackage([]int{1, 1, 1}).Encode() + qt.Assert(t, err, qt.IsNil) + + // A new indexer has no votes. + wantTotalVotes(0) + + // Add 10 votes and check they are counted. + pid := util.RandomBytes(32) + err = app.State.AddProcess(&models.Process{ + ProcessId: pid, + EnvelopeType: &models.EnvelopeType{EncryptedVotes: false}, + Status: models.ProcessStatus_READY, + Mode: &models.ProcessMode{AutoStart: true}, + BlockCount: 10, + MaxCensusSize: 1000, + VoteOptions: &models.ProcessVoteOptions{ + MaxCount: 5, + MaxValue: 1, + MaxTotalCost: 3, + CostExponent: 1, + }, + }) + qt.Assert(t, err, qt.IsNil) + for i := 0; i < 10; i++ { + v := &state.Vote{ProcessID: pid, VotePackage: vp, Nullifier: util.RandomBytes(32)} + qt.Assert(t, app.State.AddVote(v), qt.IsNil) + } + app.AdvanceTestBlock() + wantTotalVotes(10) + + // Back up the database. + backupPath := filepath.Join(t.TempDir(), "backup") + err = idx.SaveBackup(context.TODO(), backupPath) + qt.Assert(t, err, qt.IsNil) + + // Add another 5 votes which aren't in the backup. + for i := 0; i < 5; i++ { + v := &state.Vote{ProcessID: pid, VotePackage: vp, Nullifier: util.RandomBytes(32)} + qt.Assert(t, app.State.AddVote(v), qt.IsNil) + } + app.AdvanceTestBlock() + wantTotalVotes(15) + + // Starting a new database without the backup should see zero votes. + idx.Close() + idx, err = New(app, Options{DataDir: t.TempDir()}) + qt.Assert(t, err, qt.IsNil) + wantTotalVotes(0) + + // Starting a new database with the backup should see the votes from the backup. + idx.Close() + idx, err = New(app, Options{DataDir: t.TempDir(), ExpectBackupRestore: true}) + qt.Assert(t, err, qt.IsNil) + err = idx.RestoreBackup(context.TODO(), backupPath) + qt.Assert(t, err, qt.IsNil) + wantTotalVotes(10) + + // Close the last indexer. + idx.Close() +} + func TestEntityList(t *testing.T) { for _, count := range []int{2, 100, 155} { t.Run(fmt.Sprintf("count=%03d", count), func(t *testing.T) {