Skip to content

Commit

Permalink
tests/e2e: add new test cases related to HashKV
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Fu <[email protected]>
  • Loading branch information
fuweid committed Jul 26, 2024
1 parent 94077fc commit cfea1fe
Showing 1 changed file with 321 additions and 0 deletions.
321 changes: 321 additions & 0 deletions tests/e2e/hashkv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/config"
fe2e "go.etcd.io/etcd/tests/v3/framework/e2e"
)

func TestVerifyHashKVAfterCompact(t *testing.T) {
tests := []struct {
compactedOnRev int64
hashKVOnRev int64
}{
{
compactedOnRev: 33, // tombstone
hashKVOnRev: 33, // tombstone
},
{
compactedOnRev: 22, // tombstone
hashKVOnRev: 33, // tombstone
},
{
compactedOnRev: 33, // tombstone
hashKVOnRev: 41, // non-tombstone
},
{
compactedOnRev: 26, // non-tombstone
hashKVOnRev: 33, // tombstone
},
{
compactedOnRev: 32, // non-tombstone
hashKVOnRev: 41, // non-tombstone
},
}

scenarios := []struct {
ClusterVersion fe2e.ClusterVersion
OnlyOneKey bool
}{
{
ClusterVersion: fe2e.CurrentVersion,
OnlyOneKey: true,
},
{
ClusterVersion: fe2e.CurrentVersion,
OnlyOneKey: false,
},
{
ClusterVersion: fe2e.QuorumLastVersion,
OnlyOneKey: true,
},
{
ClusterVersion: fe2e.QuorumLastVersion,
OnlyOneKey: false,
},
}

for idx, tt := range tests {
for _, scenario := range scenarios {
t.Run(fmt.Sprintf("#%d - %s - OnlyOneKey=%v", idx, scenario.ClusterVersion, scenario.OnlyOneKey), func(t *testing.T) {
ctx := context.Background()

clus, cliCfg := newClusterForHashKV(t, scenario.ClusterVersion)

newTestKeySetInCluster(t, clus, cliCfg, scenario.OnlyOneKey)

cli, err := fe2e.NewEtcdctl(cliCfg, clus.EndpointsGRPC())
require.NoError(t, err)

t.Logf("COMPACT rev=%d", tt.compactedOnRev)
_, err = cli.Compact(ctx, tt.compactedOnRev, config.CompactOption{Physical: true})
require.NoError(t, err)

t.Logf("HashKV on rev=%d", tt.hashKVOnRev)
resp, err := cli.HashKV(ctx, tt.hashKVOnRev)
require.NoError(t, err)

require.Len(t, resp, 3)
require.True(t, resp[0].Hash != 0)
t.Logf("One Hash value is %d", resp[0].Hash)

require.Equal(t, resp[0].Hash, resp[1].Hash)
require.Equal(t, resp[1].Hash, resp[2].Hash)
})
}
}
}

// key: "foo"
// modified: 41
// generations:
//
// {34, 0}, {35, 0}, {36, 0}, {37, 0}, {38, 0}, {39, 0}, {40, 0}, {41, 0}
// {23, 0}, {24, 0}, {25, 0}, {26, 0}, {27, 0}, {28, 0}, {29, 0}, {30, 0}, {31, 0}, {32, 0}, {33, 0}[tombstone]
// {12, 0}, {13, 0}, {14, 0}, {15, 0}, {16, 0}, {17, 0}, {18, 0}, {19, 0}, {20, 0}, {21, 0}, {22, 0}[tombstone]
// {2, 0}, {3, 0}, {4, 0}, {5, 0}, {6, 0}, {7, 0}, {8, 0}, {9, 0}, {10, 0}, {11, 0}[tombstone]
//
// # If onlyOneKey is false
//
// key: "foo-1"
// modified: 12
// generations:
//
// {12, 1}
//
// key: "foo-2"
// modified: 23
// generations:
//
// {23, 1}
//
// key: "foo-3"
// modified: 34
// generations:
//
// {34, 1}
func newTestKeySetInCluster(t *testing.T, clus *fe2e.EtcdProcessCluster, cliCfg fe2e.ClientConfig, onlyOneKey bool) {
t.Helper()
c := newClient(t, clus.EndpointsGRPC(), cliCfg)
defer c.Close()

ctx := context.Background()
key := "foo"
totalRev := 41

deleteOnRev := 11 // 22, 33
lastRevision := int64(1)
for i := 2; i <= totalRev; i++ {
require.Equal(t, int64(i-1), lastRevision)

if i%deleteOnRev == 0 {
t.Logf("DELETEing key=%s", key)
resp, derr := c.Delete(ctx, key)
require.NoError(t, derr)
lastRevision = resp.Header.Revision
continue
}

value := fmt.Sprintf("%d", i)
ops := []clientv3.Op{clientv3.OpPut(key, value)}

logMsg := fmt.Sprintf("PUTing key=%s", key)
if i%deleteOnRev == 1 && !onlyOneKey {
key2 := fmt.Sprintf("%s-%d", key, i/deleteOnRev)
ops = append(ops, clientv3.OpPut(key2, value))
logMsg = fmt.Sprintf("%s,key=%s", logMsg, key2)
}
t.Logf("%s val=%s", logMsg, value)

resp, terr := c.Txn(ctx).Then(ops...).Commit()
require.NoError(t, terr)
require.True(t, resp.Succeeded)
require.Len(t, resp.Responses, len(ops))
lastRevision = resp.Header.Revision
}
}

