scheduler: consider leader score when evict leader #8912

Closed
wants to merge 5 commits

Changes from 1 commit
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker.go
@@ -320,7 +320,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
if region.GetLeader().GetId() != peer.GetId() && rf.Rule.Role == placement.Leader {
ruleCheckerFixLeaderRoleCounter.Inc()
if c.allowLeader(fit, peer) {
- return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), []uint64{}, 0)
+ return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), 0)
}
ruleCheckerNotAllowLeaderCounter.Inc()
return nil, errPeerCannotBeLeader
@@ -329,7 +329,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
ruleCheckerFixFollowerRoleCounter.Inc()
for _, p := range region.GetPeers() {
if c.allowLeader(fit, p) {
- return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), []uint64{}, 0)
+ return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), 0)
}
}
ruleCheckerNoNewLeaderCounter.Inc()
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
@@ -65,7 +65,6 @@ type SchedulerConfigProvider interface {
IsTraceRegionFlow() bool

GetTolerantSizeRatio() float64
- GetLeaderSchedulePolicy() constant.SchedulePolicy

IsDebugMetricsEnabled() bool
IsDiagnosticAllowed() bool
@@ -112,6 +111,7 @@ type SharedConfigProvider interface {
IsCrossTableMergeEnabled() bool
IsOneWayMergeEnabled() bool
GetMergeScheduleLimit() uint64
+ GetLeaderSchedulePolicy() constant.SchedulePolicy
GetRegionScoreFormulaVersion() string
GetSchedulerMaxWaitingOperator() uint64
GetStoreLimitByType(uint64, storelimit.Type) float64
18 changes: 18 additions & 0 deletions pkg/schedule/filter/comparer.go
@@ -40,6 +40,24 @@ func RegionScoreComparer(conf config.SharedConfigProvider) StoreComparer {
}
}

+ // LeaderScoreComparer creates a StoreComparer to sort store by leader
+ // score.
+ func LeaderScoreComparer(conf config.SchedulerConfigProvider) StoreComparer {
+ leaderSchedulePolicy := conf.GetLeaderSchedulePolicy()
+ return func(a, b *core.StoreInfo) int {
+ sa := a.LeaderScore(leaderSchedulePolicy, 0)
+ sb := b.LeaderScore(leaderSchedulePolicy, 0)
+ switch {
+ case sa > sb:
+ return 1
+ case sa < sb:
+ return -1
+ default:
+ return 0
+ }
+ }
+ }

// IsolationComparer creates a StoreComparer to sort store by isolation score.
func IsolationComparer(locationLabels []string, regionStores []*core.StoreInfo) StoreComparer {
return func(a, b *core.StoreInfo) int {

Review thread (lhy1024 marked this conversation as resolved):

Contributor: The leader score is not very accurate, so it makes the leader count of the lowest-score store go up too much. How about considering the influence of the running operators?

Contributor Author: done
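The resolved thread above asks for the influence of running operators to be taken into account. A minimal sketch of that idea, assuming PD's OpInfluence/StoreInfluence accessors and the delta parameter of LeaderScore behave as in the balance-leader scheduler (the exact wiring in this PR's later commits may differ):

// Sketch: bias each store's leader score by the pending leader-count change
// from in-flight operators, so a store that is already receiving leaders does
// not look artificially under-loaded. The OpInfluence plumbing here is an
// assumption, not code from the PR.
func LeaderScoreComparerWithInfluence(conf config.SchedulerConfigProvider, influence operator.OpInfluence) StoreComparer {
	policy := conf.GetLeaderSchedulePolicy()
	return func(a, b *core.StoreInfo) int {
		// Pending leader-count deltas produced by running operators.
		da := influence.GetStoreInfluence(a.GetID()).LeaderCount
		db := influence.GetStoreInfluence(b.GetID()).LeaderCount
		sa := a.LeaderScore(policy, da)
		sb := b.LeaderScore(policy, db)
		switch {
		case sa > sb:
			return 1
		case sa < sb:
			return -1
		default:
			return 0
		}
	}
}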
2 changes: 1 addition & 1 deletion pkg/schedule/handler/handler.go
@@ -418,7 +418,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err
return errors.Errorf("region has no voter in store %v", storeID)
}

- op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), []uint64{}, operator.OpAdmin)
+ op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), operator.OpAdmin)
if err != nil {
log.Debug("fail to create transfer leader operator", errs.ZapError(err))
return err
19 changes: 0 additions & 19 deletions pkg/schedule/operator/builder.go
@@ -296,25 +296,6 @@ func (b *Builder) SetLeader(storeID uint64) *Builder {
return b
}

- // SetLeaders records all valid target leaders in Builder.
- func (b *Builder) SetLeaders(storeIDs []uint64) *Builder {
- if b.err != nil {
- return b
- }
- sort.Slice(storeIDs, func(i, j int) bool { return storeIDs[i] < storeIDs[j] })
- for _, storeID := range storeIDs {
- peer := b.targetPeers[storeID]
- if peer == nil || core.IsLearner(peer) || b.unhealthyPeers[storeID] != nil {
- continue
- }
- b.targetLeaderStoreIDs = append(b.targetLeaderStoreIDs, storeID)
- }
- // Don't need to check if there's valid target, because `targetLeaderStoreIDs`
- // can be empty if this is not a multi-target evict leader operation. Besides,
- // `targetLeaderStoreID` must be valid and there must be at least one valid target.
- return b
- }

// SetPeers resets the target peer list.
//
// If peer's ID is 0, the builder will allocate a new ID later. If current
3 changes: 1 addition & 2 deletions pkg/schedule/operator/create_operator.go
@@ -78,10 +78,9 @@ func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, r
}

// CreateTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store.
- func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) {
+ func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, kind OpKind) (*Operator, error) {
return NewBuilder(desc, ci, region, SkipOriginJointStateCheck).
SetLeader(targetStoreID).
- SetLeaders(targetStoreIDs).
Build(kind)
}

Review thread:

Member: Why remove it?

Contributor Author: We do not need targetStoreIDs, which was only used in evict_leader.go before.

Member: It's an optimization; why don't we need it?

Contributor Author: SetLeaders checks for learners and unhealthy peers: https://github.com/tikv/pd/blob/master/pkg%2Fschedule%2Foperator%2Fbuilder.go#L301-L301
SetLeader also checks these: https://github.com/tikv/pd/blob/master/pkg%2Fschedule%2Foperator%2Fbuilder.go#L284-L284
SetLeaders sorts target stores by store ID; this PR sorts target stores by score.

Member: targetLeaderStoreIDs was used previously, but is removed by this PR?

Contributor Author: Yes, we previously used it in the evict-leader scheduler to select targets. It is replaced by this PR.

Member (@rleungx, Jan 9, 2025): We allowed multiple targets in the op step before; this PR changes that, which might be slower?

Contributor Author: It was only used in evict leader, and after this PR no other scheduler uses it, so I removed it. If we need it in the future, I will restore it.

Contributor Author: It was O(n) previously and it is O(n log n) now. Considering that the number of stores to sort is usually two (three replicas) or four (five replicas), the difference is not significant.
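For reference, the validity check the author points to is the one visible in the removed SetLeaders body in builder.go above; pulled out as a standalone helper it would look roughly like this (illustrative only, not code from the PR):

// A store is a valid leader target only if the builder knows a peer there,
// that peer is not a learner, and it is not marked unhealthy. Both SetLeader
// and the removed SetLeaders apply this same filter.
func isValidLeaderTarget(b *Builder, storeID uint64) bool {
	peer := b.targetPeers[storeID]
	return peer != nil && !core.IsLearner(peer) && b.unhealthyPeers[storeID] == nil
}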
2 changes: 1 addition & 1 deletion pkg/schedule/operator/create_operator_test.go
@@ -423,7 +423,7 @@ func (suite *createOperatorTestSuite) TestCreateTransferLeaderOperator() {
}
for _, testCase := range testCases {
region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: testCase.originPeers}, testCase.originPeers[0])
- op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, []uint64{}, 0)
+ op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, 0)

if testCase.isErr {
re.Error(err)
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_leader.go
@@ -535,7 +535,7 @@ func (s *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
}
solver.Step++
defer func() { solver.Step-- }()
- op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader)
+ op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create balance leader operator", errs.ZapError(err))
if collector != nil {
29 changes: 19 additions & 10 deletions pkg/schedule/schedulers/evict_leader.go
@@ -361,19 +361,12 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl
filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).
FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
- // Compatible with old TiKV transfer leader logic.
- target := candidates.RandomPick()
- targets := candidates.PickAll()
- // `targets` MUST contains `target`, so only needs to check if `target` is nil here.
- if target == nil {
+ if len(candidates.Stores) == 0 {
evictLeaderNoTargetStoreCounter.Inc()
continue
}
- targetIDs := make([]uint64, 0, len(targets))
- for _, t := range targets {
- targetIDs = append(targetIDs, t.GetID())
- }
- op, err := operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpLeader)
+ op, err := createOperatorWithSort(name, cluster, candidates, region)
if err != nil {
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue
@@ -385,6 +378,22 @@
return ops
}

+ func createOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo) (*operator.Operator, error) {
+ // we will pick low leader score store firstly.
+ candidates.Sort(filter.RegionScoreComparer(cluster.GetSharedConfig()))
+ var (
+ op *operator.Operator
+ err error
+ )
+ for _, target := range candidates.Stores {
+ op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpLeader)
+ if op != nil && err == nil {
+ return op, err
+ }
+ }
+ return op, err
+ }

type evictLeaderHandler struct {
rd *render.Render
config *evictLeaderSchedulerConfig

Review thread:

Member: How about renaming it to CreateTransferLeaderOperatorToLowestScoreStore and moving it to operator.go?
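The new helper changes evict leader from picking a random follower to trying candidates in ascending score order until one yields an operator. A self-contained toy illustration of that selection behaviour (plain Go, hypothetical scores, no PD types):

package main

import (
	"fmt"
	"sort"
)

type store struct {
	id    uint64
	score float64
}

func main() {
	// Hypothetical follower stores and their leader scores.
	candidates := []store{{id: 1, score: 120}, {id: 2, score: 35}, {id: 3, score: 80}}
	// Sort ascending, mirroring candidates.Sort with a score comparer.
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].score < candidates[j].score })
	for _, s := range candidates {
		// In the real scheduler, operator creation can fail for a candidate
		// (e.g. its peer cannot become leader); the next candidate is tried.
		if s.id == 2 { // pretend the lowest-score store is not eligible
			continue
		}
		fmt.Printf("evict leader to store %d (score %.0f)\n", s.id, s.score)
		return
	}
	fmt.Println("no target store")
}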
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/grant_hot_region.go
@@ -314,7 +314,7 @@ func (s *grantHotRegionScheduler) transfer(cluster sche.SchedulerCluster, region
dstStore := &metapb.Peer{StoreId: destStoreIDs[i]}

if isLeader {
- op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, []uint64{}, operator.OpLeader)
+ op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, operator.OpLeader)
} else {
op, err = operator.CreateMovePeerOperator(s.GetName()+"-move", cluster, srcRegion, operator.OpRegion|operator.OpLeader, srcStore.GetID(), dstStore)
}
1 change: 0 additions & 1 deletion pkg/schedule/schedulers/hot_region.go
@@ -1473,7 +1473,6 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dst
bs,
region,
dstStoreID,
- []uint64{},
operator.OpHotRegion)
} else {
srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/hot_region_test.go
@@ -148,7 +148,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
case movePeer:
op, err = operator.CreateMovePeerOperator("move-peer-test", tc, region, operator.OpAdmin, 2, &metapb.Peer{Id: region.GetID()*10000 + 1, StoreId: 4})
case transferLeader:
- op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, []uint64{}, operator.OpAdmin)
+ op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, operator.OpAdmin)
}
re.NoError(err)
re.NotNil(op)
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/label.go
@@ -100,7 +100,7 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*ope
continue
}

- op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), []uint64{}, operator.OpLeader)
+ op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create transfer label reject leader operator", errs.ZapError(err))
return nil, nil
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/shuffle_leader.go
@@ -88,7 +88,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool)
shuffleLeaderNoFollowerCounter.Inc()
return nil, nil
}
- op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), []uint64{}, operator.OpAdmin)
+ op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), operator.OpAdmin)
if err != nil {
log.Debug("fail to create shuffle leader operator", errs.ZapError(err))
return nil, nil
21 changes: 12 additions & 9 deletions pkg/schedule/schedulers/transfer_witness_leader.go
@@ -98,19 +98,22 @@ func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.Sched
filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores),
&filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
- // Compatible with old TiKV transfer leader logic.
- target := candidates.RandomPick()
- targets := candidates.PickAll()
- // `targets` MUST contains `target`, so only needs to check if `target` is nil here.
- if target == nil {
+ if len(candidates.Stores) == 0 {
transferWitnessLeaderNoTargetStoreCounter.Inc()
return nil, errors.New("no target store to schedule")
}
- targetIDs := make([]uint64, 0, len(targets))
- for _, t := range targets {
- targetIDs = append(targetIDs, t.GetID())
+ // TODO: also add sort such as evict leader
+ var (
+ op *operator.Operator
+ err error
+ )
+ for _, target := range candidates.Stores {
+ op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpLeader)
+ if op != nil && err == nil {
+ return op, err
+ }
}
- return operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpWitnessLeader)
+ return op, err
}

// RecvRegionInfo receives a checked region from coordinator
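One way the TODO above could be resolved is to mirror createOperatorWithSort from evict_leader.go and sort candidates before the loop. A sketch, assuming the LeaderScoreComparer added by this PR is reused and keeping operator.OpWitnessLeader as the pre-change code used (this is not part of this commit):

// Sketch: try witness-leader targets starting from the lowest leader score.
func createWitnessLeaderOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo) (*operator.Operator, error) {
	// Order candidates so the store with the lowest leader score is tried first.
	candidates.Sort(filter.LeaderScoreComparer(cluster.GetSchedulerConfig()))
	var (
		op  *operator.Operator
		err error
	)
	for _, target := range candidates.Stores {
		op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpWitnessLeader)
		if op != nil && err == nil {
			return op, nil
		}
	}
	return op, err
}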
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
@@ -228,7 +228,7 @@ func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) (
if target == nil {
continue
}
- op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), []uint64{}, operator.OpLeader)
+ op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue