Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: add disable to independent config #8567

Merged
merged 11 commits into from
Sep 18, 2024
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Watcher struct {
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
// It is a memory storage.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
Expand Down
21 changes: 17 additions & 4 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/unrolled/render"
"go.uber.org/zap"
Expand All @@ -52,8 +51,7 @@ const (
)

type balanceLeaderSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
baseDefaultSchedulerConfig
Copy link
Member

@rleungx rleungx Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest moving Disabled here.

Copy link
Member Author

@okJiang okJiang Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this, we need implement isDisable and setDisable four times.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

friendly ping~

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just like other config?


Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Expand Down Expand Up @@ -164,7 +162,7 @@ type balanceLeaderScheduler struct {
// each store balanced.
func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) Scheduler {
s := &balanceLeaderScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceLeaderHandler(conf),
Expand Down Expand Up @@ -541,3 +539,18 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}

// IsDiable implements the Scheduler interface.
func (l *balanceLeaderScheduler) IsDisable() bool {
return l.conf.isDisable()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to implement it for every scheduler?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

base_scheduler.go implemented it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is can we implement this common function only once.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to implement it once, we need to put isDisable and setDisable into schedulerConfig interface. Do you think you need to do this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT? Is it possible that we might customize it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this pr, we only need to make a customized implementation for defaultScheduler. I think this is acceptable and only needs to be implemented four times.

If follow what you said, it does only need to be implemented once, but I feel that setDisable should not be placed in the schedulerConfig interface, because only defaultScheduler can be setDisabled. But if you insist, I can also modify it


// SetDiable implements the Scheduler interface.
func (l *balanceLeaderScheduler) SetDisable(disable bool) error {
return l.conf.setDisable(disable)
}

// IsDefault implements the Scheduler interface.
func (*balanceLeaderScheduler) IsDefault() bool {
return true
}
19 changes: 18 additions & 1 deletion pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
)

type balanceRegionSchedulerConfig struct {
baseDefaultSchedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler.
}
Expand All @@ -48,7 +50,7 @@ type balanceRegionScheduler struct {
// each store balanced.
func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) Scheduler {
scheduler := &balanceRegionScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler, conf),
retryQuota: newRetryQuota(),
name: types.BalanceRegionScheduler.String(),
conf: conf,
Expand Down Expand Up @@ -266,3 +268,18 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
}
return nil
}

// IsDiable implements the Scheduler interface.
func (s *balanceRegionScheduler) IsDisable() bool {
return s.conf.isDisable()
}

// SetDiable implements the Scheduler interface.
func (s *balanceRegionScheduler) SetDisable(disable bool) error {
return s.conf.setDisable(disable)
}

// IsDefault implements the Scheduler interface.
func (*balanceRegionScheduler) IsDefault() bool {
return true
}
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type balanceWitnessScheduler struct {
// each store balanced.
func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) Scheduler {
s := &balanceWitnessScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceWitnessHandler(conf),
Expand Down
18 changes: 16 additions & 2 deletions pkg/schedule/schedulers/base_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,16 @@

name string
tp types.CheckerSchedulerType
conf schedulerConfig
}

// NewBaseScheduler returns a basic scheduler
func NewBaseScheduler(opController *operator.Controller, tp types.CheckerSchedulerType) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp}
func NewBaseScheduler(
opController *operator.Controller,
tp types.CheckerSchedulerType,
conf schedulerConfig,
) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp, conf: conf}
}

