Skip to content

Commit

Permalink
Merge pull request #4013 from oasisprotocol/kostko/stable/21.2.x/back…
Browse files Browse the repository at this point in the history
…port-multi

[BACKPORT/21.2.x] Backport of multiple bugfixes
  • Loading branch information
kostko authored Jun 9, 2021
2 parents eb15875 + d3e98f6 commit 26f0197
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 79 deletions.
1 change: 1 addition & 0 deletions .changelog/4009.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/oasis-net-runner: Always honor fixture.default.setup_runtimes
1 change: 1 addition & 0 deletions .changelog/4010.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/common/badger: Fix v2->v3 migration for managed mode
5 changes: 5 additions & 0 deletions .changelog/4010.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/storage/mkvs/checkpoint: Checkpoint in descending order

Previously the checkpointer would generate checkpoints in ascending order
which meant that it could generate many checkpoints only to garbage collect
them in the next step.
17 changes: 12 additions & 5 deletions go/common/badger/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func openWithMigrations(opts badger.Options, managed bool) (*badger.DB, error) {
}

// Perform the migration.
if err := migrateDatabase(opts); err != nil {
if err := migrateDatabase(opts, managed); err != nil {
return nil, fmt.Errorf("migration failed: %w", err)
}

// Retry opening the database.
return openFn(opts)
}

