diff --git a/.changelog/4009.bugfix.md b/.changelog/4009.bugfix.md new file mode 100644 index 00000000000..49fb8ab2b75 --- /dev/null +++ b/.changelog/4009.bugfix.md @@ -0,0 +1 @@ +go/oasis-net-runner: Always honor fixture.default.setup_runtimes diff --git a/.changelog/4010.bugfix.2.md b/.changelog/4010.bugfix.2.md new file mode 100644 index 00000000000..d6744549111 --- /dev/null +++ b/.changelog/4010.bugfix.2.md @@ -0,0 +1 @@ +go/common/badger: Fix v2->v3 migration for managed mode diff --git a/.changelog/4010.bugfix.md b/.changelog/4010.bugfix.md new file mode 100644 index 00000000000..789b529dd0b --- /dev/null +++ b/.changelog/4010.bugfix.md @@ -0,0 +1,5 @@ +go/storage/mkvs/checkpoint: Checkpoint in descending order + +Previously the checkpointer would generate checkpoints in ascending order +which meant that it could generate many checkpoints only to garbage collect +them in the next step. diff --git a/go/common/badger/helpers.go b/go/common/badger/helpers.go index 4d1d8e759c8..2d5c0381991 100644 --- a/go/common/badger/helpers.go +++ b/go/common/badger/helpers.go @@ -149,7 +149,7 @@ func openWithMigrations(opts badger.Options, managed bool) (*badger.DB, error) { } // Perform the migration. - if err := migrateDatabase(opts); err != nil { + if err := migrateDatabase(opts, managed); err != nil { return nil, fmt.Errorf("migration failed: %w", err) } @@ -157,7 +157,7 @@ func openWithMigrations(opts badger.Options, managed bool) (*badger.DB, error) { return openFn(opts) } -func migrateDatabase(opts badger.Options) error { +func migrateDatabase(opts badger.Options, managed bool) error { var logger *logging.Logger adapter, _ := opts.Logger.(*badgerLogger) if logger != nil { @@ -180,12 +180,19 @@ func migrateDatabase(opts badger.Options) error { return fmt.Errorf("failed to remove temporary destination '%s': %w", temporaryDbName, err) } + openFnV2 := badgerV2.Open + openFnV3 := badger.Open + if managed { + openFnV2 = badgerV2.OpenManaged + openFnV3 = badger.OpenManaged + } + // Open the database as Badger v2. optsV2 := badgerV2.DefaultOptions(opts.Dir) optsV2 = optsV2.WithNumVersionsToKeep(math.MaxInt32) optsV2 = optsV2.WithLogger(nil) - dbV2, err := badgerV2.Open(optsV2) + dbV2, err := openFnV2(optsV2) if err != nil { return fmt.Errorf("failed to open source database: %w", err) } @@ -196,7 +203,7 @@ func migrateDatabase(opts badger.Options) error { optsV3 = optsV3.WithNumVersionsToKeep(math.MaxInt32) optsV3 = optsV3.WithLogger(NewLogAdapter(logger)) - dbV3, err := badger.Open(optsV3) + dbV3, err := openFnV3(optsV3) if err != nil { return fmt.Errorf("failed to open destination database: %w", err) } @@ -213,7 +220,7 @@ func migrateDatabase(opts badger.Options) error { defer w.Close() defer bw.Flush() - _, errBackup := dbV2.Backup(bw, 0) + _, errBackup := backup(dbV2, bw, managed) backupCh <- errBackup }() diff --git a/go/common/badger/migrate.go b/go/common/badger/migrate.go new file mode 100644 index 00000000000..3c0d4a51571 --- /dev/null +++ b/go/common/badger/migrate.go @@ -0,0 +1,86 @@ +package badger + +import ( + "bytes" + "context" + "encoding/binary" + "io" + "math" + + badgerV2 "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/pb" + "github.com/golang/protobuf/proto" //nolint: staticcheck +) + +// Adapted from Badger v2 which is Copyright 2017 Dgraph Labs, Inc. and Contributors, released +// under the Apache-2 license. + +func backup(db *badgerV2.DB, w io.Writer, managed bool) (uint64, error) { + var stream *badgerV2.Stream + switch managed { + case true: + stream = db.NewStreamAt(math.MaxUint64) + case false: + stream = db.NewStream() + } + + stream.LogPrefix = "migration" + stream.KeyToList = func(key []byte, itr *badgerV2.Iterator) (*pb.KVList, error) { + list := &pb.KVList{} + for ; itr.Valid(); itr.Next() { + item := itr.Item() + if !bytes.Equal(item.Key(), key) { + return list, nil + } + + var valCopy []byte + var meta byte + switch item.IsDeletedOrExpired() { + case true: + // No need to copy value, if item is deleted or expired. + // Set delete bit. + meta = 1 << 0 // bitDelete + case false: + var err error + valCopy, err = item.ValueCopy(nil) + if err != nil { + return nil, err + } + } + + kv := &pb.KV{ + Key: item.KeyCopy(nil), + Value: valCopy, + UserMeta: []byte{item.UserMeta()}, + Version: item.Version(), + ExpiresAt: item.ExpiresAt(), + Meta: []byte{meta}, + } + list.Kv = append(list.Kv, kv) + } + return list, nil + } + + var maxVersion uint64 + stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + if maxVersion < kv.Version { + maxVersion = kv.Version + } + } + if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil { + return err + } + buf, err := proto.Marshal(list) + if err != nil { + return err + } + _, err = w.Write(buf) + return err + } + + if err := stream.Orchestrate(context.Background()); err != nil { + return 0, err + } + return maxVersion, nil +} diff --git a/go/consensus/tendermint/abci/state.go b/go/consensus/tendermint/abci/state.go index 2a3f0bb0e92..0374f5bb4b2 100644 --- a/go/consensus/tendermint/abci/state.go +++ b/go/consensus/tendermint/abci/state.go @@ -572,9 +572,10 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App GetParameters: func(ctx context.Context) (*checkpoint.CreationParameters, error) { params := s.ConsensusParameters() return &checkpoint.CreationParameters{ - Interval: params.StateCheckpointInterval, - NumKept: params.StateCheckpointNumKept, - ChunkSize: params.StateCheckpointChunkSize, + Interval: params.StateCheckpointInterval, + NumKept: params.StateCheckpointNumKept, + ChunkSize: params.StateCheckpointChunkSize, + InitialVersion: cfg.InitialHeight, }, nil }, } diff --git a/go/oasis-net-runner/fixtures/default.go b/go/oasis-net-runner/fixtures/default.go index 188f42dccbd..f4c33f67d23 100644 --- a/go/oasis-net-runner/fixtures/default.go +++ b/go/oasis-net-runner/fixtures/default.go @@ -139,67 +139,67 @@ func newDefaultFixture() (*oasis.NetworkFixture, error) { {Entity: 1, Runtimes: []int{}}, {Entity: 1, Runtimes: []int{}}, } - } - var runtimeIDs []common.Namespace - for _, rtID := range viper.GetStringSlice(cfgRuntimeID) { - var rt common.Namespace - if err = rt.UnmarshalHex(rtID); err != nil { - cmdCommon.EarlyLogAndExit(fmt.Errorf("invalid runtime ID: %s: %w", rtID, err)) + var runtimeIDs []common.Namespace + for _, rtID := range viper.GetStringSlice(cfgRuntimeID) { + var rt common.Namespace + if err = rt.UnmarshalHex(rtID); err != nil { + cmdCommon.EarlyLogAndExit(fmt.Errorf("invalid runtime ID: %s: %w", rtID, err)) + } + runtimeIDs = append(runtimeIDs, rt) } - runtimeIDs = append(runtimeIDs, rt) - } - rtGenesisStates := viper.GetStringSlice(cfgRuntimeGenesisState) - - runtimes := viper.GetStringSlice(cfgRuntimeBinary) - if l1, l2 := len(runtimeIDs), len(runtimes); l1 < l2 { - cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime IDs, provided: %d, required: %d", l1, l2)) - } - if l1, l2 := len(rtGenesisStates), len(runtimes); l1 < l2 { - cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime genesis states, provided: %d, required: %d", l1, l2)) - } + rtGenesisStates := viper.GetStringSlice(cfgRuntimeGenesisState) - for i, rt := range runtimes { - // Compute runtime. - fixture.Runtimes = append(fixture.Runtimes, oasis.RuntimeFixture{ - ID: runtimeIDs[i], - Kind: registry.KindCompute, - Entity: 0, - Keymanager: 0, - Binaries: map[node.TEEHardware][]string{ - tee: {rt}, - }, - Executor: registry.ExecutorParameters{ - GroupSize: 2, - GroupBackupSize: 1, - RoundTimeout: 20, - MaxMessages: 128, - }, - TxnScheduler: registry.TxnSchedulerParameters{ - Algorithm: registry.TxnSchedulerSimple, - MaxBatchSize: 1, - MaxBatchSizeBytes: 16 * 1024 * 1024, // 16 MiB - BatchFlushTimeout: 20 * time.Second, - ProposerTimeout: 20, - }, - Storage: registry.StorageParameters{ - GroupSize: 1, - MinWriteReplication: 1, - MaxApplyWriteLogEntries: 100_000, - MaxApplyOps: 2, - }, - AdmissionPolicy: registry.RuntimeAdmissionPolicy{ - AnyNode: ®istry.AnyNodeRuntimeAdmissionPolicy{}, - }, - GenesisStatePath: rtGenesisStates[i], - GenesisRound: 0, - GovernanceModel: registry.GovernanceEntity, - }) + runtimes := viper.GetStringSlice(cfgRuntimeBinary) + if l1, l2 := len(runtimeIDs), len(runtimes); l1 < l2 { + cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime IDs, provided: %d, required: %d", l1, l2)) + } + if l1, l2 := len(rtGenesisStates), len(runtimes); l1 < l2 { + cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime genesis states, provided: %d, required: %d", l1, l2)) + } - for j := range fixture.ComputeWorkers { - fixture.ComputeWorkers[j].Runtimes = append(fixture.ComputeWorkers[j].Runtimes, i+1) + for i, rt := range runtimes { + // Compute runtime. + fixture.Runtimes = append(fixture.Runtimes, oasis.RuntimeFixture{ + ID: runtimeIDs[i], + Kind: registry.KindCompute, + Entity: 0, + Keymanager: 0, + Binaries: map[node.TEEHardware][]string{ + tee: {rt}, + }, + Executor: registry.ExecutorParameters{ + GroupSize: 2, + GroupBackupSize: 1, + RoundTimeout: 20, + MaxMessages: 128, + }, + TxnScheduler: registry.TxnSchedulerParameters{ + Algorithm: registry.TxnSchedulerSimple, + MaxBatchSize: 1, + MaxBatchSizeBytes: 16 * 1024 * 1024, // 16 MiB + BatchFlushTimeout: 20 * time.Second, + ProposerTimeout: 20, + }, + Storage: registry.StorageParameters{ + GroupSize: 1, + MinWriteReplication: 1, + MaxApplyWriteLogEntries: 100_000, + MaxApplyOps: 2, + }, + AdmissionPolicy: registry.RuntimeAdmissionPolicy{ + AnyNode: ®istry.AnyNodeRuntimeAdmissionPolicy{}, + }, + GenesisStatePath: rtGenesisStates[i], + GenesisRound: 0, + GovernanceModel: registry.GovernanceEntity, + }) + + for j := range fixture.ComputeWorkers { + fixture.ComputeWorkers[j].Runtimes = append(fixture.ComputeWorkers[j].Runtimes, i+1) + } + fixture.Clients[0].Runtimes = append(fixture.Clients[0].Runtimes, i+1) } - fixture.Clients[0].Runtimes = append(fixture.Clients[0].Runtimes, i+1) } return fixture, nil diff --git a/go/oasis-node/cmd/debug/beacon/beacon.go b/go/oasis-node/cmd/debug/beacon/beacon.go index 5b6b6cc02ed..eaecd0a0783 100644 --- a/go/oasis-node/cmd/debug/beacon/beacon.go +++ b/go/oasis-node/cmd/debug/beacon/beacon.go @@ -99,4 +99,5 @@ func Register(parentCmd *cobra.Command) { beaconCmd.PersistentFlags().AddFlagSet(cmdGrpc.ClientFlags) beaconCmd.AddCommand(beaconStatusCmd) + parentCmd.AddCommand(beaconCmd) } diff --git a/go/storage/mkvs/checkpoint/checkpointer.go b/go/storage/mkvs/checkpoint/checkpointer.go index b4c9e5611ca..1eab5f94bec 100644 --- a/go/storage/mkvs/checkpoint/checkpointer.go +++ b/go/storage/mkvs/checkpoint/checkpointer.go @@ -173,15 +173,18 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para if err != nil { return fmt.Errorf("checkpointer: failed to get earliest version: %w", err) } - if lastCheckpointVersion < earlyVersion { - lastCheckpointVersion = earlyVersion - params.Interval + firstCheckpointVersion := lastCheckpointVersion + 1 // We can checkpoint the next version. + if firstCheckpointVersion < earlyVersion { + firstCheckpointVersion = earlyVersion } - if lastCheckpointVersion < params.InitialVersion { - lastCheckpointVersion = params.InitialVersion - params.Interval + if firstCheckpointVersion < params.InitialVersion { + firstCheckpointVersion = params.InitialVersion } - // Checkpoint any missing versions. - for cpVersion := lastCheckpointVersion + params.Interval; cpVersion < version; cpVersion = cpVersion + params.Interval { + // Checkpoint any missing versions in descending order, stopping at NumKept checkpoints. + newCheckpointVersion := ((version-params.InitialVersion)/params.Interval)*params.Interval + params.InitialVersion + var numAddedCheckpoints uint64 + for cpVersion := newCheckpointVersion; cpVersion >= firstCheckpointVersion; { c.logger.Info("checkpointing version", "version", cpVersion, ) @@ -191,7 +194,20 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para "version", cpVersion, "err", err, ) - return fmt.Errorf("checkpointer: failed to checkpoint version: %w", err) + break + } + + // Move to the next version, avoiding possible underflow. + if cpVersion < params.Interval { + break + } + cpVersion = cpVersion - params.Interval + + // Stop when we have enough checkpoints as otherwise we will be creating checkpoints which + // will be garbage collected anyway. + numAddedCheckpoints++ + if numAddedCheckpoints >= params.NumKept { + break } } diff --git a/go/storage/mkvs/checkpoint/checkpointer_test.go b/go/storage/mkvs/checkpoint/checkpointer_test.go index 2ff6acbe2cf..7a7d54df71a 100644 --- a/go/storage/mkvs/checkpoint/checkpointer_test.go +++ b/go/storage/mkvs/checkpoint/checkpointer_test.go @@ -23,7 +23,7 @@ const ( testNumKept = 2 ) -func testCheckpointer(t *testing.T, earliestVersion uint64, preExistingData bool) { +func testCheckpointer(t *testing.T, earliestVersion, interval uint64, preExistingData bool) { require := require.New(t) ctx := context.Background() @@ -76,7 +76,7 @@ func testCheckpointer(t *testing.T, earliestVersion uint64, preExistingData bool CheckInterval: testCheckInterval, RootsPerVersion: 1, Parameters: &CreationParameters{ - Interval: 1, + Interval: interval, NumKept: testNumKept, ChunkSize: 16 * 1024, InitialVersion: earliestVersion, @@ -92,7 +92,7 @@ func testCheckpointer(t *testing.T, earliestVersion uint64, preExistingData bool require.NoError(err, "NewCheckpointer") // Finalize a few rounds. - for round := earliestVersion; round < earliestVersion+10; round++ { + for round := earliestVersion; round < earliestVersion+(testNumKept+1)*interval; round++ { tree := mkvs.NewWithRoot(nil, ndb, root) err = tree.Insert(ctx, []byte(fmt.Sprintf("round %d", round)), []byte(fmt.Sprintf("value %d", round))) require.NoError(err, "Insert") @@ -114,25 +114,28 @@ func testCheckpointer(t *testing.T, earliestVersion uint64, preExistingData bool } // Make sure that there are always the correct number of checkpoints. - if round > earliestVersion+testNumKept+1 { + if round > earliestVersion+(testNumKept+1)*interval { cps, err := fc.GetCheckpoints(ctx, &GetCheckpointsRequest{ Version: checkpointVersion, Namespace: testNs, }) require.NoError(err, "GetCheckpoints") - require.Len(cps, testNumKept+1, "incorrect number of live checkpoints") + require.Len(cps, testNumKept, "incorrect number of live checkpoints") } } } func TestCheckpointer(t *testing.T) { t.Run("Basic", func(t *testing.T) { - testCheckpointer(t, 0, false) + testCheckpointer(t, 0, 1, false) }) t.Run("NonZeroEarliestVersion", func(t *testing.T) { - testCheckpointer(t, 1000, false) + testCheckpointer(t, 1000, 1, false) }) t.Run("NonZeroEarliestInitialVersion", func(t *testing.T) { - testCheckpointer(t, 100, true) + testCheckpointer(t, 100, 1, true) + }) + t.Run("MaybeUnderflow", func(t *testing.T) { + testCheckpointer(t, 5, 10, true) }) }