Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Also added test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 16, 2025
1 parent ab819b5 commit e3fb899
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 6 deletions.
90 changes: 90 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -68,3 +73,88 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

ci, term := schema.ReadConsistentIndex(be.ReadTx())

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.UnsafeLoad()

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err

Check warning on line 93 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L93

Added line #L93 was not covered by tests
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err

Check warning on line 121 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L121

Added line #L121 was not covered by tests
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err

Check warning on line 127 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L127

Added line #L127 was not covered by tests
}
defer w.Close()
// We must read all records to locate the tail of the last valid WAL file.
_, st, _, err := w.ReadAll()
if err != nil {
return err

Check warning on line 133 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L133

Added line #L133 was not covered by tests
}

if err := w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}); err != nil {
return err

Check warning on line 137 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L137

Added line #L137 was not covered by tests
}
if err := w.Save(raftpb.HardState{Term: term, Commit: ci, Vote: st.Vote}, nil); err != nil {
return err

Check warning on line 140 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L140

Added line #L140 was not covered by tests
}
return w.Sync()
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue
}

voters = append(voters, uint64(m.ID))
}

return voters, learners
}
191 changes: 191 additions & 0 deletions etcdutl/etcdutl/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@
package etcdutl

import (
"path"
"path/filepath"
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -141,3 +151,184 @@ func TestGetLatestWalSnap(t *testing.T) {
})
}
}

func TestCreateV2SnapshotFromV3Store(t *testing.T) {
testCases := []struct {
name string
consistentIndex uint64
term uint64
clusterVersion string
members []uint64
learners []uint64
removedMembers []uint64
expectedErrMsg string
}{
{
name: "unexpected term: less than the last snapshot.term",
consistentIndex: 3,
term: 1,
expectedErrMsg: "less than the latest snapshot",
},
{
name: "unexpected consistent index: less than the last snapshot.index",
consistentIndex: 1,
term: 3,
expectedErrMsg: "less than the latest snapshot",
},
{
name: "normal case",
consistentIndex: 32,
term: 4,
clusterVersion: "3.5.0",
members: []uint64{100, 200},
learners: []uint64{300},
removedMembers: []uint64{400, 500},
},
{
name: "empty cluster version",
consistentIndex: 45,
term: 4,
clusterVersion: "",
members: []uint64{110, 200},
learners: []uint64{350},
removedMembers: []uint64{450, 500},
},
{
name: "no learner",
consistentIndex: 7,
term: 5,
clusterVersion: "3.5.0",
members: []uint64{150, 200},
removedMembers: []uint64{450, 550},
},
{
name: "no removed members",
consistentIndex: 34,
term: 6,
clusterVersion: "3.7.0",
members: []uint64{160, 200},
learners: []uint64{300},
},
{
name: "no learner and removed members",
consistentIndex: 19,
term: 5,
clusterVersion: "3.6.0",
members: []uint64{120, 220},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dataDir := t.TempDir()
lg := zap.NewNop()

require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToMemberDir(dataDir)))
require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToWALDir(dataDir)))
require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToSnapDir(dataDir)))

t.Log("Populate the wal file")
w, err := wal.Create(lg, datadir.ToWALDir(dataDir), pbutil.MustMarshal(
&etcdserverpb.Metadata{
NodeID: 1,
ClusterID: 2,
},
))
require.NoError(t, err)
err = w.SaveSnapshot(walpb.Snapshot{Index: 2, Term: 2, ConfState: &raftpb.ConfState{Voters: []uint64{1}}})
require.NoError(t, err)
err = w.Save(raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, nil)
require.NoError(t, err)
err = w.Close()
require.NoError(t, err)

t.Log("Generate a v2 snapshot file")
ss := snap.New(lg, datadir.ToSnapDir(dataDir))
err = ss.SaveSnap(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}})
require.NoError(t, err)

t.Log("Load and verify the latest v2 snapshot file")
oldV2Snap, err := getLatestV2Snapshot(lg, dataDir)
require.NoError(t, err)
require.Equal(t, raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}, oldV2Snap.Metadata)