// TestVerifyHashKVAfterTwoCompactions_MixVersions
//
// NOTE: Current version always keep compacted revision, while existing releases
// delete that compacted revisions if they're tombstone. Current version should
// skip previous compacted revision if it's tombstone. So, that HashKV result
// can be the same in mix versions, especially it won't file data corrupt alert
// when cluster is updating to new release.
func TestVerifyHashKVAfterTwoCompactions_MixVersions(t *testing.T) {
clus, cliCfg := newClusterForHashKV(t, fe2e.QuorumLastVersion)

newTestKeySet2InCluster(t, clus, cliCfg)

cli, err := fe2e.NewEtcdctl(cliCfg, clus.EndpointsGRPC())
require.NoError(t, err)

ctx := context.Background()

firstCompactOnRev := int64(6)
t.Logf("COMPACT rev=%d", firstCompactOnRev)
_, err = cli.Compact(ctx, firstCompactOnRev, config.CompactOption{Physical: true})
require.NoError(t, err)

secondCompactOnRev := int64(10)
t.Logf("COMPACT rev=%d", secondCompactOnRev)
_, err = cli.Compact(ctx, secondCompactOnRev, config.CompactOption{Physical: true})
require.NoError(t, err)

for hashKVOnRev := int64(10); hashKVOnRev <= 14; hashKVOnRev++ {
t.Logf("HashKV on rev=%d", hashKVOnRev)
resp, err := cli.HashKV(ctx, hashKVOnRev)
require.NoError(t, err)

require.Len(t, resp, 3)
require.True(t, resp[0].Hash != 0)
t.Logf("One Hash value is %d", resp[0].Hash)

require.Equal(t, resp[0].Hash, resp[1].Hash)
require.Equal(t, resp[1].Hash, resp[2].Hash)
}
}

// key: "foo"
// modified: 14
// generations:
//
// {13, 0}, {14, 0}
// {2, 0}, {4, 0}, {6, 0}[tombstone]
//
// key: "foo1"
// modified: 6
// generations:
//
// {empty}
// {3, 0}, {5, 0}, {6, 1}[tombstone]
//
// key: "foo2"
// modified: 11
// generations:
//
// {empty}
// {7, 0}, {10, 0}, {11, 0}[tombstone]
//
// key: "foo3"
// modified: 11
// generations:
//
// {empty}
// {8, 0}, {9, 0}, {12, 0}[tombstone]
func newTestKeySet2InCluster(t *testing.T, clus *fe2e.EtcdProcessCluster, cliCfg fe2e.ClientConfig) {
t.Helper()
c := newClient(t, clus.EndpointsGRPC(), cliCfg)
defer c.Close()

ctx := context.Background()

_, err := c.Put(ctx, "foo", "2")
require.NoError(t, err)

_, err = c.Put(ctx, "foo1", "3")
require.NoError(t, err)

_, err = c.Put(ctx, "foo", "4")
require.NoError(t, err)

_, err = c.Put(ctx, "foo1", "5")
require.NoError(t, err)

_, err = c.Txn(ctx).Then(
clientv3.OpDelete("foo"),
clientv3.OpDelete("foo1"),
).Commit()
require.NoError(t, err)

_, err = c.Put(ctx, "foo2", "7")
require.NoError(t, err)

_, err = c.Put(ctx, "foo3", "8")
require.NoError(t, err)

_, err = c.Put(ctx, "foo3", "9")
require.NoError(t, err)

_, err = c.Put(ctx, "foo2", "10")
require.NoError(t, err)

_, err = c.Delete(ctx, "foo2")
require.NoError(t, err)

_, err = c.Delete(ctx, "foo3")
require.NoError(t, err)

_, err = c.Put(ctx, "foo", "13")
require.NoError(t, err)

resp, err := c.Put(ctx, "foo", "14")
require.NoError(t, err)
require.Equal(t, int64(14), resp.Header.Revision)
}

func newClusterForHashKV(t *testing.T, clusVersion fe2e.ClusterVersion) (*fe2e.EtcdProcessCluster, fe2e.ClientConfig) {
if clusVersion != fe2e.CurrentVersion {
if !fileutil.Exist(fe2e.BinPath.EtcdLastRelease) {
t.Skipf("%q does not exist", fe2e.BinPath.EtcdLastRelease)
}
}

ctx := context.Background()

fe2e.BeforeTest(t)
cfg := fe2e.DefaultConfig()
cfg.Client = fe2e.ClientConfig{ConnectionType: fe2e.ClientTLS}

clus, err := fe2e.NewEtcdProcessCluster(ctx, t,
fe2e.WithConfig(cfg),
fe2e.WithClusterSize(3),
fe2e.WithVersion(clusVersion))
require.NoError(t, err)

t.Cleanup(func() { clus.Close() })
return clus, cfg.Client
}

0 comments on commit cfea1fe

Please sign in to comment.