Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd/path: change leaderPath/dcLocationPath/memberPath/allocalIDPath to absolute path #8789

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
61 changes: 32 additions & 29 deletions pkg/id/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
package id

import (
"path"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type label string

const (
// DefaultLabel is the default label for id allocator.
DefaultLabel label = "idalloc"
// KeyspaceLabel is the label for keyspace id allocator.
KeyspaceLabel label = "keyspace-idAlloc"
)

// Allocator is the allocator to generate unique ID.
type Allocator interface {
// SetBase set base id
Expand All @@ -48,13 +56,11 @@ type allocatorImpl struct {
base uint64
end uint64

client *clientv3.Client
rootPath string
allocPath string
label string
member string
step uint64
metrics *metrics
client *clientv3.Client
label label
member string
step uint64
metrics *metrics
}

// metrics is a collection of idAllocator's metrics.
Expand All @@ -64,24 +70,20 @@ type metrics struct {

// AllocatorParams are parameters needed to create a new ID Allocator.
type AllocatorParams struct {
Client *clientv3.Client
RootPath string
AllocPath string // AllocPath specifies path to the persistent window boundary.
Label string // Label used to label metrics and logs.
Member string // Member value, used to check if current pd leader.
Step uint64 // Step size of each persistent window boundary increment, default 1000.
Client *clientv3.Client
Label label // Label used to label metrics and logs.
Member string // Member value, used to check if current pd leader.
Step uint64 // Step size of each persistent window boundary increment, default 1000.
}

// NewAllocator creates a new ID Allocator.
func NewAllocator(params *AllocatorParams) Allocator {
allocator := &allocatorImpl{
client: params.Client,
rootPath: params.RootPath,
allocPath: params.AllocPath,
label: params.Label,
member: params.Member,
step: params.Step,
metrics: &metrics{idGauge: idGauge.WithLabelValues(params.Label)},
client: params.Client,
label: params.Label,
member: params.Member,
step: params.Step,
metrics: &metrics{idGauge: idGauge.WithLabelValues(string(params.Label))},
}
if allocator.step == 0 {
allocator.step = defaultAllocStep
Expand Down Expand Up @@ -127,9 +129,14 @@ func (alloc *allocatorImpl) Rebase() error {
}

func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error {
key := alloc.getAllocIDPath()
var key string
if alloc.label == KeyspaceLabel {
key = keypath.KeyspaceAllocIDPath()
} else {
key = keypath.AllocIDPath()
}

leaderPath := path.Join(alloc.rootPath, "leader")
leaderPath := keypath.LeaderPath(nil)
var (
cmps = []clientv3.Cmp{clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member)}
end uint64
Expand Down Expand Up @@ -173,10 +180,6 @@ func (alloc *allocatorImpl) rebaseLocked(checkCurrEnd bool) error {
// please do not reorder the first field, it's need when getting the new-end
// see: https://docs.pingcap.com/tidb/dev/pd-recover#get-allocated-id-from-pd-log
log.Info("idAllocator allocates a new id", zap.Uint64("new-end", end), zap.Uint64("new-base", alloc.base),
zap.String("label", alloc.label), zap.Bool("check-curr-end", checkCurrEnd))
zap.String("label", string(alloc.label)), zap.Bool("check-curr-end", checkCurrEnd))
return nil
}

func (alloc *allocatorImpl) getAllocIDPath() string {
return path.Join(alloc.rootPath, alloc.allocPath)
}
25 changes: 11 additions & 14 deletions pkg/id/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package id

import (
"context"
"strconv"
"sync"
"testing"

Expand All @@ -25,10 +24,7 @@ import (
)

const (
rootPath = "/pd"
leaderPath = "/pd/leader"
allocPath = "alloc_id"
label = "idalloc"
leaderPath = "/pd/0/leader"
memberVal = "member"
step = uint64(500)
)
Expand All @@ -44,24 +40,25 @@ func TestMultipleAllocator(t *testing.T) {
_, err := client.Put(context.Background(), leaderPath, memberVal)
re.NoError(err)

var i uint64
wg := sync.WaitGroup{}
for i := range 3 {
iStr := strconv.Itoa(i)
fn := func(label label) {
wg.Add(1)
// All allocators share rootPath and memberVal, but they have different allocPaths, labels and steps.
// Different allocators have different labels and steps.
allocator := NewAllocator(&AllocatorParams{
Client: client,
RootPath: rootPath,
AllocPath: allocPath + iStr,
Label: label + iStr,
Member: memberVal,
Step: step * uint64(i), // allocator 0, 1, 2 should have step size 1000 (default), 500, 1000 respectively.
Client: client,
Label: label,
Member: memberVal,
Step: step * i, // allocator 0, 1 should have step size 1000 (default), 500 respectively.
})
go func(re *require.Assertions, allocator Allocator) {
defer wg.Done()
testAllocator(re, allocator)
}(re, allocator)
i++
}
fn(DefaultLabel)
fn(KeyspaceLabel)
wg.Wait()
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ const (
// AllocStep set idAllocator's step when write persistent window boundary.
// Use a lower value for denser idAllocation in the event of frequent pd leader change.
AllocStep = uint64(100)
// AllocLabel is used to label keyspace idAllocator's metrics.
AllocLabel = "keyspace-idAlloc"
// regionLabelIDPrefix is used to prefix the keyspace region label.
regionLabelIDPrefix = "keyspaces/"
// regionLabelKey is the key for keyspace id in keyspace region label.
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,15 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.GetAdvertiseListenAddr()
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
s.participant = member.NewParticipant(s.GetClient(), constant.SchedulingServiceName)
s.participant = member.NewParticipant(s.GetClient(), keypath.MsParam{
ServiceName: constant.SchedulingServiceName,
})
p := &schedulingpb.Participant{
Name: uniqueName,
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), constant.PrimaryKey, "primary election")
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), "primary election")

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
Expand Down
40 changes: 14 additions & 26 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
)

const (
// The timeout to wait transfer etcd leader to complete.
moveLeaderTimeout = 5 * time.Second
dcLocationConfigEtcdPrefix = "dc-location"
moveLeaderTimeout = 5 * time.Second
// If the campaign times is more than this value in `campaignTimesRecordTimeout`, the PD will resign and campaign again.
campaignLeaderFrequencyTimes = 3
)
Expand Down Expand Up @@ -160,8 +160,8 @@ func (m *EmbeddedEtcdMember) EnableLeader() {
}

// GetLeaderPath returns the path of the PD leader.
func (m *EmbeddedEtcdMember) GetLeaderPath() string {
return path.Join(m.rootPath, "leader")
func (*EmbeddedEtcdMember) GetLeaderPath() string {
return keypath.LeaderPath(nil)
}

// GetLeadership returns the leadership of the PD member.
Expand Down Expand Up @@ -384,13 +384,13 @@ func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string {
}

// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster.
func (m *EmbeddedEtcdMember) GetDCLocationPathPrefix() string {
return path.Join(m.rootPath, dcLocationConfigEtcdPrefix)
func (*EmbeddedEtcdMember) GetDCLocationPathPrefix() string {
return keypath.Prefix(keypath.DCLocationPath(nil, 0))
}

// GetDCLocationPath returns the dc-location path of a member with the given member ID.
func (m *EmbeddedEtcdMember) GetDCLocationPath(id uint64) string {
return path.Join(m.GetDCLocationPathPrefix(), fmt.Sprint(id))
func (*EmbeddedEtcdMember) GetDCLocationPath(id uint64) string {
return keypath.DCLocationPath(nil, id)
}

// SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader.
Expand Down Expand Up @@ -452,13 +452,9 @@ func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) {
return int(priority), nil
}

func (m *EmbeddedEtcdMember) getMemberBinaryDeployPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/deploy_path", id))
}

// GetMemberDeployPath loads a member's binary deploy path.
func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) {
key := m.getMemberBinaryDeployPath(id)
key := keypath.MemberBinaryDeployPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return "", err
Expand All @@ -471,7 +467,7 @@ func (m *EmbeddedEtcdMember) GetMemberDeployPath(id uint64) (string, error) {

// SetMemberDeployPath saves a member's binary deploy path.
func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error {
key := m.getMemberBinaryDeployPath(id)
key := keypath.MemberBinaryDeployPath(id)
txn := kv.NewSlowLogTxn(m.client)
execPath, err := os.Executable()
deployPath := filepath.Dir(execPath)
Expand All @@ -488,17 +484,9 @@ func (m *EmbeddedEtcdMember) SetMemberDeployPath(id uint64) error {
return nil
}

func (m *EmbeddedEtcdMember) getMemberGitHashPath(id uint64) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add a test to check if the changes produce the same results as before? For example, we could output the path from the old version as a dataset for the new code test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All I can think of is to verify the consistency of the paths through manual testing. There doesn't seem to be any good testing methods to automate🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use fixed testdata from your manual testing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying

return path.Join(m.rootPath, fmt.Sprintf("member/%d/git_hash", id))
}

func (m *EmbeddedEtcdMember) getMemberBinaryVersionPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/binary_version", id))
}

// GetMemberBinaryVersion loads a member's binary version.
func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) {
key := m.getMemberBinaryVersionPath(id)
key := keypath.MemberBinaryVersionPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return "", err
Expand All @@ -511,7 +499,7 @@ func (m *EmbeddedEtcdMember) GetMemberBinaryVersion(id uint64) (string, error) {

// GetMemberGitHash loads a member's git hash.
func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) {
key := m.getMemberGitHashPath(id)
key := keypath.MemberGitHashPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return "", err
Expand All @@ -524,7 +512,7 @@ func (m *EmbeddedEtcdMember) GetMemberGitHash(id uint64) (string, error) {

// SetMemberBinaryVersion saves a member's binary version.
func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion string) error {
key := m.getMemberBinaryVersionPath(id)
key := keypath.MemberBinaryVersionPath(id)
txn := kv.NewSlowLogTxn(m.client)
res, err := txn.Then(clientv3.OpPut(key, releaseVersion)).Commit()
if err != nil {
Expand All @@ -538,7 +526,7 @@ func (m *EmbeddedEtcdMember) SetMemberBinaryVersion(id uint64, releaseVersion st

// SetMemberGitHash saves a member's git hash.
func (m *EmbeddedEtcdMember) SetMemberGitHash(id uint64, gitHash string) error {
key := m.getMemberGitHashPath(id)
key := keypath.MemberGitHashPath(id)
txn := kv.NewSlowLogTxn(m.client)
res, err := txn.Then(clientv3.OpPut(key, gitHash)).Commit()
if err != nil {
Expand Down
Loading