t.Log("Prepare the bbolt db")
be := backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db"))
schema.CreateMetaBucket(be.BatchTx())
schema.NewMembershipBackend(lg, be).MustCreateBackendBuckets()

if len(tc.clusterVersion) > 0 {
t.Logf("Populate the cluster version: %s", tc.clusterVersion)
schema.NewMembershipBackend(lg, be).MustSaveClusterVersionToBackend(semver.New(tc.clusterVersion))
} else {
t.Log("Skip populating cluster version due to not provided")
}

tx := be.BatchTx()
tx.LockOutsideApply()
t.Log("Populate the consistent index and term")
ci := cindex.NewConsistentIndex(be)
ci.SetConsistentIndex(tc.consistentIndex, tc.term)
ci.UnsafeSave(tx)
tx.Unlock()

t.Logf("Populate members: %d", len(tc.members))
memberBackend := schema.NewMembershipBackend(lg, be)
for _, mID := range tc.members {
memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID)})
}

t.Logf("Populate learner: %d", len(tc.learners))
for _, mID := range tc.learners {
memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID), RaftAttributes: membership.RaftAttributes{IsLearner: true}})
}

t.Logf("Populate removed members: %d", len(tc.removedMembers))
for _, mID := range tc.removedMembers {
memberBackend.MustDeleteMemberFromBackend(types.ID(mID))
}

t.Log("Committing bbolt db")
be.ForceCommit()
require.NoError(t, be.Close())

t.Log("Creating a new v2 snapshot file based on the v3 store")
err = createV2SnapshotFromV3Store(dataDir, backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db")))
if len(tc.expectedErrMsg) > 0 {
require.ErrorContains(t, err, tc.expectedErrMsg)
return
}
require.NoError(t, err)

t.Log("Loading & verifying the new latest v2 snapshot file")
newV2Snap, err := getLatestV2Snapshot(lg, dataDir)
require.NoError(t, err)
require.Equal(t, raftpb.SnapshotMetadata{Index: tc.consistentIndex, Term: tc.term, ConfState: raftpb.ConfState{Voters: tc.members, Learners: tc.learners}}, newV2Snap.Metadata)

st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
require.NoError(t, st.Recovery(newV2Snap.Data))

cv, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "version"), false, false)
if len(tc.clusterVersion) > 0 {
require.NoError(t, err)
if !semver.New(*cv.Node.Value).Equal(*semver.New(tc.clusterVersion)) {
t.Fatalf("Unexpected cluster version, got %s, want %s", semver.New(*cv.Node.Value).String(), tc.clusterVersion)
}
} else {
require.ErrorContains(t, err, "Key not found")
}

members, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "members"), true, true)
require.NoError(t, err)
require.Len(t, members.Node.Nodes, len(tc.members)+len(tc.learners))

removedMembers, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "removed_members"), true, true)
if len(tc.removedMembers) > 0 {
require.NoError(t, err)
require.Equal(t, len(tc.removedMembers), len(removedMembers.Node.Nodes))
} else {
require.ErrorContains(t, err, "Key not found")
}
})
}
}
17 changes: 16 additions & 1 deletion etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,28 @@ func migrateCommandFunc(c *migrateConfig) error {
tx := be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))

Check warning on line 132 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L132

Added line #L132 was not covered by tests
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

// only generate a v2 snapshot file for downgrade case
if c.targetVersion.LessThan(current) {

Check warning on line 141 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L141

Added line #L141 was not covered by tests
// Update cluster version
schema.NewMembershipBackend(c.lg, be).MustSaveClusterVersionToBackend(c.targetVersion)

Check warning on line 143 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L143

Added line #L143 was not covered by tests

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err

Check warning on line 149 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L147-L149

Added lines #L147 - L149 were not covered by tests
}
c.lg.Info("Generated a v2 snapshot file")

Check warning on line 151 in etcdutl/etcdutl/migrate_command.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/migrate_command.go#L151

Added line #L151 was not covered by tests
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err
Expand All @@ -150,6 +164,7 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

be.ForceCommit()
return nil
}
Expand Down
Loading

0 comments on commit e3fb899

Please sign in to comment.