func (*BaseScheduler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -114,3 +119,12 @@
func (s *BaseScheduler) GetType() types.CheckerSchedulerType {
return s.tp
}

// IsDiable implements the Scheduler interface.
func (*BaseScheduler) IsDisable() bool { return false }

Check warning on line 124 in pkg/schedule/schedulers/base_scheduler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/base_scheduler.go#L124

Added line #L124 was not covered by tests

// SetDisable implements the Scheduler interface.
func (*BaseScheduler) SetDisable(bool) error { return nil }

// IsDefault returns if the scheduler is a default scheduler.
func (*BaseScheduler) IsDefault() bool { return false }
Copy link
Contributor

@nolouch nolouch Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

type hasDefaulSchedulerConfig interface {
  func IsDisable() bool
  func SetDisable(disabled bool) error
}

func (s *BaseScheduler) IsDefault() bool { 
    _, ok := s.conf.(hasDefaulSchedulerConfig)
    return ok
}

func (s *BaseScheduler) SetDisable(v bool) error {
  if c, ok := s.conf.(hasDefaulSchedulerConfig);ok {
     c.SetDisable(v)
  }
  return nil
}

func (*BaseScheduler) IsDisable() bool { 
    if c, ok := s.conf.(hasDefaulSchedulerConfig);ok {
          return c.IsDisable()
    }
     return false 
}

then no need to rewrite the interface for default scheduler.

38 changes: 37 additions & 1 deletion pkg/schedule/schedulers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ package schedulers
import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type schedulerConfig interface {
init(name string, storage endpoint.ConfigStorage, data any)
save() error
load(any) error
init(name string, storage endpoint.ConfigStorage, data any)
}

var _ schedulerConfig = &baseSchedulerConfig{}

type baseSchedulerConfig struct {
name string
storage endpoint.ConfigStorage
Expand Down Expand Up @@ -58,3 +64,33 @@ func (b *baseSchedulerConfig) load(v any) error {
}
return DecodeConfig([]byte(data), v)
}

type baseDefaultSchedulerConfig struct {
schedulerConfig
syncutil.RWMutex

Disabled bool `json:"disabled"`
}

func newBaseDefaultSchedulerConfig() baseDefaultSchedulerConfig {
return baseDefaultSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
}
}

func (b *baseDefaultSchedulerConfig) isDisable() bool {
b.Lock()
defer b.Unlock()
if err := b.load(b); err != nil {
log.Warn("failed to load scheduler config, maybe the config never persist", errs.ZapError(err))
}
return b.Disabled
}

func (b *baseDefaultSchedulerConfig) setDisable(disabled bool) error {
b.Lock()
defer b.Unlock()
b.Disabled = disabled
log.Info("set scheduler disable", zap.Bool("disabled", disabled))
return b.save()
}
34 changes: 34 additions & 0 deletions pkg/schedule/schedulers/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,37 @@ func TestSchedulerConfig(t *testing.T) {
// report error because the config is empty and cannot be decoded
require.Error(t, cfg2.load(newTc))
}

func TestDefaultSchedulerConfig(t *testing.T) {
s := storage.NewStorageWithMemoryBackend()

type testConfig struct {
balanceLeaderSchedulerConfig
Value string `json:"value"`
}

cfg := &testConfig{
balanceLeaderSchedulerConfig: balanceLeaderSchedulerConfig{
baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
},
Value: "test",
}
cfg.init("test", s, cfg)
require.False(t, cfg.isDisable())
require.NoError(t, cfg.setDisable(true))
require.True(t, cfg.isDisable())

cfg2 := &testConfig{
balanceLeaderSchedulerConfig: balanceLeaderSchedulerConfig{
baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
},
}
cfg2.init("test", s, cfg2)
require.True(t, cfg2.isDisable())
require.Equal(t, "", cfg2.Value)

cfg3 := &testConfig{}
require.NoError(t, cfg2.load(cfg3))
require.Equal(t, cfg.Value, cfg3.Value)
require.True(t, cfg3.Disabled)
}
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ type evictLeaderScheduler struct {
func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) Scheduler {
handler := newEvictLeaderHandler(conf)
return &evictLeaderScheduler{
BaseScheduler: NewBaseScheduler(opController, types.EvictLeaderScheduler),
BaseScheduler: NewBaseScheduler(opController, types.EvictLeaderScheduler, conf),
conf: conf,
handler: handler,
}
Expand Down
29 changes: 21 additions & 8 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)
Expand All @@ -39,8 +38,7 @@ const (
)

type evictSlowStoreSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
baseDefaultSchedulerConfig

cluster *core.BasicCluster
// Last timestamp of the chosen slow store for eviction.
Expand All @@ -52,10 +50,10 @@ type evictSlowStoreSchedulerConfig struct {

func initEvictSlowStoreSchedulerConfig() *evictSlowStoreSchedulerConfig {
return &evictSlowStoreSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
lastSlowStoreCaptureTS: time.Time{},
RecoveryDurationGap: defaultRecoveryDurationGap,
EvictedStores: make([]uint64, 0),
baseDefaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
lastSlowStoreCaptureTS: time.Time{},
RecoveryDurationGap: defaultRecoveryDurationGap,
EvictedStores: make([]uint64, 0),
}
}

Expand Down Expand Up @@ -310,12 +308,27 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
return s.schedulerEvictLeader(cluster), nil
}

// IsDiable implements the Scheduler interface.
func (s *evictSlowStoreScheduler) IsDisable() bool {
return s.conf.isDisable()
}

// SetDisable implements the Scheduler interface.
func (s *evictSlowStoreScheduler) SetDisable(disable bool) error {
return s.conf.setDisable(disable)
}

// newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores.
func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler {
handler := newEvictSlowStoreHandler(conf)
return &evictSlowStoreScheduler{
BaseScheduler: NewBaseScheduler(opController, types.EvictSlowStoreScheduler),
BaseScheduler: NewBaseScheduler(opController, types.EvictSlowStoreScheduler, conf),
conf: conf,
handler: handler,
}
}

// IsDefault implements the Scheduler interface.
func (*evictSlowStoreScheduler) IsDefault() bool {
return true
}
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler {
handler := newEvictSlowTrendHandler(conf)
sche := &evictSlowTrendScheduler{
BaseScheduler: NewBaseScheduler(opController, types.EvictSlowTrendScheduler),
BaseScheduler: NewBaseScheduler(opController, types.EvictSlowTrendScheduler, conf),
conf: conf,
handler: handler,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/grant_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ type grantHotRegionScheduler struct {

// newGrantHotRegionScheduler creates an admin scheduler that transfers hot region peer to fixed store and hot region leader to one store.
func newGrantHotRegionScheduler(opController *operator.Controller, conf *grantHotRegionSchedulerConfig) *grantHotRegionScheduler {
base := newBaseHotScheduler(opController,
statistics.DefaultHistorySampleDuration, statistics.DefaultHistorySampleInterval)
base := newBaseHotScheduler(opController, statistics.DefaultHistorySampleDuration,
statistics.DefaultHistorySampleInterval, conf)
base.tp = types.GrantHotRegionScheduler
handler := newGrantHotRegionHandler(conf)
ret := &grantHotRegionScheduler{
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type grantLeaderScheduler struct {
// newGrantLeaderScheduler creates an admin scheduler that transfers all leaders
// to a store.
func newGrantLeaderScheduler(opController *operator.Controller, conf *grantLeaderSchedulerConfig) Scheduler {
base := NewBaseScheduler(opController, types.GrantLeaderScheduler)
base := NewBaseScheduler(opController, types.GrantLeaderScheduler, conf)
handler := newGrantLeaderHandler(conf)
return &grantLeaderScheduler{
BaseScheduler: base,
Expand Down
27 changes: 23 additions & 4 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ type baseHotScheduler struct {
updateWriteTime time.Time
}

func newBaseHotScheduler(opController *operator.Controller, sampleDuration time.Duration, sampleInterval time.Duration) *baseHotScheduler {
base := NewBaseScheduler(opController, types.BalanceHotRegionScheduler)
func newBaseHotScheduler(
opController *operator.Controller,
sampleDuration, sampleInterval time.Duration,
schedulerConfig schedulerConfig,
) *baseHotScheduler {
base := NewBaseScheduler(opController, types.BalanceHotRegionScheduler, schedulerConfig)
ret := &baseHotScheduler{
BaseScheduler: base,
regionPendings: make(map[uint64]*pendingInfluence),
Expand Down Expand Up @@ -197,8 +201,8 @@ type hotScheduler struct {
}

func newHotScheduler(opController *operator.Controller, conf *hotRegionSchedulerConfig) *hotScheduler {
base := newBaseHotScheduler(opController,
conf.getHistorySampleDuration(), conf.getHistorySampleInterval())
base := newBaseHotScheduler(opController, conf.getHistorySampleDuration(),
conf.getHistorySampleInterval(), conf)
ret := &hotScheduler{
baseHotScheduler: base,
conf: conf,
Expand Down Expand Up @@ -280,6 +284,16 @@ func (h *hotScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*opera
return h.dispatch(typ, cluster), nil
}

// IsDisable implements the Scheduler interface.
func (h *hotScheduler) IsDisable() bool {
return h.conf.isDisable()
}

// SetDisable implements the Scheduler interface.
func (h *hotScheduler) SetDisable(disable bool) error {
return h.conf.setDisable(disable)
}

func (h *hotScheduler) dispatch(typ resourceType, cluster sche.SchedulerCluster) []*operator.Operator {
h.Lock()
defer h.Unlock()
Expand Down Expand Up @@ -386,6 +400,11 @@ func (h *hotScheduler) balanceHotWriteLeaders(cluster sche.SchedulerCluster) []*
return nil
}

// IsDefault implements the Scheduler interface.
func (*hotScheduler) IsDefault() bool {
return true
}

type solution struct {
srcStore *statistics.StoreLoadDetail
region *core.RegionInfo // The region of the main balance effect. Relate mainPeerStat. srcStore -> dstStore
Expand Down
Loading