From b49d11455cdab0607cbb985c0de11f3f83479ff7 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 16:06:29 +0800 Subject: [PATCH 01/11] update leader path Signed-off-by: okJiang <819421878@qq.com> --- pkg/member/member.go | 3 ++- pkg/member/participant.go | 7 +++---- pkg/utils/keypath/key_path_v2.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 5 deletions(-) create mode 100644 pkg/utils/keypath/key_path_v2.go diff --git a/pkg/member/member.go b/pkg/member/member.go index 2dc8be52031..ebc6889bc2c 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.uber.org/zap" @@ -161,7 +162,7 @@ func (m *EmbeddedEtcdMember) EnableLeader() { // GetLeaderPath returns the path of the PD leader. func (m *EmbeddedEtcdMember) GetLeaderPath() string { - return path.Join(m.rootPath, "leader") + return keypath.GetLeaderPath("") } // GetLeadership returns the leadership of the PD member. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 599e56387d6..924f76598b9 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -30,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -55,7 +56,6 @@ type Participant struct { leader atomic.Value client *clientv3.Client rootPath string - leaderPath string member participant serviceName string // memberValue is the serialized string of `member`. It will be saved in the @@ -89,10 +89,9 @@ func (m *Participant) InitInfo(p participant, rootPath string, leaderName string m.member = p m.memberValue = string(data) m.rootPath = rootPath - m.leaderPath = path.Join(rootPath, leaderName) m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose) m.lastLeaderUpdatedTime.Store(time.Now()) - log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.leaderPath)) + log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.GetLeaderPath())) } // ID returns the unique ID for this participant in the election group @@ -170,7 +169,7 @@ func (m *Participant) EnableLeader() { // GetLeaderPath returns the path of the leader. func (m *Participant) GetLeaderPath() string { - return m.leaderPath + return keypath.GetLeaderPath(m.serviceName) } // GetLastLeaderUpdatedTime returns the last time when the leader is updated. diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go new file mode 100644 index 00000000000..230ff356a53 --- /dev/null +++ b/pkg/utils/keypath/key_path_v2.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import "fmt" + +const ( + // "/pd/{cluster_id}/leader" + leaderPathFormat = "/pd/%d/leader" + + // "/ms/{cluster_id}/{service_name}/primary" + msLeaderPathFormat = "/ms/%d/%s/primary" +) + +func GetLeaderPath(serviceName string) string { + if serviceName == "" { + return fmt.Sprintf(leaderPathFormat, ClusterID()) + } + return fmt.Sprintf(msLeaderPathFormat, ClusterID(), serviceName) +} From b88e95d9ffc69ec6e36726163a1c55c758e726ea Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 16:34:51 +0800 Subject: [PATCH 02/11] update dcLocationPath Signed-off-by: okJiang <819421878@qq.com> --- pkg/member/member.go | 9 ++++----- pkg/member/participant.go | 6 +++--- pkg/utils/keypath/key_path_v2.go | 26 ++++++++++++++++++++------ 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/pkg/member/member.go b/pkg/member/member.go index ebc6889bc2c..68e76a95cdc 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -42,8 +42,7 @@ import ( const ( // The timeout to wait transfer etcd leader to complete. - moveLeaderTimeout = 5 * time.Second - dcLocationConfigEtcdPrefix = "dc-location" + moveLeaderTimeout = 5 * time.Second // If the campaign times is more than this value in `campaignTimesRecordTimeout`, the PD will resign and campaign again. campaignLeaderFrequencyTimes = 3 ) @@ -162,7 +161,7 @@ func (m *EmbeddedEtcdMember) EnableLeader() { // GetLeaderPath returns the path of the PD leader. func (m *EmbeddedEtcdMember) GetLeaderPath() string { - return keypath.GetLeaderPath("") + return keypath.LeaderPath("") } // GetLeadership returns the leadership of the PD member. @@ -386,12 +385,12 @@ func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string { // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. func (m *EmbeddedEtcdMember) GetDCLocationPathPrefix() string { - return path.Join(m.rootPath, dcLocationConfigEtcdPrefix) + return keypath.Prefix(keypath.DCLocationPath("", 0)) } // GetDCLocationPath returns the dc-location path of a member with the given member ID. func (m *EmbeddedEtcdMember) GetDCLocationPath(id uint64) string { - return path.Join(m.GetDCLocationPathPrefix(), fmt.Sprint(id)) + return keypath.DCLocationPath("", id) } // SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 924f76598b9..4d7a6fb7586 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -169,7 +169,7 @@ func (m *Participant) EnableLeader() { // GetLeaderPath returns the path of the leader. func (m *Participant) GetLeaderPath() string { - return keypath.GetLeaderPath(m.serviceName) + return keypath.LeaderPath(m.serviceName) } // GetLastLeaderUpdatedTime returns the last time when the leader is updated. @@ -291,12 +291,12 @@ func (m *Participant) getLeaderPriorityPath(id uint64) string { // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. func (m *Participant) GetDCLocationPathPrefix() string { - return path.Join(m.rootPath, dcLocationConfigEtcdPrefix) + return keypath.Prefix(keypath.DCLocationPath(m.serviceName, 0)) } // GetDCLocationPath returns the dc-location path of a member with the given member ID. func (m *Participant) GetDCLocationPath(id uint64) string { - return path.Join(m.GetDCLocationPathPrefix(), fmt.Sprint(id)) + return keypath.DCLocationPath(m.serviceName, id) } // SetLeaderPriority saves the priority to be elected as the etcd leader. diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go index 230ff356a53..a2dd13a3545 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/key_path_v2.go @@ -14,19 +14,33 @@ package keypath -import "fmt" +import ( + "fmt" + "path" +) const ( - // "/pd/{cluster_id}/leader" - leaderPathFormat = "/pd/%d/leader" + leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" + dcLocationPathFormat = "/pd/%d/dc-location/%d" // "/pd/{cluster_id}/dc-location/{member_id}" - // "/ms/{cluster_id}/{service_name}/primary" - msLeaderPathFormat = "/ms/%d/%s/primary" + msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" + msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" ) -func GetLeaderPath(serviceName string) string { +func Prefix(str string) string { + return path.Dir(str) +} + +func LeaderPath(serviceName string) string { if serviceName == "" { return fmt.Sprintf(leaderPathFormat, ClusterID()) } return fmt.Sprintf(msLeaderPathFormat, ClusterID(), serviceName) } + +func DCLocationPath(serviceName string, id uint64) string { + if serviceName == "" { + return fmt.Sprintf(dcLocationPathFormat, ClusterID(), id) + } + return fmt.Sprintf(msDCLocationPathFormat, ClusterID(), serviceName, id) +} From 2725391d0dbfd879a48196b7ed96f847bf70e8cc Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 16:40:00 +0800 Subject: [PATCH 03/11] update member path Signed-off-by: okJiang <819421878@qq.com> --- pkg/member/member.go | 24 ++++++------------------ pkg/utils/keypath/key_path_v2.go | 19 +++++++++++++++++-- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/pkg/member/member.go b/pkg/member/member.go index 68e76a95cdc..e561fdb647e 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -452,13 +452,9 @@ func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) { return int(priority), nil } -func (m *EmbeddedEtcdMember) getMemberBinaryDeployPath(id uint64) string { - return path.Join(m.rootPath, fmt.Sprintf("member/%d/deploy_path", id)) -} - // GetMemberDeployPath loads a member's binary deploy path. func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) { - key := m.getMemberBinaryDeployPath(id) + key := keypath.MemberBinaryDeployPath(id) res, err := etcdutil.EtcdKVGet(m.client, key) if err != nil { return "", err @@ -471,7 +467,7 @@ func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) { // SetMemberDeployPath saves a member's binary deploy path. func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error { - key := m.getMemberBinaryDeployPath(id) + key := keypath.MemberBinaryDeployPath(id) txn := kv.NewSlowLogTxn(m.client) execPath, err := os.Executable() deployPath := filepath.Dir(execPath) @@ -488,17 +484,9 @@ func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error { return nil } -func (m *EmbeddedEtcdMember) getMemberGitHashPath(id uint64) string { - return path.Join(m.rootPath, fmt.Sprintf("member/%d/git_hash", id)) -} - -func (m *EmbeddedEtcdMember) getMemberBinaryVersionPath(id uint64) string { - return path.Join(m.rootPath, fmt.Sprintf("member/%d/binary_version", id)) -} - // GetMemberBinaryVersion loads a member's binary version. func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) { - key := m.getMemberBinaryVersionPath(id) + key := keypath.MemberBinaryVersionPath(id) res, err := etcdutil.EtcdKVGet(m.client, key) if err != nil { return "", err @@ -511,7 +499,7 @@ func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) { // GetMemberGitHash loads a member's git hash. func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) { - key := m.getMemberGitHashPath(id) + key := keypath.MemberGitHashPath(id) res, err := etcdutil.EtcdKVGet(m.client, key) if err != nil { return "", err @@ -524,7 +512,7 @@ func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) { // SetMemberBinaryVersion saves a member's binary version. func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion string) error { - key := m.getMemberBinaryVersionPath(id) + key := keypath.MemberBinaryVersionPath(id) txn := kv.NewSlowLogTxn(m.client) res, err := txn.Then(clientv3.OpPut(key, releaseVersion)).Commit() if err != nil { @@ -538,7 +526,7 @@ func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion st // SetMemberGitHash saves a member's git hash. func (m *EmbeddedEtcdMember) SetMemberGitHash(id uint64, gitHash string) error { - key := m.getMemberGitHashPath(id) + key := keypath.MemberGitHashPath(id) txn := kv.NewSlowLogTxn(m.client) res, err := txn.Then(clientv3.OpPut(key, gitHash)).Commit() if err != nil { diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go index a2dd13a3545..edf8906bc25 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/key_path_v2.go @@ -20,8 +20,11 @@ import ( ) const ( - leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" - dcLocationPathFormat = "/pd/%d/dc-location/%d" // "/pd/{cluster_id}/dc-location/{member_id}" + leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" + dcLocationPathFormat = "/pd/%d/dc-location/%d" // "/pd/{cluster_id}/dc-location/{member_id}" + memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path" + memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash" + memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version" msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" @@ -44,3 +47,15 @@ func DCLocationPath(serviceName string, id uint64) string { } return fmt.Sprintf(msDCLocationPathFormat, ClusterID(), serviceName, id) } + +func MemberBinaryDeployPath(id uint64) string { + return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id) +} + +func MemberGitHashPath(id uint64) string { + return fmt.Sprintf(memberGitHashPath, ClusterID(), id) +} + +func MemberBinaryVersionPath(id uint64) string { + return fmt.Sprintf(memberBinaryVersionPathFormat, ClusterID(), id) +} From 1b36f7738b5433317b72364f21cfc3429168088c Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 17:25:54 +0800 Subject: [PATCH 04/11] update allocid path Signed-off-by: okJiang <819421878@qq.com> --- pkg/id/id.go | 59 ++++++++++++++++---------------- pkg/id/id_test.go | 22 ++++++------ pkg/keyspace/keyspace.go | 2 -- pkg/utils/keypath/key_path.go | 7 ---- pkg/utils/keypath/key_path_v2.go | 10 ++++++ server/server.go | 21 ++++-------- 6 files changed, 57 insertions(+), 64 deletions(-) diff --git a/pkg/id/id.go b/pkg/id/id.go index 0225dedd4f1..f475415ed14 100644 --- a/pkg/id/id.go +++ b/pkg/id/id.go @@ -15,19 +15,25 @@ package id import ( - "path" - "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) +type label string + +const ( + DefaultLabel label = "idalloc" + KeyspaceLabel label = "keyspace-idAlloc" +) + // Allocator is the allocator to generate unique ID. type Allocator interface { // SetBase set base id @@ -48,13 +54,11 @@ type allocatorImpl struct { base uint64 end uint64 - client *clientv3.Client - rootPath string - allocPath string - label string - member string - step uint64 - metrics *metrics + client *clientv3.Client + label label + member string + step uint64 + metrics *metrics } // metrics is a collection of idAllocator's metrics. @@ -64,24 +68,20 @@ type metrics struct { // AllocatorParams are parameters needed to create a new ID Allocator. type AllocatorParams struct { - Client *clientv3.Client - RootPath string - AllocPath string // AllocPath specifies path to the persistent window boundary. - Label string // Label used to label metrics and logs. - Member string // Member value, used to check if current pd leader. - Step uint64 // Step size of each persistent window boundary increment, default 1000. + Client *clientv3.Client + Label label // Label used to label metrics and logs. + Member string // Member value, used to check if current pd leader. + Step uint64 // Step size of each persistent window boundary increment, default 1000. } // NewAllocator creates a new ID Allocator. func NewAllocator(params *AllocatorParams) Allocator { allocator := &allocatorImpl{ - client: params.Client, - rootPath: params.RootPath, - allocPath: params.AllocPath, - label: params.Label, - member: params.Member, - step: params.Step, - metrics: &metrics{idGauge: idGauge.WithLabelValues(params.Label)}, + client: params.Client, + label: params.Label, + member: params.Member, + step: params.Step, + metrics: &metrics{idGauge: idGauge.WithLabelValues(string(params.Label))}, } if allocator.step == 0 { allocator.step = defaultAllocStep @@ -127,9 +127,14 @@ func (alloc *allocatorImpl) Rebase() error { } func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error { - key := alloc.getAllocIDPath() + var key string + if alloc.label == KeyspaceLabel { + key = keypath.KeyspaceAllocIDPath() + } else { + key = keypath.AllocIDPath() + } - leaderPath := path.Join(alloc.rootPath, "leader") + leaderPath := keypath.LeaderPath("") var ( cmps = []clientv3.Cmp{clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member)} end uint64 @@ -173,10 +178,6 @@ func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error { // please do not reorder the first field, it's need when getting the new-end // see: https://docs.pingcap.com/tidb/dev/pd-recover#get-allocated-id-from-pd-log log.Info("idAllocator allocates a new id", zap.Uint64("new-end", end), zap.Uint64("new-base", alloc.base), - zap.String("label", alloc.label), zap.Bool("check-curr-end", checkCurrEnd)) + zap.String("label", string(alloc.label)), zap.Bool("check-curr-end", checkCurrEnd)) return nil } - -func (alloc *allocatorImpl) getAllocIDPath() string { - return path.Join(alloc.rootPath, alloc.allocPath) -} diff --git a/pkg/id/id_test.go b/pkg/id/id_test.go index d46ac5a963e..8f676383938 100644 --- a/pkg/id/id_test.go +++ b/pkg/id/id_test.go @@ -16,7 +16,6 @@ package id import ( "context" - "strconv" "sync" "testing" @@ -25,10 +24,8 @@ import ( ) const ( - rootPath = "/pd" - leaderPath = "/pd/leader" + leaderPath = "/pd/0/leader" allocPath = "alloc_id" - label = "idalloc" memberVal = "member" step = uint64(500) ) @@ -44,24 +41,25 @@ func TestMultipleAllocator(t *testing.T) { _, err := client.Put(context.Background(), leaderPath, memberVal) re.NoError(err) + var i uint64 wg := sync.WaitGroup{} - for i := range 3 { - iStr := strconv.Itoa(i) + fn := func(label label) { wg.Add(1) // All allocators share rootPath and memberVal, but they have different allocPaths, labels and steps. allocator := NewAllocator(&AllocatorParams{ - Client: client, - RootPath: rootPath, - AllocPath: allocPath + iStr, - Label: label + iStr, - Member: memberVal, - Step: step * uint64(i), // allocator 0, 1, 2 should have step size 1000 (default), 500, 1000 respectively. + Client: client, + Label: label, + Member: memberVal, + Step: step * i, // allocator 0, 1 should have step size 1000 (default), 500 respectively. }) go func(re *require.Assertions, allocator Allocator) { defer wg.Done() testAllocator(re, allocator) }(re, allocator) + i++ } + fn(DefaultLabel) + fn(KeyspaceLabel) wg.Wait() } diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index afb60d7bb3f..08bb51da936 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -42,8 +42,6 @@ const ( // AllocStep set idAllocator's step when write persistent window boundary. // Use a lower value for denser idAllocation in the event of frequent pd leader change. AllocStep = uint64(100) - // AllocLabel is used to label keyspace idAllocator's metrics. - AllocLabel = "keyspace-idAlloc" // regionLabelIDPrefix is used to prefix the keyspace region label. regionLabelIDPrefix = "keyspaces/" // regionLabelKey is the key for keyspace id in keyspace region label. diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index 5f3aafeca36..628071e5409 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -53,7 +53,6 @@ const ( keyspacePrefix = "keyspaces" keyspaceMetaInfix = "meta" keyspaceIDInfix = "id" - keyspaceAllocID = "alloc_id" gcSafePointInfix = "gc_safe_point" serviceSafePointInfix = "service_safe_point" regionPathPrefix = "raft/r" @@ -285,12 +284,6 @@ func KeyspaceIDPath(name string) string { return path.Join(keyspacePrefix, keyspaceIDInfix, name) } -// KeyspaceIDAlloc returns the path of the keyspace id's persistent window boundary. -// Path: keyspaces/alloc_id -func KeyspaceIDAlloc() string { - return path.Join(keyspacePrefix, keyspaceAllocID) -} - // EncodeKeyspaceID from uint32 to string. // It adds extra padding to make encoded ID ordered. // Encoded ID can be decoded directly with strconv.ParseUint. diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go index edf8906bc25..68a1fea75dd 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/key_path_v2.go @@ -25,6 +25,8 @@ const ( memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path" memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash" memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version" + allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" + keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" @@ -59,3 +61,11 @@ func MemberGitHashPath(id uint64) string { func MemberBinaryVersionPath(id uint64) string { return fmt.Sprintf(memberBinaryVersionPathFormat, ClusterID(), id) } + +func AllocIDPath() string { + return fmt.Sprintf(allocIDPathFormat, ClusterID()) +} + +func KeyspaceAllocIDPath() string { + return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID()) +} diff --git a/server/server.go b/server/server.go index 029c85694c3..95e8b30b8ef 100644 --- a/server/server.go +++ b/server/server.go @@ -93,9 +93,6 @@ const ( // pdRootPath for all pd servers. pdRootPath = "/pd" pdAPIPrefix = "/pd/" - // idAllocPath for idAllocator to save persistent window's end. - idAllocPath = "alloc_id" - idAllocLabel = "idalloc" recoveringMarkPath = "cluster/markers/snapshot-recovering" @@ -446,11 +443,9 @@ func (s *Server) startServer(ctx context.Context) error { return err } s.idAllocator = id.NewAllocator(&id.AllocatorParams{ - Client: s.client, - RootPath: s.rootPath, - AllocPath: idAllocPath, - Label: idAllocLabel, - Member: s.member.MemberValue(), + Client: s.client, + Label: id.DefaultLabel, + Member: s.member.MemberValue(), }) s.encryptionKeyManager, err = encryption.NewManager(s.client, &s.cfg.Security.Encryption) if err != nil { @@ -487,12 +482,10 @@ func (s *Server) startServer(ctx context.Context) error { s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ - Client: s.client, - RootPath: s.rootPath, - AllocPath: keypath.KeyspaceIDAlloc(), - Label: keyspace.AllocLabel, - Member: s.member.MemberValue(), - Step: keyspace.AllocStep, + Client: s.client, + Label: id.KeyspaceLabel, + Member: s.member.MemberValue(), + Step: keyspace.AllocStep, }) if s.IsAPIServiceMode() { s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) From 816dc15c1fe24c5d3acce96926cac2d4ac3dc42a Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 17:32:02 +0800 Subject: [PATCH 05/11] update comment Signed-off-by: okJiang <819421878@qq.com> --- pkg/id/id_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/id/id_test.go b/pkg/id/id_test.go index 8f676383938..e08c0e93367 100644 --- a/pkg/id/id_test.go +++ b/pkg/id/id_test.go @@ -25,7 +25,6 @@ import ( const ( leaderPath = "/pd/0/leader" - allocPath = "alloc_id" memberVal = "member" step = uint64(500) ) @@ -45,7 +44,7 @@ func TestMultipleAllocator(t *testing.T) { wg := sync.WaitGroup{} fn := func(label label) { wg.Add(1) - // All allocators share rootPath and memberVal, but they have different allocPaths, labels and steps. + // Different allocators have different labels and steps. allocator := NewAllocator(&AllocatorParams{ Client: client, Label: label, From 78d5b331987d5edf27e6a8af7b7e8b5b62345738 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 17:37:49 +0800 Subject: [PATCH 06/11] fix lint Signed-off-by: okJiang <819421878@qq.com> --- pkg/utils/keypath/key_path_v2.go | 8 ++++++++ server/server.go | 14 +++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go index 68a1fea75dd..3b3e4bc0356 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/key_path_v2.go @@ -32,10 +32,12 @@ const ( msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" ) +// Prefix returns the parent directory of the given path. func Prefix(str string) string { return path.Dir(str) } +// LeaderPath returns the leader path. func LeaderPath(serviceName string) string { if serviceName == "" { return fmt.Sprintf(leaderPathFormat, ClusterID()) @@ -43,6 +45,7 @@ func LeaderPath(serviceName string) string { return fmt.Sprintf(msLeaderPathFormat, ClusterID(), serviceName) } +// DCLocationPath returns the dc-location path. func DCLocationPath(serviceName string, id uint64) string { if serviceName == "" { return fmt.Sprintf(dcLocationPathFormat, ClusterID(), id) @@ -50,22 +53,27 @@ func DCLocationPath(serviceName string, id uint64) string { return fmt.Sprintf(msDCLocationPathFormat, ClusterID(), serviceName, id) } +// MemberBinaryDeployPath returns the member binary deploy path. func MemberBinaryDeployPath(id uint64) string { return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id) } +// MemberGitHashPath returns the member git hash path. func MemberGitHashPath(id uint64) string { return fmt.Sprintf(memberGitHashPath, ClusterID(), id) } +// MemberBinaryVersionPath returns the member binary version path. func MemberBinaryVersionPath(id uint64) string { return fmt.Sprintf(memberBinaryVersionPathFormat, ClusterID(), id) } +// AllocIDPath returns the alloc id path. func AllocIDPath() string { return fmt.Sprintf(allocIDPathFormat, ClusterID()) } +// KeyspaceAllocIDPath returns the keyspace alloc id path. func KeyspaceAllocIDPath() string { return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID()) } diff --git a/server/server.go b/server/server.go index 95e8b30b8ef..ed61b14834f 100644 --- a/server/server.go +++ b/server/server.go @@ -443,9 +443,9 @@ func (s *Server) startServer(ctx context.Context) error { return err } s.idAllocator = id.NewAllocator(&id.AllocatorParams{ - Client: s.client, - Label: id.DefaultLabel, - Member: s.member.MemberValue(), + Client: s.client, + Label: id.DefaultLabel, + Member: s.member.MemberValue(), }) s.encryptionKeyManager, err = encryption.NewManager(s.client, &s.cfg.Security.Encryption) if err != nil { @@ -482,10 +482,10 @@ func (s *Server) startServer(ctx context.Context) error { s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ - Client: s.client, - Label: id.KeyspaceLabel, - Member: s.member.MemberValue(), - Step: keyspace.AllocStep, + Client: s.client, + Label: id.KeyspaceLabel, + Member: s.member.MemberValue(), + Step: keyspace.AllocStep, }) if s.IsAPIServiceMode() { s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) From cf5c88aaeef53b500a2e9387eafb3e1bbc9038ae Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 18:19:16 +0800 Subject: [PATCH 07/11] add keyspace group path Signed-off-by: okJiang <819421878@qq.com> --- pkg/id/id.go | 2 +- pkg/mcs/resourcemanager/server/server.go | 6 ++-- pkg/mcs/scheduling/server/server.go | 6 ++-- pkg/member/member.go | 12 ++++---- pkg/member/participant.go | 32 +++++++++---------- pkg/tso/keyspace_group_manager.go | 7 +++-- pkg/utils/keypath/key_path_v2.go | 39 ++++++++++++++++++------ 7 files changed, 66 insertions(+), 38 deletions(-) diff --git a/pkg/id/id.go b/pkg/id/id.go index f475415ed14..bda4b3a8000 100644 --- a/pkg/id/id.go +++ b/pkg/id/id.go @@ -134,7 +134,7 @@ func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error { key = keypath.AllocIDPath() } - leaderPath := keypath.LeaderPath("") + leaderPath := keypath.LeaderPath(nil) var ( cmps = []clientv3.Cmp{clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member)} end uint64 diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 05b01094801..4514503edea 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -303,13 +303,15 @@ func (s *Server) startServer() (err error) { uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) - s.participant = member.NewParticipant(s.GetClient(), constant.ResourceManagerServiceName) + s.participant = member.NewParticipant(s.GetClient(), keypath.MsParam{ + ServiceName: constant.ResourceManagerServiceName, + }) p := &resource_manager.Participant{ Name: uniqueName, Id: uniqueID, // id is unique among all participants ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } - s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(), constant.PrimaryKey, "primary election") + s.participant.InitInfo(p, keypath.ResourceManagerSvcRootPath(), "primary election") s.service = &Service{ ctx: s.Context(), diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index a613b54b82d..f615a78fe05 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -452,13 +452,15 @@ func (s *Server) startServer() (err error) { uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) - s.participant = member.NewParticipant(s.GetClient(), constant.SchedulingServiceName) + s.participant = member.NewParticipant(s.GetClient(), keypath.MsParam{ + ServiceName: constant.SchedulingServiceName, + }) p := &schedulingpb.Participant{ Name: uniqueName, Id: uniqueID, // id is unique among all participants ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } - s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), constant.PrimaryKey, "primary election") + s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), "primary election") s.service = &Service{Server: s} s.AddServiceReadyCallback(s.startCluster) diff --git a/pkg/member/member.go b/pkg/member/member.go index e561fdb647e..89ba59d385f 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -160,8 +160,8 @@ func (m *EmbeddedEtcdMember) EnableLeader() { } // GetLeaderPath returns the path of the PD leader. -func (m *EmbeddedEtcdMember) GetLeaderPath() string { - return keypath.LeaderPath("") +func (*EmbeddedEtcdMember) GetLeaderPath() string { + return keypath.LeaderPath(nil) } // GetLeadership returns the leadership of the PD member. @@ -384,13 +384,13 @@ func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string { } // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. -func (m *EmbeddedEtcdMember) GetDCLocationPathPrefix() string { - return keypath.Prefix(keypath.DCLocationPath("", 0)) +func (*EmbeddedEtcdMember) GetDCLocationPathPrefix() string { + return keypath.Prefix(keypath.DCLocationPath(nil, 0)) } // GetDCLocationPath returns the dc-location path of a member with the given member ID. -func (m *EmbeddedEtcdMember) GetDCLocationPath(id uint64) string { - return keypath.DCLocationPath("", id) +func (*EmbeddedEtcdMember) GetDCLocationPath(id uint64) string { + return keypath.DCLocationPath(nil, id) } // SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 4d7a6fb7586..52fe696bafa 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -51,13 +51,13 @@ type participant interface { // EmbeddedEtcdMember, Participant relies on etcd for election, but it's decoupled // from the embedded etcd. It implements Member interface. type Participant struct { + keypath.MsParam leadership *election.Leadership // stored as member type - leader atomic.Value - client *clientv3.Client - rootPath string - member participant - serviceName string + leader atomic.Value + client *clientv3.Client + rootPath string + member participant // memberValue is the serialized string of `member`. It will be saved in the // leader key when this participant is successfully elected as the leader of // the group. Every write will use it to check the leadership. @@ -72,15 +72,15 @@ type Participant struct { } // NewParticipant create a new Participant. -func NewParticipant(client *clientv3.Client, serviceName string) *Participant { +func NewParticipant(client *clientv3.Client, msParam keypath.MsParam) *Participant { return &Participant{ - client: client, - serviceName: serviceName, + MsParam: msParam, + client: client, } } -// InitInfo initializes the member info. The leader key is path.Join(rootPath, leaderName) -func (m *Participant) InitInfo(p participant, rootPath string, leaderName string, purpose string) { +// InitInfo initializes the member info. +func (m *Participant) InitInfo(p participant, rootPath string, purpose string) { data, err := p.Marshal() if err != nil { // can't fail, so panic here. @@ -144,7 +144,7 @@ func (m *Participant) GetLeaderID() uint64 { func (m *Participant) GetLeader() participant { leader := m.leader.Load() if leader == nil { - return NewParticipantByService(m.serviceName) + return NewParticipantByService(m.ServiceName) } return leader.(participant) } @@ -157,7 +157,7 @@ func (m *Participant) setLeader(member participant) { // unsetLeader unsets the member's leader. func (m *Participant) unsetLeader() { - leader := NewParticipantByService(m.serviceName) + leader := NewParticipantByService(m.ServiceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -169,7 +169,7 @@ func (m *Participant) EnableLeader() { // GetLeaderPath returns the path of the leader. func (m *Participant) GetLeaderPath() string { - return keypath.LeaderPath(m.serviceName) + return keypath.LeaderPath(&m.MsParam) } // GetLastLeaderUpdatedTime returns the last time when the leader is updated. @@ -208,7 +208,7 @@ func (*Participant) PreCheckLeader() error { // getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). func (m *Participant) getPersistentLeader() (participant, int64, error) { - leader := NewParticipantByService(m.serviceName) + leader := NewParticipantByService(m.ServiceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { return nil, 0, err @@ -291,12 +291,12 @@ func (m *Participant) getLeaderPriorityPath(id uint64) string { // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. func (m *Participant) GetDCLocationPathPrefix() string { - return keypath.Prefix(keypath.DCLocationPath(m.serviceName, 0)) + return keypath.Prefix(keypath.DCLocationPath(&m.MsParam, 0)) } // GetDCLocationPath returns the dc-location path of a member with the given member ID. func (m *Participant) GetDCLocationPath(id uint64) string { - return keypath.DCLocationPath(m.serviceName, id) + return keypath.DCLocationPath(&m.MsParam, id) } // SetLeaderPriority saves the priority to be elected as the etcd leader. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 4c49153814d..07731204e3d 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -751,13 +751,16 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) // Initialize the participant info to join the primary election. - participant := member.NewParticipant(kgm.etcdClient, constant.TSOServiceName) + participant := member.NewParticipant(kgm.etcdClient, keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: group.ID, + }) p := &tsopb.Participant{ Name: uniqueName, Id: uniqueID, // id is unique among all participants ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()}, } - participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), constant.PrimaryKey, "keyspace group primary election") + participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), "keyspace group primary election") // If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group // is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot // be broken until the entire split process is completed. diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go index 3b3e4bc0356..52b4f078375 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/key_path_v2.go @@ -28,29 +28,50 @@ const ( allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" - msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" - msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" + msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" + msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary" + msTsoKespaceLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary" + msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" + msTsoDefaultDCLocationPath = "/ms/%d/tso/00000/dc-location/%d" // "/ms/{cluster_id}/tso/00000/dc-location/{member_id}" + msTsoKespaceDCLocationPath = "/ms/%d/tso/keyspace_groups/election/%05d/dc-location/%d" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/dc-location/{member_id}" ) +type MsParam struct { + ServiceName string + GroupID uint32 // only used for tso keyspace group +} + // Prefix returns the parent directory of the given path. func Prefix(str string) string { return path.Dir(str) } // LeaderPath returns the leader path. -func LeaderPath(serviceName string) string { - if serviceName == "" { +func LeaderPath(p *MsParam) string { + if p == nil || p.ServiceName == "" { return fmt.Sprintf(leaderPathFormat, ClusterID()) } - return fmt.Sprintf(msLeaderPathFormat, ClusterID(), serviceName) + if p.ServiceName == "tso" { + if p.GroupID == 0 { + return fmt.Sprintf(msTsoDefaultLeaderPathFormat, ClusterID()) + } + return fmt.Sprintf(msTsoKespaceLeaderPathFormat, ClusterID(), p.GroupID) + } + return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName) } // DCLocationPath returns the dc-location path. -func DCLocationPath(serviceName string, id uint64) string { - if serviceName == "" { - return fmt.Sprintf(dcLocationPathFormat, ClusterID(), id) +func DCLocationPath(p *MsParam, memberID uint64) string { + if p == nil || p.ServiceName == "" { + return fmt.Sprintf(dcLocationPathFormat, ClusterID(), memberID) + } + if p.ServiceName == "tso" { + if p.GroupID == 0 { + return fmt.Sprintf(msTsoDefaultDCLocationPath, ClusterID(), memberID) + } + return fmt.Sprintf(msTsoKespaceDCLocationPath, ClusterID(), p.GroupID, memberID) } - return fmt.Sprintf(msDCLocationPathFormat, ClusterID(), serviceName, id) + return fmt.Sprintf(msDCLocationPathFormat, ClusterID(), p.ServiceName, memberID) } // MemberBinaryDeployPath returns the member binary deploy path. From a58fab321bb9b9964ceb6469dfaadebe6e6044ec Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 7 Nov 2024 18:21:37 +0800 Subject: [PATCH 08/11] add comment Signed-off-by: okJiang <819421878@qq.com> --- pkg/id/id.go | 4 +++- pkg/utils/keypath/key_path_v2.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/id/id.go b/pkg/id/id.go index bda4b3a8000..eb2788fc656 100644 --- a/pkg/id/id.go +++ b/pkg/id/id.go @@ -30,7 +30,9 @@ import ( type label string const ( - DefaultLabel label = "idalloc" + // DefaultLabel is the default label for id allocator. + DefaultLabel label = "idalloc" + // KeyspaceLabel is the label for keyspace id allocator. KeyspaceLabel label = "keyspace-idAlloc" ) diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go index 52b4f078375..395e311e64c 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/key_path_v2.go @@ -36,6 +36,7 @@ const ( msTsoKespaceDCLocationPath = "/ms/%d/tso/keyspace_groups/election/%05d/dc-location/%d" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/dc-location/{member_id}" ) +// MsParam is the parameter of micro service. type MsParam struct { ServiceName string GroupID uint32 // only used for tso keyspace group From ee91def85e74e481a84f8aaa29be720cd15d320b Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 14 Nov 2024 16:25:48 +0800 Subject: [PATCH 09/11] add a real cluster test Signed-off-by: okJiang <819421878@qq.com> --- tests/integrations/realcluster/cluster.go | 3 + .../realcluster/cluster_id_test.go | 5 +- .../download_integration_test_binaries.sh | 8 ++ .../integrations/realcluster/etcd_key_test.go | 128 ++++++++++++++++++ tests/integrations/realcluster/util.go | 11 ++ 5 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 tests/integrations/realcluster/etcd_key_test.go diff --git a/tests/integrations/realcluster/cluster.go b/tests/integrations/realcluster/cluster.go index 4dc80b5d433..9682a159342 100644 --- a/tests/integrations/realcluster/cluster.go +++ b/tests/integrations/realcluster/cluster.go @@ -87,6 +87,9 @@ func (s *clusterSuite) stopCluster(t *testing.T) { } func (s *clusterSuite) tag() string { + if s.ms { + return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, s.clusterCnt) + } return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt) } diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go index e327f472bbb..1d04bd5cc51 100644 --- a/tests/integrations/realcluster/cluster_id_test.go +++ b/tests/integrations/realcluster/cluster_id_test.go @@ -16,7 +16,6 @@ package realcluster import ( "context" - "os/exec" "strings" "testing" @@ -55,10 +54,8 @@ func (s *clusterIDSuite) TestClientClusterID() { } func getPDEndpoints(t *testing.T) []string { - cmd := exec.Command("sh", "-c", "ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'") - bytes, err := cmd.Output() + pdAddrsForEachTikv, err := runCommandWithOutput("ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'") require.NoError(t, err) - pdAddrsForEachTikv := strings.Split(string(bytes), "\n") var pdAddrs []string for _, addr := range pdAddrsForEachTikv { // length of addr is less than 5 means it must not be a valid address diff --git a/tests/integrations/realcluster/download_integration_test_binaries.sh b/tests/integrations/realcluster/download_integration_test_binaries.sh index 8d4cc3411a8..26e09a2347a 100644 --- a/tests/integrations/realcluster/download_integration_test_binaries.sh +++ b/tests/integrations/realcluster/download_integration_test_binaries.sh @@ -28,6 +28,8 @@ tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/ tikv_download_url="${file_server_url}/download/builds/pingcap/tikv/${tikv_sha1}/centos7/tikv-server.tar.gz" tiflash_download_url="${file_server_url}/download/builds/pingcap/tiflash/${branch}/${tiflash_sha1}/centos7/tiflash.tar.gz" +ETCD_VERSION=v3.5.9 + set -o nounset # See https://misc.flogisoft.com/bash/tip_colors_and_formatting. @@ -69,6 +71,12 @@ function main() { mv third_bin/tiflash third_bin/_tiflash mv third_bin/_tiflash/* third_bin && rm -rf third_bin/_tiflash + # etcdctl + curl -L https://github.com/etcd-io/etcd/releases/download/${ETCD_VERSION}/etcd-${ETCD_VERSION}-linux-amd64.tar.gz -o etcd-${ETCD_VERSION}-linux-amd64.tar.gz + tar -xvf etcd-${ETCD_VERSION}-linux-amd64.tar.gz + sudo mv etcd-${ETCD_VERSION}-linux-amd64/etcdctl /usr/local/bin/ + etcdctl version + chmod +x third_bin/* rm -rf tmp rm -rf third_bin/bin diff --git a/tests/integrations/realcluster/etcd_key_test.go b/tests/integrations/realcluster/etcd_key_test.go new file mode 100644 index 00000000000..4b822ab4956 --- /dev/null +++ b/tests/integrations/realcluster/etcd_key_test.go @@ -0,0 +1,128 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/utils/testutil" +) + +type etcdKeySuite struct { + clusterSuite +} + +func TestEtcdKey(t *testing.T) { + suite.Run(t, &etcdKeySuite{ + clusterSuite: clusterSuite{ + suiteName: "etcd_key", + }, + }) +} + +func TestMSEtcdKey(t *testing.T) { + suite.Run(t, &etcdKeySuite{ + clusterSuite: clusterSuite{ + suiteName: "etcd_key", + ms: true, + }, + }) +} + +var ( + pdKeys = []string{ + "", + "/pd//alloc_id", + "/pd//config", + "/pd//gc/safe_point", + "/pd//gc/safe_point/service/gc_worker", + "/pd//keyspaces/id/DEFAULT", + "/pd//keyspaces/meta/", + "/pd//leader", + "/pd//member//binary_version", + "/pd//member//deploy_path", + "/pd//member//git_hash", + "/pd//raft", + "/pd//raft/min_resolved_ts", + "/pd//raft/r/", + "/pd//raft/s/", + "/pd//raft/status/raft_bootstrap_time", + "/pd//region_label/keyspaces/", + "/pd//rule_group/tiflash", + "/pd//rules/-c", + "/pd//scheduler_config/balance-hot-region-scheduler", + "/pd//scheduler_config/balance-leader-scheduler", + "/pd//scheduler_config/balance-region-scheduler", + "/pd//scheduler_config/evict-slow-store-scheduler", + "/pd//timestamp", + "/pd//tso/keyspace_groups/membership/", + "/pd/cluster_id", + } + msKeys = []string{ + "", + "/ms//scheduling/primary", + "/ms//scheduling/primary/expected_primary", + "/ms//scheduling/registry/http://...:", + "/ms//tso//primary", + "/ms//tso//primary/expected_primary", + "/ms//tso/registry/http://...:", + } +) + +func (s *etcdKeySuite) TestEtcdKey() { + t := s.T() + endpoints := getPDEndpoints(t) + + testutil.Eventually(require.New(t), func() bool { + keys, err := getEtcdKey(endpoints[0], "/pd") + if err != nil { + return false + } + return checkEtcdKey(t, keys, pdKeys) + }) + + if s.ms { + testutil.Eventually(require.New(t), func() bool { + keys, err := getEtcdKey(endpoints[0], "/ms") + if err != nil { + return false + } + return checkEtcdKey(t, keys, msKeys) + }) + } +} + +func getEtcdKey(endpoints, prefix string) ([]string, error) { + // `sed 's/[0-9]*//g'` is used to remove the number in the etcd key, such as the cluster id. + etcdCmd := fmt.Sprintf("etcdctl --endpoints=%s get %s --prefix --keys-only | sed 's/[0-9]*//g' | sort | uniq", + endpoints, prefix) + return runCommandWithOutput(etcdCmd) +} + +func checkEtcdKey(t *testing.T, keys, expectedKeys []string) bool { + for i, key := range keys { + if len(key) == 0 { + continue + } + if expectedKeys[i] != key { + t.Logf("expected key: %s, got key: %s", expectedKeys[i], key) + return false + } + } + return true +} diff --git a/tests/integrations/realcluster/util.go b/tests/integrations/realcluster/util.go index 789ceaa29c2..a33f9c071c6 100644 --- a/tests/integrations/realcluster/util.go +++ b/tests/integrations/realcluster/util.go @@ -17,6 +17,7 @@ package realcluster import ( "os" "os/exec" + "strings" "time" ) @@ -40,6 +41,16 @@ func runCommand(name string, args ...string) error { return cmd.Run() } +func runCommandWithOutput(cmdStr string) ([]string, error) { + cmd := exec.Command("sh", "-c", cmdStr) + bytes, err := cmd.Output() + if err != nil { + return nil, err + } + output := strings.Split(string(bytes), "\n") + return output, nil +} + func fileExists(path string) bool { _, err := os.Stat(path) return !os.IsNotExist(err) From 44bb9f300ac28aa8fd762575499674ad3a77c5a1 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 14 Nov 2024 18:22:36 +0800 Subject: [PATCH 10/11] fix test Signed-off-by: okJiang <819421878@qq.com> --- tests/integrations/realcluster/cluster.go | 3 ++- .../integrations/realcluster/etcd_key_test.go | 22 +++++++++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/tests/integrations/realcluster/cluster.go b/tests/integrations/realcluster/cluster.go index 9682a159342..feabbfb17e0 100644 --- a/tests/integrations/realcluster/cluster.go +++ b/tests/integrations/realcluster/cluster.go @@ -184,5 +184,6 @@ func waitTiupReady(t *testing.T, tag string) { zap.String("tag", tag), zap.Error(err)) time.Sleep(time.Duration(interval) * time.Second) } - require.Failf(t, "TiUP is not ready", "tag: %s", tag) + // this check can trigger the cleanup function + require.NotZero(t, 1, "TiUP is not ready", "tag: %s", tag) } diff --git a/tests/integrations/realcluster/etcd_key_test.go b/tests/integrations/realcluster/etcd_key_test.go index 4b822ab4956..09721a71a79 100644 --- a/tests/integrations/realcluster/etcd_key_test.go +++ b/tests/integrations/realcluster/etcd_key_test.go @@ -16,6 +16,7 @@ package realcluster import ( "fmt" + "slices" "testing" "github.com/stretchr/testify/require" @@ -45,11 +46,13 @@ func TestMSEtcdKey(t *testing.T) { } var ( + // The keys that prefix is `/pd`. pdKeys = []string{ "", "/pd//alloc_id", "/pd//config", - "/pd//gc/safe_point", + // If not call `UpdateGCSafePoint`, this key will not exist. + // "/pd//gc/safe_point", "/pd//gc/safe_point/service/gc_worker", "/pd//keyspaces/id/DEFAULT", "/pd//keyspaces/meta/", @@ -70,9 +73,10 @@ var ( "/pd//scheduler_config/balance-region-scheduler", "/pd//scheduler_config/evict-slow-store-scheduler", "/pd//timestamp", - "/pd//tso/keyspace_groups/membership/", + "/pd//tso/keyspace_groups/membership/", // ms "/pd/cluster_id", } + // The keys that prefix is `/ms`. msKeys = []string{ "", "/ms//scheduling/primary", @@ -82,9 +86,23 @@ var ( "/ms//tso//primary/expected_primary", "/ms//tso/registry/http://...:", } + // These keys with `/pd` are only in `ms` mode. + pdMSKeys = []string{ + "/pd//tso/keyspace_groups/membership/", + } ) func (s *etcdKeySuite) TestEtcdKey() { + var keysBackup []string + if !s.ms { + keysBackup = pdKeys + pdKeys = slices.DeleteFunc(pdKeys, func(s string) bool { + return slices.Contains(pdMSKeys, s) + }) + defer func() { + pdKeys = keysBackup + }() + } t := s.T() endpoints := getPDEndpoints(t) From f1d54d34fd1883315778423b170fbd5628f511f1 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 15 Nov 2024 17:27:27 +0800 Subject: [PATCH 11/11] update etcd version Signed-off-by: okJiang <819421878@qq.com> --- .../realcluster/download_integration_test_binaries.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/realcluster/download_integration_test_binaries.sh b/tests/integrations/realcluster/download_integration_test_binaries.sh index 26e09a2347a..9009750ca99 100644 --- a/tests/integrations/realcluster/download_integration_test_binaries.sh +++ b/tests/integrations/realcluster/download_integration_test_binaries.sh @@ -28,7 +28,7 @@ tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/ tikv_download_url="${file_server_url}/download/builds/pingcap/tikv/${tikv_sha1}/centos7/tikv-server.tar.gz" tiflash_download_url="${file_server_url}/download/builds/pingcap/tiflash/${branch}/${tiflash_sha1}/centos7/tiflash.tar.gz" -ETCD_VERSION=v3.5.9 +ETCD_VERSION=v3.5.15 set -o nounset