func migrateDatabase(opts badger.Options) error {
func migrateDatabase(opts badger.Options, managed bool) error {
var logger *logging.Logger
adapter, _ := opts.Logger.(*badgerLogger)
if logger != nil {
Expand All @@ -180,12 +180,19 @@ func migrateDatabase(opts badger.Options) error {
return fmt.Errorf("failed to remove temporary destination '%s': %w", temporaryDbName, err)
}

openFnV2 := badgerV2.Open
openFnV3 := badger.Open
if managed {
openFnV2 = badgerV2.OpenManaged
openFnV3 = badger.OpenManaged
}

// Open the database as Badger v2.
optsV2 := badgerV2.DefaultOptions(opts.Dir)
optsV2 = optsV2.WithNumVersionsToKeep(math.MaxInt32)
optsV2 = optsV2.WithLogger(nil)

dbV2, err := badgerV2.Open(optsV2)
dbV2, err := openFnV2(optsV2)
if err != nil {
return fmt.Errorf("failed to open source database: %w", err)
}
Expand All @@ -196,7 +203,7 @@ func migrateDatabase(opts badger.Options) error {
optsV3 = optsV3.WithNumVersionsToKeep(math.MaxInt32)
optsV3 = optsV3.WithLogger(NewLogAdapter(logger))

dbV3, err := badger.Open(optsV3)
dbV3, err := openFnV3(optsV3)
if err != nil {
return fmt.Errorf("failed to open destination database: %w", err)
}
Expand All @@ -213,7 +220,7 @@ func migrateDatabase(opts badger.Options) error {
defer w.Close()
defer bw.Flush()

_, errBackup := dbV2.Backup(bw, 0)
_, errBackup := backup(dbV2, bw, managed)
backupCh <- errBackup
}()

Expand Down
86 changes: 86 additions & 0 deletions go/common/badger/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package badger

import (
"bytes"
"context"
"encoding/binary"
"io"
"math"

badgerV2 "github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/pb"
"github.com/golang/protobuf/proto" //nolint: staticcheck
)

// Adapted from Badger v2 which is Copyright 2017 Dgraph Labs, Inc. and Contributors, released
// under the Apache-2 license.

func backup(db *badgerV2.DB, w io.Writer, managed bool) (uint64, error) {
var stream *badgerV2.Stream
switch managed {
case true:
stream = db.NewStreamAt(math.MaxUint64)
case false:
stream = db.NewStream()
}

stream.LogPrefix = "migration"
stream.KeyToList = func(key []byte, itr *badgerV2.Iterator) (*pb.KVList, error) {
list := &pb.KVList{}
for ; itr.Valid(); itr.Next() {
item := itr.Item()
if !bytes.Equal(item.Key(), key) {
return list, nil
}

var valCopy []byte
var meta byte
switch item.IsDeletedOrExpired() {
case true:
// No need to copy value, if item is deleted or expired.
// Set delete bit.
meta = 1 << 0 // bitDelete
case false:
var err error
valCopy, err = item.ValueCopy(nil)
if err != nil {
return nil, err
}
}

kv := &pb.KV{
Key: item.KeyCopy(nil),
Value: valCopy,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
ExpiresAt: item.ExpiresAt(),
Meta: []byte{meta},
}
list.Kv = append(list.Kv, kv)
}
return list, nil
}

var maxVersion uint64
stream.Send = func(list *pb.KVList) error {
for _, kv := range list.Kv {
if maxVersion < kv.Version {
maxVersion = kv.Version
}
}
if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil {
return err
}
buf, err := proto.Marshal(list)
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}

if err := stream.Orchestrate(context.Background()); err != nil {
return 0, err
}
return maxVersion, nil
}
7 changes: 4 additions & 3 deletions go/consensus/tendermint/abci/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,10 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App
GetParameters: func(ctx context.Context) (*checkpoint.CreationParameters, error) {
params := s.ConsensusParameters()
return &checkpoint.CreationParameters{
Interval: params.StateCheckpointInterval,
NumKept: params.StateCheckpointNumKept,
ChunkSize: params.StateCheckpointChunkSize,
Interval: params.StateCheckpointInterval,
NumKept: params.StateCheckpointNumKept,
ChunkSize: params.StateCheckpointChunkSize,
InitialVersion: cfg.InitialHeight,
}, nil
},
}
Expand Down
112 changes: 56 additions & 56 deletions go/oasis-net-runner/fixtures/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,67 +139,67 @@ func newDefaultFixture() (*oasis.NetworkFixture, error) {
{Entity: 1, Runtimes: []int{}},
{Entity: 1, Runtimes: []int{}},
}
}

var runtimeIDs []common.Namespace
for _, rtID := range viper.GetStringSlice(cfgRuntimeID) {
var rt common.Namespace
if err = rt.UnmarshalHex(rtID); err != nil {
cmdCommon.EarlyLogAndExit(fmt.Errorf("invalid runtime ID: %s: %w", rtID, err))
var runtimeIDs []common.Namespace
for _, rtID := range viper.GetStringSlice(cfgRuntimeID) {
var rt common.Namespace
if err = rt.UnmarshalHex(rtID); err != nil {
cmdCommon.EarlyLogAndExit(fmt.Errorf("invalid runtime ID: %s: %w", rtID, err))
}
runtimeIDs = append(runtimeIDs, rt)
}
runtimeIDs = append(runtimeIDs, rt)
}
rtGenesisStates := viper.GetStringSlice(cfgRuntimeGenesisState)

runtimes := viper.GetStringSlice(cfgRuntimeBinary)
if l1, l2 := len(runtimeIDs), len(runtimes); l1 < l2 {
cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime IDs, provided: %d, required: %d", l1, l2))
}
if l1, l2 := len(rtGenesisStates), len(runtimes); l1 < l2 {
cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime genesis states, provided: %d, required: %d", l1, l2))
}
rtGenesisStates := viper.GetStringSlice(cfgRuntimeGenesisState)

for i, rt := range runtimes {
// Compute runtime.
fixture.Runtimes = append(fixture.Runtimes, oasis.RuntimeFixture{
ID: runtimeIDs[i],
Kind: registry.KindCompute,
Entity: 0,
Keymanager: 0,
Binaries: map[node.TEEHardware][]string{
tee: {rt},
},
Executor: registry.ExecutorParameters{
GroupSize: 2,
GroupBackupSize: 1,
RoundTimeout: 20,
MaxMessages: 128,
},
TxnScheduler: registry.TxnSchedulerParameters{
Algorithm: registry.TxnSchedulerSimple,
MaxBatchSize: 1,
MaxBatchSizeBytes: 16 * 1024 * 1024, // 16 MiB
BatchFlushTimeout: 20 * time.Second,
ProposerTimeout: 20,
},
Storage: registry.StorageParameters{
GroupSize: 1,
MinWriteReplication: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
GenesisStatePath: rtGenesisStates[i],
GenesisRound: 0,
GovernanceModel: registry.GovernanceEntity,
})
runtimes := viper.GetStringSlice(cfgRuntimeBinary)
if l1, l2 := len(runtimeIDs), len(runtimes); l1 < l2 {
cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime IDs, provided: %d, required: %d", l1, l2))
}
if l1, l2 := len(rtGenesisStates), len(runtimes); l1 < l2 {
cmdCommon.EarlyLogAndExit(fmt.Errorf("missing runtime genesis states, provided: %d, required: %d", l1, l2))
}

for j := range fixture.ComputeWorkers {
fixture.ComputeWorkers[j].Runtimes = append(fixture.ComputeWorkers[j].Runtimes, i+1)
for i, rt := range runtimes {
// Compute runtime.
fixture.Runtimes = append(fixture.Runtimes, oasis.RuntimeFixture{
ID: runtimeIDs[i],
Kind: registry.KindCompute,
Entity: 0,
Keymanager: 0,
Binaries: map[node.TEEHardware][]string{
tee: {rt},
},
Executor: registry.ExecutorParameters{
GroupSize: 2,
GroupBackupSize: 1,
RoundTimeout: 20,
MaxMessages: 128,
},
TxnScheduler: registry.TxnSchedulerParameters{
Algorithm: registry.TxnSchedulerSimple,
MaxBatchSize: 1,
MaxBatchSizeBytes: 16 * 1024 * 1024, // 16 MiB
BatchFlushTimeout: 20 * time.Second,
ProposerTimeout: 20,
},
Storage: registry.StorageParameters{
GroupSize: 1,
MinWriteReplication: 1,
MaxApplyWriteLogEntries: 100_000,
MaxApplyOps: 2,
},
AdmissionPolicy: registry.RuntimeAdmissionPolicy{
AnyNode: &registry.AnyNodeRuntimeAdmissionPolicy{},
},
GenesisStatePath: rtGenesisStates[i],
GenesisRound: 0,
GovernanceModel: registry.GovernanceEntity,
})

for j := range fixture.ComputeWorkers {
fixture.ComputeWorkers[j].Runtimes = append(fixture.ComputeWorkers[j].Runtimes, i+1)
}
fixture.Clients[0].Runtimes = append(fixture.Clients[0].Runtimes, i+1)
}
fixture.Clients[0].Runtimes = append(fixture.Clients[0].Runtimes, i+1)
}

return fixture, nil
Expand Down
1 change: 1 addition & 0 deletions go/oasis-node/cmd/debug/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,5 @@ func Register(parentCmd *cobra.Command) {
beaconCmd.PersistentFlags().AddFlagSet(cmdGrpc.ClientFlags)

beaconCmd.AddCommand(beaconStatusCmd)
parentCmd.AddCommand(beaconCmd)
}
30 changes: 23 additions & 7 deletions go/storage/mkvs/checkpoint/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,18 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para
if err != nil {
return fmt.Errorf("checkpointer: failed to get earliest version: %w", err)
}
if lastCheckpointVersion < earlyVersion {
lastCheckpointVersion = earlyVersion - params.Interval
firstCheckpointVersion := lastCheckpointVersion + 1 // We can checkpoint the next version.
if firstCheckpointVersion < earlyVersion {
firstCheckpointVersion = earlyVersion
}
if lastCheckpointVersion < params.InitialVersion {
lastCheckpointVersion = params.InitialVersion - params.Interval
if firstCheckpointVersion < params.InitialVersion {
firstCheckpointVersion = params.InitialVersion
}

// Checkpoint any missing versions.
for cpVersion := lastCheckpointVersion + params.Interval; cpVersion < version; cpVersion = cpVersion + params.Interval {
// Checkpoint any missing versions in descending order, stopping at NumKept checkpoints.
newCheckpointVersion := ((version-params.InitialVersion)/params.Interval)*params.Interval + params.InitialVersion
var numAddedCheckpoints uint64
for cpVersion := newCheckpointVersion; cpVersion >= firstCheckpointVersion; {
c.logger.Info("checkpointing version",
"version", cpVersion,
)
Expand All @@ -191,7 +194,20 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para
"version", cpVersion,
"err", err,
)
return fmt.Errorf("checkpointer: failed to checkpoint version: %w", err)
break
}

// Move to the next version, avoiding possible underflow.
if cpVersion < params.Interval {
break
}
cpVersion = cpVersion - params.Interval

// Stop when we have enough checkpoints as otherwise we will be creating checkpoints which
// will be garbage collected anyway.
numAddedCheckpoints++
if numAddedCheckpoints >= params.NumKept {
break
}
}

Expand Down
Loading

0 comments on commit 26f0197

Please sign in to comment.