diff --git a/pkg/id/id.go b/pkg/id/id.go index 0225dedd4f1..eb2788fc656 100644 --- a/pkg/id/id.go +++ b/pkg/id/id.go @@ -15,19 +15,27 @@ package id import ( - "path" - "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) +type label string + +const ( + // DefaultLabel is the default label for id allocator. + DefaultLabel label = "idalloc" + // KeyspaceLabel is the label for keyspace id allocator. + KeyspaceLabel label = "keyspace-idAlloc" +) + // Allocator is the allocator to generate unique ID. type Allocator interface { // SetBase set base id @@ -48,13 +56,11 @@ type allocatorImpl struct { base uint64 end uint64 - client *clientv3.Client - rootPath string - allocPath string - label string - member string - step uint64 - metrics *metrics + client *clientv3.Client + label label + member string + step uint64 + metrics *metrics } // metrics is a collection of idAllocator's metrics. @@ -64,24 +70,20 @@ type metrics struct { // AllocatorParams are parameters needed to create a new ID Allocator. type AllocatorParams struct { - Client *clientv3.Client - RootPath string - AllocPath string // AllocPath specifies path to the persistent window boundary. - Label string // Label used to label metrics and logs. - Member string // Member value, used to check if current pd leader. - Step uint64 // Step size of each persistent window boundary increment, default 1000. + Client *clientv3.Client + Label label // Label used to label metrics and logs. + Member string // Member value, used to check if current pd leader. + Step uint64 // Step size of each persistent window boundary increment, default 1000. } // NewAllocator creates a new ID Allocator. func NewAllocator(params *AllocatorParams) Allocator { allocator := &allocatorImpl{ - client: params.Client, - rootPath: params.RootPath, - allocPath: params.AllocPath, - label: params.Label, - member: params.Member, - step: params.Step, - metrics: &metrics{idGauge: idGauge.WithLabelValues(params.Label)}, + client: params.Client, + label: params.Label, + member: params.Member, + step: params.Step, + metrics: &metrics{idGauge: idGauge.WithLabelValues(string(params.Label))}, } if allocator.step == 0 { allocator.step = defaultAllocStep @@ -127,9 +129,14 @@ func (alloc *allocatorImpl) Rebase() error { } func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error { - key := alloc.getAllocIDPath() + var key string + if alloc.label == KeyspaceLabel { + key = keypath.KeyspaceAllocIDPath() + } else { + key = keypath.AllocIDPath() + } - leaderPath := path.Join(alloc.rootPath, "leader") + leaderPath := keypath.LeaderPath(nil) var ( cmps = []clientv3.Cmp{clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member)} end uint64 @@ -173,10 +180,6 @@ func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error { // please do not reorder the first field, it's need when getting the new-end // see: https://docs.pingcap.com/tidb/dev/pd-recover#get-allocated-id-from-pd-log log.Info("idAllocator allocates a new id", zap.Uint64("new-end", end), zap.Uint64("new-base", alloc.base), - zap.String("label", alloc.label), zap.Bool("check-curr-end", checkCurrEnd)) + zap.String("label", string(alloc.label)), zap.Bool("check-curr-end", checkCurrEnd)) return nil } - -func (alloc *allocatorImpl) getAllocIDPath() string { - return path.Join(alloc.rootPath, alloc.allocPath) -} diff --git a/pkg/id/id_test.go b/pkg/id/id_test.go index d46ac5a963e..e08c0e93367 100644 --- a/pkg/id/id_test.go +++ b/pkg/id/id_test.go @@ -16,7 +16,6 @@ package id import ( "context" - "strconv" "sync" "testing" @@ -25,10 +24,7 @@ import ( ) const ( - rootPath = "/pd" - leaderPath = "/pd/leader" - allocPath = "alloc_id" - label = "idalloc" + leaderPath = "/pd/0/leader" memberVal = "member" step = uint64(500) ) @@ -44,24 +40,25 @@ func TestMultipleAllocator(t *testing.T) { _, err := client.Put(context.Background(), leaderPath, memberVal) re.NoError(err) + var i uint64 wg := sync.WaitGroup{} - for i := range 3 { - iStr := strconv.Itoa(i) + fn := func(label label) { wg.Add(1) - // All allocators share rootPath and memberVal, but they have different allocPaths, labels and steps. + // Different allocators have different labels and steps. allocator := NewAllocator(&AllocatorParams{ - Client: client, - RootPath: rootPath, - AllocPath: allocPath + iStr, - Label: label + iStr, - Member: memberVal, - Step: step * uint64(i), // allocator 0, 1, 2 should have step size 1000 (default), 500, 1000 respectively. + Client: client, + Label: label, + Member: memberVal, + Step: step * i, // allocator 0, 1 should have step size 1000 (default), 500 respectively. }) go func(re *require.Assertions, allocator Allocator) { defer wg.Done() testAllocator(re, allocator) }(re, allocator) + i++ } + fn(DefaultLabel) + fn(KeyspaceLabel) wg.Wait() } diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index afb60d7bb3f..08bb51da936 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -42,8 +42,6 @@ const ( // AllocStep set idAllocator's step when write persistent window boundary. // Use a lower value for denser idAllocation in the event of frequent pd leader change. AllocStep = uint64(100) - // AllocLabel is used to label keyspace idAllocator's metrics. - AllocLabel = "keyspace-idAlloc" // regionLabelIDPrefix is used to prefix the keyspace region label. regionLabelIDPrefix = "keyspaces/" // regionLabelKey is the key for keyspace id in keyspace region label. diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index a613b54b82d..f615a78fe05 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -452,13 +452,15 @@ func (s *Server) startServer() (err error) { uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) - s.participant = member.NewParticipant(s.GetClient(), constant.SchedulingServiceName) + s.participant = member.NewParticipant(s.GetClient(), keypath.MsParam{ + ServiceName: constant.SchedulingServiceName, + }) p := &schedulingpb.Participant{ Name: uniqueName, Id: uniqueID, // id is unique among all participants ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } - s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), constant.PrimaryKey, "primary election") + s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), "primary election") s.service = &Service{Server: s} s.AddServiceReadyCallback(s.startCluster) diff --git a/pkg/member/member.go b/pkg/member/member.go index 2dc8be52031..89ba59d385f 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.uber.org/zap" @@ -41,8 +42,7 @@ import ( const ( // The timeout to wait transfer etcd leader to complete. - moveLeaderTimeout = 5 * time.Second - dcLocationConfigEtcdPrefix = "dc-location" + moveLeaderTimeout = 5 * time.Second // If the campaign times is more than this value in `campaignTimesRecordTimeout`, the PD will resign and campaign again. campaignLeaderFrequencyTimes = 3 ) @@ -160,8 +160,8 @@ func (m *EmbeddedEtcdMember) EnableLeader() { } // GetLeaderPath returns the path of the PD leader. -func (m *EmbeddedEtcdMember) GetLeaderPath() string { - return path.Join(m.rootPath, "leader") +func (*EmbeddedEtcdMember) GetLeaderPath() string { + return keypath.LeaderPath(nil) } // GetLeadership returns the leadership of the PD member. @@ -384,13 +384,13 @@ func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string { } // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. -func (m *EmbeddedEtcdMember) GetDCLocationPathPrefix() string { - return path.Join(m.rootPath, dcLocationConfigEtcdPrefix) +func (*EmbeddedEtcdMember) GetDCLocationPathPrefix() string { + return keypath.Prefix(keypath.DCLocationPath(nil, 0)) } // GetDCLocationPath returns the dc-location path of a member with the given member ID. -func (m *EmbeddedEtcdMember) GetDCLocationPath(id uint64) string { - return path.Join(m.GetDCLocationPathPrefix(), fmt.Sprint(id)) +func (*EmbeddedEtcdMember) GetDCLocationPath(id uint64) string { + return keypath.DCLocationPath(nil, id) } // SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader. @@ -452,13 +452,9 @@ func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) { return int(priority), nil } -func (m *EmbeddedEtcdMember) getMemberBinaryDeployPath(id uint64) string { - return path.Join(m.rootPath, fmt.Sprintf("member/%d/deploy_path", id)) -} - // GetMemberDeployPath loads a member's binary deploy path. func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) { - key := m.getMemberBinaryDeployPath(id) + key := keypath.MemberBinaryDeployPath(id) res, err := etcdutil.EtcdKVGet(m.client, key) if err != nil { return "", err @@ -471,7 +467,7 @@ func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) { // SetMemberDeployPath saves a member's binary deploy path. func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error { - key := m.getMemberBinaryDeployPath(id) + key := keypath.MemberBinaryDeployPath(id) txn := kv.NewSlowLogTxn(m.client) execPath, err := os.Executable() deployPath := filepath.Dir(execPath) @@ -488,17 +484,9 @@ func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error { return nil } -func (m *EmbeddedEtcdMember) getMemberGitHashPath(id uint64) string { - return path.Join(m.rootPath, fmt.Sprintf("member/%d/git_hash", id)) -} - -func (m *EmbeddedEtcdMember) getMemberBinaryVersionPath(id uint64) string { - return path.Join(m.rootPath, fmt.Sprintf("member/%d/binary_version", id)) -} - // GetMemberBinaryVersion loads a member's binary version. func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) { - key := m.getMemberBinaryVersionPath(id) + key := keypath.MemberBinaryVersionPath(id) res, err := etcdutil.EtcdKVGet(m.client, key) if err != nil { return "", err @@ -511,7 +499,7 @@ func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) { // GetMemberGitHash loads a member's git hash. func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) { - key := m.getMemberGitHashPath(id) + key := keypath.MemberGitHashPath(id) res, err := etcdutil.EtcdKVGet(m.client, key) if err != nil { return "", err @@ -524,7 +512,7 @@ func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) { // SetMemberBinaryVersion saves a member's binary version. func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion string) error { - key := m.getMemberBinaryVersionPath(id) + key := keypath.MemberBinaryVersionPath(id) txn := kv.NewSlowLogTxn(m.client) res, err := txn.Then(clientv3.OpPut(key, releaseVersion)).Commit() if err != nil { @@ -538,7 +526,7 @@ func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion st // SetMemberGitHash saves a member's git hash. func (m *EmbeddedEtcdMember) SetMemberGitHash(id uint64, gitHash string) error { - key := m.getMemberGitHashPath(id) + key := keypath.MemberGitHashPath(id) txn := kv.NewSlowLogTxn(m.client) res, err := txn.Then(clientv3.OpPut(key, gitHash)).Commit() if err != nil { diff --git a/pkg/member/participant.go b/pkg/member/participant.go index a513152d9b2..93d852f0fb0 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -16,8 +16,6 @@ package member import ( "context" - "fmt" - "path" "sync/atomic" "time" @@ -28,6 +26,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -48,14 +47,13 @@ type participant interface { // EmbeddedEtcdMember, Participant relies on etcd for election, but it's decoupled // from the embedded etcd. It implements Member interface. type Participant struct { + keypath.MsParam leadership *election.Leadership // stored as member type - leader atomic.Value - client *clientv3.Client - rootPath string - leaderPath string - member participant - serviceName string + leader atomic.Value + client *clientv3.Client + rootPath string + member participant // memberValue is the serialized string of `member`. It will be saved in the // leader key when this participant is successfully elected as the leader of // the group. Every write will use it to check the leadership. @@ -70,15 +68,15 @@ type Participant struct { } // NewParticipant create a new Participant. -func NewParticipant(client *clientv3.Client, serviceName string) *Participant { +func NewParticipant(client *clientv3.Client, msParam keypath.MsParam) *Participant { return &Participant{ - client: client, - serviceName: serviceName, + MsParam: msParam, + client: client, } } -// InitInfo initializes the member info. The leader key is path.Join(rootPath, leaderName) -func (m *Participant) InitInfo(p participant, rootPath string, leaderName string, purpose string) { +// InitInfo initializes the member info. +func (m *Participant) InitInfo(p participant, rootPath string, purpose string) { data, err := p.Marshal() if err != nil { // can't fail, so panic here. @@ -87,10 +85,9 @@ func (m *Participant) InitInfo(p participant, rootPath string, leaderName string m.member = p m.memberValue = string(data) m.rootPath = rootPath - m.leaderPath = path.Join(rootPath, leaderName) m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose) m.lastLeaderUpdatedTime.Store(time.Now()) - log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.leaderPath)) + log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.GetLeaderPath())) } // ID returns the unique ID for this participant in the election group @@ -143,7 +140,7 @@ func (m *Participant) GetLeaderID() uint64 { func (m *Participant) GetLeader() participant { leader := m.leader.Load() if leader == nil { - return NewParticipantByService(m.serviceName) + return NewParticipantByService(m.ServiceName) } return leader.(participant) } @@ -156,7 +153,7 @@ func (m *Participant) setLeader(member participant) { // unsetLeader unsets the member's leader. func (m *Participant) unsetLeader() { - leader := NewParticipantByService(m.serviceName) + leader := NewParticipantByService(m.ServiceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -168,7 +165,7 @@ func (m *Participant) EnableLeader() { // GetLeaderPath returns the path of the leader. func (m *Participant) GetLeaderPath() string { - return m.leaderPath + return keypath.LeaderPath(&m.MsParam) } // GetLastLeaderUpdatedTime returns the last time when the leader is updated. @@ -207,7 +204,7 @@ func (*Participant) PreCheckLeader() error { // getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). func (m *Participant) getPersistentLeader() (participant, int64, error) { - leader := NewParticipantByService(m.serviceName) + leader := NewParticipantByService(m.ServiceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { return nil, 0, err @@ -281,12 +278,12 @@ func (m *Participant) IsSameLeader(leader participant) bool { // GetDCLocationPathPrefix returns the dc-location path prefix of the cluster. func (m *Participant) GetDCLocationPathPrefix() string { - return path.Join(m.rootPath, dcLocationConfigEtcdPrefix) + return keypath.Prefix(keypath.DCLocationPath(&m.MsParam, 0)) } // GetDCLocationPath returns the dc-location path of a member with the given member ID. func (m *Participant) GetDCLocationPath(id uint64) string { - return path.Join(m.GetDCLocationPathPrefix(), fmt.Sprint(id)) + return keypath.DCLocationPath(&m.MsParam, id) } func (m *Participant) campaignCheck() bool { diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 4c49153814d..07731204e3d 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -751,13 +751,16 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) // Initialize the participant info to join the primary election. - participant := member.NewParticipant(kgm.etcdClient, constant.TSOServiceName) + participant := member.NewParticipant(kgm.etcdClient, keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: group.ID, + }) p := &tsopb.Participant{ Name: uniqueName, Id: uniqueID, // id is unique among all participants ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()}, } - participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), constant.PrimaryKey, "keyspace group primary election") + participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), "keyspace group primary election") // If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group // is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot // be broken until the entire split process is completed. diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index 10934d1be9e..ba0e7111cf2 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -53,7 +53,6 @@ const ( keyspacePrefix = "keyspaces" keyspaceMetaInfix = "meta" keyspaceIDInfix = "id" - keyspaceAllocID = "alloc_id" gcSafePointInfix = "gc_safe_point" serviceSafePointInfix = "service_safe_point" regionPathPrefix = "raft/r" @@ -285,12 +284,6 @@ func KeyspaceIDPath(name string) string { return path.Join(keyspacePrefix, keyspaceIDInfix, name) } -// KeyspaceIDAlloc returns the path of the keyspace id's persistent window boundary. -// Path: keyspaces/alloc_id -func KeyspaceIDAlloc() string { - return path.Join(keyspacePrefix, keyspaceAllocID) -} - // EncodeKeyspaceID from uint32 to string. // It adds extra padding to make encoded ID ordered. // Encoded ID can be decoded directly with strconv.ParseUint. diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/key_path_v2.go new file mode 100644 index 00000000000..395e311e64c --- /dev/null +++ b/pkg/utils/keypath/key_path_v2.go @@ -0,0 +1,101 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import ( + "fmt" + "path" +) + +const ( + leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" + dcLocationPathFormat = "/pd/%d/dc-location/%d" // "/pd/{cluster_id}/dc-location/{member_id}" + memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path" + memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash" + memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version" + allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" + keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" + + msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" + msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary" + msTsoKespaceLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary" + msDCLocationPathFormat = "/ms/%d/%s/dc-location/%d" // "/ms/{cluster_id}/{service_name}/dc-location/{member_id}" + msTsoDefaultDCLocationPath = "/ms/%d/tso/00000/dc-location/%d" // "/ms/{cluster_id}/tso/00000/dc-location/{member_id}" + msTsoKespaceDCLocationPath = "/ms/%d/tso/keyspace_groups/election/%05d/dc-location/%d" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/dc-location/{member_id}" +) + +// MsParam is the parameter of micro service. +type MsParam struct { + ServiceName string + GroupID uint32 // only used for tso keyspace group +} + +// Prefix returns the parent directory of the given path. +func Prefix(str string) string { + return path.Dir(str) +} + +// LeaderPath returns the leader path. +func LeaderPath(p *MsParam) string { + if p == nil || p.ServiceName == "" { + return fmt.Sprintf(leaderPathFormat, ClusterID()) + } + if p.ServiceName == "tso" { + if p.GroupID == 0 { + return fmt.Sprintf(msTsoDefaultLeaderPathFormat, ClusterID()) + } + return fmt.Sprintf(msTsoKespaceLeaderPathFormat, ClusterID(), p.GroupID) + } + return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName) +} + +// DCLocationPath returns the dc-location path. +func DCLocationPath(p *MsParam, memberID uint64) string { + if p == nil || p.ServiceName == "" { + return fmt.Sprintf(dcLocationPathFormat, ClusterID(), memberID) + } + if p.ServiceName == "tso" { + if p.GroupID == 0 { + return fmt.Sprintf(msTsoDefaultDCLocationPath, ClusterID(), memberID) + } + return fmt.Sprintf(msTsoKespaceDCLocationPath, ClusterID(), p.GroupID, memberID) + } + return fmt.Sprintf(msDCLocationPathFormat, ClusterID(), p.ServiceName, memberID) +} + +// MemberBinaryDeployPath returns the member binary deploy path. +func MemberBinaryDeployPath(id uint64) string { + return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id) +} + +// MemberGitHashPath returns the member git hash path. +func MemberGitHashPath(id uint64) string { + return fmt.Sprintf(memberGitHashPath, ClusterID(), id) +} + +// MemberBinaryVersionPath returns the member binary version path. +func MemberBinaryVersionPath(id uint64) string { + return fmt.Sprintf(memberBinaryVersionPathFormat, ClusterID(), id) +} + +// AllocIDPath returns the alloc id path. +func AllocIDPath() string { + return fmt.Sprintf(allocIDPathFormat, ClusterID()) +} + +// KeyspaceAllocIDPath returns the keyspace alloc id path. +func KeyspaceAllocIDPath() string { + return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID()) +} diff --git a/server/server.go b/server/server.go index 029c85694c3..ed61b14834f 100644 --- a/server/server.go +++ b/server/server.go @@ -93,9 +93,6 @@ const ( // pdRootPath for all pd servers. pdRootPath = "/pd" pdAPIPrefix = "/pd/" - // idAllocPath for idAllocator to save persistent window's end. - idAllocPath = "alloc_id" - idAllocLabel = "idalloc" recoveringMarkPath = "cluster/markers/snapshot-recovering" @@ -446,11 +443,9 @@ func (s *Server) startServer(ctx context.Context) error { return err } s.idAllocator = id.NewAllocator(&id.AllocatorParams{ - Client: s.client, - RootPath: s.rootPath, - AllocPath: idAllocPath, - Label: idAllocLabel, - Member: s.member.MemberValue(), + Client: s.client, + Label: id.DefaultLabel, + Member: s.member.MemberValue(), }) s.encryptionKeyManager, err = encryption.NewManager(s.client, &s.cfg.Security.Encryption) if err != nil { @@ -487,12 +482,10 @@ func (s *Server) startServer(ctx context.Context) error { s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ - Client: s.client, - RootPath: s.rootPath, - AllocPath: keypath.KeyspaceIDAlloc(), - Label: keyspace.AllocLabel, - Member: s.member.MemberValue(), - Step: keyspace.AllocStep, + Client: s.client, + Label: id.KeyspaceLabel, + Member: s.member.MemberValue(), + Step: keyspace.AllocStep, }) if s.IsAPIServiceMode() { s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) diff --git a/tests/integrations/realcluster/cluster.go b/tests/integrations/realcluster/cluster.go index 4dc80b5d433..feabbfb17e0 100644 --- a/tests/integrations/realcluster/cluster.go +++ b/tests/integrations/realcluster/cluster.go @@ -87,6 +87,9 @@ func (s *clusterSuite) stopCluster(t *testing.T) { } func (s *clusterSuite) tag() string { + if s.ms { + return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, s.clusterCnt) + } return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt) } @@ -181,5 +184,6 @@ func waitTiupReady(t *testing.T, tag string) { zap.String("tag", tag), zap.Error(err)) time.Sleep(time.Duration(interval) * time.Second) } - require.Failf(t, "TiUP is not ready", "tag: %s", tag) + // this check can trigger the cleanup function + require.NotZero(t, 1, "TiUP is not ready", "tag: %s", tag) } diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go index 59a56896693..8ca5dd1ce77 100644 --- a/tests/integrations/realcluster/cluster_id_test.go +++ b/tests/integrations/realcluster/cluster_id_test.go @@ -16,7 +16,6 @@ package realcluster import ( "context" - "os/exec" "strings" "testing" @@ -56,10 +55,8 @@ func (s *clusterIDSuite) TestClientClusterID() { } func getPDEndpoints(t *testing.T) []string { - cmd := exec.Command("sh", "-c", "ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'") - bytes, err := cmd.Output() + pdAddrsForEachTikv, err := runCommandWithOutput("ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'") require.NoError(t, err) - pdAddrsForEachTikv := strings.Split(string(bytes), "\n") var pdAddrs []string for _, addr := range pdAddrsForEachTikv { // length of addr is less than 5 means it must not be a valid address diff --git a/tests/integrations/realcluster/download_integration_test_binaries.sh b/tests/integrations/realcluster/download_integration_test_binaries.sh index 8d4cc3411a8..9009750ca99 100644 --- a/tests/integrations/realcluster/download_integration_test_binaries.sh +++ b/tests/integrations/realcluster/download_integration_test_binaries.sh @@ -28,6 +28,8 @@ tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/ tikv_download_url="${file_server_url}/download/builds/pingcap/tikv/${tikv_sha1}/centos7/tikv-server.tar.gz" tiflash_download_url="${file_server_url}/download/builds/pingcap/tiflash/${branch}/${tiflash_sha1}/centos7/tiflash.tar.gz" +ETCD_VERSION=v3.5.15 + set -o nounset # See https://misc.flogisoft.com/bash/tip_colors_and_formatting. @@ -69,6 +71,12 @@ function main() { mv third_bin/tiflash third_bin/_tiflash mv third_bin/_tiflash/* third_bin && rm -rf third_bin/_tiflash + # etcdctl + curl -L https://github.com/etcd-io/etcd/releases/download/${ETCD_VERSION}/etcd-${ETCD_VERSION}-linux-amd64.tar.gz -o etcd-${ETCD_VERSION}-linux-amd64.tar.gz + tar -xvf etcd-${ETCD_VERSION}-linux-amd64.tar.gz + sudo mv etcd-${ETCD_VERSION}-linux-amd64/etcdctl /usr/local/bin/ + etcdctl version + chmod +x third_bin/* rm -rf tmp rm -rf third_bin/bin diff --git a/tests/integrations/realcluster/etcd_key_test.go b/tests/integrations/realcluster/etcd_key_test.go new file mode 100644 index 00000000000..09721a71a79 --- /dev/null +++ b/tests/integrations/realcluster/etcd_key_test.go @@ -0,0 +1,146 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "fmt" + "slices" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/utils/testutil" +) + +type etcdKeySuite struct { + clusterSuite +} + +func TestEtcdKey(t *testing.T) { + suite.Run(t, &etcdKeySuite{ + clusterSuite: clusterSuite{ + suiteName: "etcd_key", + }, + }) +} + +func TestMSEtcdKey(t *testing.T) { + suite.Run(t, &etcdKeySuite{ + clusterSuite: clusterSuite{ + suiteName: "etcd_key", + ms: true, + }, + }) +} + +var ( + // The keys that prefix is `/pd`. + pdKeys = []string{ + "", + "/pd//alloc_id", + "/pd//config", + // If not call `UpdateGCSafePoint`, this key will not exist. + // "/pd//gc/safe_point", + "/pd//gc/safe_point/service/gc_worker", + "/pd//keyspaces/id/DEFAULT", + "/pd//keyspaces/meta/", + "/pd//leader", + "/pd//member//binary_version", + "/pd//member//deploy_path", + "/pd//member//git_hash", + "/pd//raft", + "/pd//raft/min_resolved_ts", + "/pd//raft/r/", + "/pd//raft/s/", + "/pd//raft/status/raft_bootstrap_time", + "/pd//region_label/keyspaces/", + "/pd//rule_group/tiflash", + "/pd//rules/-c", + "/pd//scheduler_config/balance-hot-region-scheduler", + "/pd//scheduler_config/balance-leader-scheduler", + "/pd//scheduler_config/balance-region-scheduler", + "/pd//scheduler_config/evict-slow-store-scheduler", + "/pd//timestamp", + "/pd//tso/keyspace_groups/membership/", // ms + "/pd/cluster_id", + } + // The keys that prefix is `/ms`. + msKeys = []string{ + "", + "/ms//scheduling/primary", + "/ms//scheduling/primary/expected_primary", + "/ms//scheduling/registry/http://...:", + "/ms//tso//primary", + "/ms//tso//primary/expected_primary", + "/ms//tso/registry/http://...:", + } + // These keys with `/pd` are only in `ms` mode. + pdMSKeys = []string{ + "/pd//tso/keyspace_groups/membership/", + } +) + +func (s *etcdKeySuite) TestEtcdKey() { + var keysBackup []string + if !s.ms { + keysBackup = pdKeys + pdKeys = slices.DeleteFunc(pdKeys, func(s string) bool { + return slices.Contains(pdMSKeys, s) + }) + defer func() { + pdKeys = keysBackup + }() + } + t := s.T() + endpoints := getPDEndpoints(t) + + testutil.Eventually(require.New(t), func() bool { + keys, err := getEtcdKey(endpoints[0], "/pd") + if err != nil { + return false + } + return checkEtcdKey(t, keys, pdKeys) + }) + + if s.ms { + testutil.Eventually(require.New(t), func() bool { + keys, err := getEtcdKey(endpoints[0], "/ms") + if err != nil { + return false + } + return checkEtcdKey(t, keys, msKeys) + }) + } +} + +func getEtcdKey(endpoints, prefix string) ([]string, error) { + // `sed 's/[0-9]*//g'` is used to remove the number in the etcd key, such as the cluster id. + etcdCmd := fmt.Sprintf("etcdctl --endpoints=%s get %s --prefix --keys-only | sed 's/[0-9]*//g' | sort | uniq", + endpoints, prefix) + return runCommandWithOutput(etcdCmd) +} + +func checkEtcdKey(t *testing.T, keys, expectedKeys []string) bool { + for i, key := range keys { + if len(key) == 0 { + continue + } + if expectedKeys[i] != key { + t.Logf("expected key: %s, got key: %s", expectedKeys[i], key) + return false + } + } + return true +} diff --git a/tests/integrations/realcluster/util.go b/tests/integrations/realcluster/util.go index 789ceaa29c2..a33f9c071c6 100644 --- a/tests/integrations/realcluster/util.go +++ b/tests/integrations/realcluster/util.go @@ -17,6 +17,7 @@ package realcluster import ( "os" "os/exec" + "strings" "time" ) @@ -40,6 +41,16 @@ func runCommand(name string, args ...string) error { return cmd.Run() } +func runCommandWithOutput(cmdStr string) ([]string, error) { + cmd := exec.Command("sh", "-c", cmdStr) + bytes, err := cmd.Output() + if err != nil { + return nil, err + } + output := strings.Split(string(bytes), "\n") + return output, nil +} + func fileExists(path string) bool { _, err := os.Stat(path) return !os.IsNotExist(err)