scheduler: make hot v2 more suitable for small hot regions (#6827)
close #6645

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
lhy1024 and ti-chi-bot[bot] authored Jul 31, 2023
1 parent 85d3a0c commit 16926ad
Showing 4 changed files with 197 additions and 47 deletions.
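
For orientation before the diff: the scheduler now remembers, per store and per priority dimension, the top-10th hottest peer (`nthHotPeer`, controlled by `topnPosition`) and uses its load to cap the `minBetterRate` lower bound in `getScoreByPriorities`, so a store whose hot peers are all small can still yield operators. Below is a minimal, hypothetical Go sketch of that idea; it is not the PD implementation, and `candidateThreshold`, `peerLoads`, and the sample numbers are illustrative only.

package main

import (
	"fmt"
	"math"
	"sort"
)

// candidateThreshold mimics the idea of this commit in isolation: the lower
// bound a peer's load must exceed to be a "better" candidate is the smaller
// of the configured ratio-based bound and the load of the store's topn-th
// hottest peer. peerLoads, minHotRatio and topnPosition are illustrative
// names, not the actual PD identifiers.
func candidateThreshold(lowRate, minHotRatio float64, peerLoads []float64, topnPosition int) float64 {
	threshold := lowRate * minHotRatio
	if len(peerLoads) >= topnPosition {
		sorted := append([]float64(nil), peerLoads...)
		sort.Sort(sort.Reverse(sort.Float64Slice(sorted)))
		// Cap the bound at the topn-th hottest peer, mirroring how
		// nthHotPeer feeds topnRate into minBetterRate in the diff below.
		threshold = math.Min(threshold, sorted[topnPosition-1])
	}
	return threshold
}

func main() {
	// A store whose 30 hot peers each carry 6 units against a ratio-based
	// bound of 500*0.02 = 10: previously nothing qualified, now the bound
	// drops to the 10th-hottest peer's load.
	peers := make([]float64, 30)
	for i := range peers {
		peers[i] = 6
	}
	fmt.Println(candidateThreshold(500, 0.02, peers, 10)) // prints 6
}

This keeps the old ratio-based bound for stores that do have large hot peers and only relaxes it when even the top-n peer is small.
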
101 changes: 58 additions & 43 deletions pkg/schedule/schedulers/hot_region.go
@@ -44,6 +44,7 @@ import (
)

var (
topnPosition = 10
statisticsInterval = time.Second
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
@@ -449,11 +450,13 @@ func isAvailableV1(s *solution) bool {

type balanceSolver struct {
sche.SchedulerCluster
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
rwTy statistics.RWType
opTy opType
resourceTy resourceType
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
filteredHotPeers map[uint64][]*statistics.HotPeerStat // storeID -> hotPeers(filtered)
nthHotPeer map[uint64][]*statistics.HotPeerStat // storeID -> [dimLen]hotPeers
rwTy statistics.RWType
opTy opType
resourceTy resourceType

cur *solution

@@ -492,8 +495,21 @@ type balanceSolver struct {
}

func (bs *balanceSolver) init() {
// Init store load detail according to the type.
// Load the configuration items of the scheduler.
bs.resourceTy = toResourceType(bs.rwTy, bs.opTy)
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()
switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}

// Init store load detail according to the type.
bs.stLoadDetail = bs.sche.stLoadInfos[bs.resourceTy]

bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}
@@ -506,10 +522,14 @@ func (bs *balanceSolver) init() {
}
maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}

bs.filteredHotPeers = make(map[uint64][]*statistics.HotPeerStat)
bs.nthHotPeer = make(map[uint64][]*statistics.HotPeerStat)
for _, detail := range bs.stLoadDetail {
bs.maxSrc = statistics.MaxLoad(bs.maxSrc, detail.LoadPred.Min())
bs.minDst = statistics.MinLoad(bs.minDst, detail.LoadPred.Max())
maxCur = statistics.MaxLoad(maxCur, &detail.LoadPred.Current)
bs.nthHotPeer[detail.GetID()] = make([]*statistics.HotPeerStat, statistics.DimLen)
bs.filteredHotPeers[detail.GetID()] = bs.filterHotPeers(detail)
}

rankStepRatios := []float64{
@@ -524,19 +544,6 @@ func (bs *balanceSolver) init() {
Loads: stepLoads,
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}

bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()

switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}
}

func (bs *balanceSolver) initRankV1() {
@@ -660,7 +667,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
for _, mainPeerStat := range bs.filterHotPeers(srcStore) {
for _, mainPeerStat := range bs.filteredHotPeers[srcStoreID] {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer {
@@ -688,7 +695,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if bs.needSearchRevertRegions() {
hotSchedulerSearchRevertRegionsCounter.Inc()
dstStoreID := dstStore.GetID()
for _, revertPeerStat := range bs.filterHotPeers(bs.cur.dstStore) {
for _, revertPeerStat := range bs.filteredHotPeers[dstStoreID] {
revertRegion := bs.getRegion(revertPeerStat, dstStoreID)
if revertRegion == nil || revertRegion.GetID() == bs.cur.region.GetID() ||
!allowRevertRegion(revertRegion, srcStoreID) {
@@ -869,44 +876,52 @@ func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, exp

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) []*statistics.HotPeerStat {
hotPeers := storeLoad.HotPeers
ret := make([]*statistics.HotPeerStat, 0, len(hotPeers))
appendItem := func(item *statistics.HotPeerStat) {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree, bs.rwTy) {
// not in a pending operator and no need to cool down after transferring the leader
ret = append(ret, item)
}
}

src := storeLoad.HotPeers
// At most MaxPeerNum peers, to prevent balanceSolver.solve() from being too slow.
if len(src) <= bs.maxPeerNum {
ret = make([]*statistics.HotPeerStat, 0, len(src))
for _, peer := range src {
appendItem(peer)
}
} else {
union := bs.sortHotPeers(src)
var firstSort, secondSort []*statistics.HotPeerStat
if len(hotPeers) >= topnPosition || len(hotPeers) > bs.maxPeerNum {
firstSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(firstSort, hotPeers)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(secondSort, hotPeers)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
}
if len(hotPeers) >= topnPosition {
storeID := storeLoad.GetID()
bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition-1]
bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition-1]
}
if len(hotPeers) > bs.maxPeerNum {
union := bs.sortHotPeers(firstSort, secondSort)
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
appendItem(peer)
}
return ret
}

return
for _, peer := range hotPeers {
appendItem(peer)
}
return ret
}

func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
firstSort := make([]*statistics.HotPeerStat, len(ret))
copy(firstSort, ret)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort := make([]*statistics.HotPeerStat, len(ret))
copy(secondSort, ret)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
func (bs *balanceSolver) sortHotPeers(firstSort, secondSort []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
// At most MaxPeerNum peers, to prevent balanceSolver.solve() from being too slow.
for len(union) < bs.maxPeerNum {
for len(firstSort) > 0 {
peer := firstSort[0]
11 changes: 7 additions & 4 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -1907,20 +1907,23 @@ func TestHotCacheSortHotPeer(t *testing.T) {
},
}}

st := &statistics.StoreLoadDetail{
HotPeers: hotPeers,
}
leaderSolver.maxPeerNum = 1
u := leaderSolver.sortHotPeers(hotPeers)
u := leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1}, u)

leaderSolver.maxPeerNum = 2
u = leaderSolver.sortHotPeers(hotPeers)
u = leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1, 2}, u)
}

func checkSortResult(re *require.Assertions, regions []uint64, hotPeers map[*statistics.HotPeerStat]struct{}) {
func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*statistics.HotPeerStat) {
re.Equal(len(hotPeers), len(regions))
for _, region := range regions {
in := false
for hotPeer := range hotPeers {
for _, hotPeer := range hotPeers {
if hotPeer.RegionID == region {
in = true
break
7 changes: 7 additions & 0 deletions pkg/schedule/schedulers/hot_region_v2.go
@@ -206,11 +206,17 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
srcPendingRate, dstPendingRate := bs.cur.getPendingLoad(dim)
peersRate := bs.cur.getPeersRateFromCache(dim)
highRate, lowRate := srcRate, dstRate
topnHotPeer := bs.nthHotPeer[bs.cur.srcStore.GetID()][dim]
reverse := false
if srcRate < dstRate {
highRate, lowRate = dstRate, srcRate
peersRate = -peersRate
reverse = true
topnHotPeer = bs.nthHotPeer[bs.cur.dstStore.GetID()][dim]
}
topnRate := math.MaxFloat64
if topnHotPeer != nil {
topnRate = topnHotPeer.GetLoad(dim)
}

if highRate*rs.balancedCheckRatio <= lowRate {
@@ -262,6 +268,7 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
// maxBetterRate may be less than minBetterRate, in which case a positive fraction cannot be produced.
minNotWorsenedRate = -bs.getMinRate(dim)
minBetterRate = math.Min(minBalancedRate*rs.perceivedRatio, lowRate*rs.minHotRatio)
minBetterRate = math.Min(minBetterRate, topnRate)
maxBetterRate = maxBalancedRate + (highRate-lowRate-minBetterRate-maxBalancedRate)*rs.perceivedRatio
maxNotWorsenedRate = maxBalancedRate + (highRate-lowRate-minNotWorsenedRate-maxBalancedRate)*rs.perceivedRatio
}
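A compact, informal restatement of the bound computed in the branch above:

    minBetterRate = min(minBalancedRate * perceivedRatio, lowRate * minHotRatio, topnRate)

Only minBetterRate is capped by topnRate; minNotWorsenedRate keeps its old value, so the bar is lowered for strictly better candidates precisely when even the store's top-n-th hottest peer is small.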
125 changes: 125 additions & 0 deletions pkg/schedule/schedulers/hot_region_v2_test.go
@@ -19,6 +19,7 @@ import (

"github.com/docker/go-units"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
@@ -32,6 +33,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
statistics.Denoising = false
statisticsInterval = 0

sche, err := CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
@@ -145,6 +147,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) {
// This is a test that searchRevertRegions finds a solution of rank -2.
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0

cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
@@ -207,6 +210,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
// This is a test that searchRevertRegions finds a solution of rank -1.
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0

cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
@@ -268,6 +272,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
func TestSkipUniformStore(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0

cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
@@ -345,3 +350,123 @@ func TestSkipUniformStore(t *testing.T) {
operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 3, 2)
clearPendingInfluence(hb.(*hotScheduler))
}

func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) {
// This test checks that we can schedule small hot regions, i.e. regions
// smaller than 20% of the diff or 2% of the low node. (#6645)
// 20% comes from `firstPriorityPerceivedRatio`, 2% from `firstPriorityMinHotRatio`.
// The byte rate of the high node is 2000MB/s and that of the low node is 200MB/s.
// The query rate of the high node is 2000qps and that of the low node is 200qps.
// All hot regions in the cluster are small, i.e. smaller than 20% of the diff or 2% of the low node.
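// Rough arithmetic for the numbers above (illustrative): 2% of the 200 qps low
// node is 4 qps, so a peer had to carry more than about 4 qps to clear the old
// minHotRatio bound; the small hot peers built below carry 0.9x that, roughly
// 3.6 qps, while bigHotRegionByte/bigHotRegionQuery are sized at 10x that bound.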
re := require.New(t)
emptyFunc := func(*mockcluster.Cluster, *hotScheduler) {}
highLoad, lowLoad := uint64(2000), uint64(200)
bigHotRegionByte := uint64(float64(lowLoad) * firstPriorityMinHotRatio * 10 * units.MiB * statistics.ReadReportInterval)
bigHotRegionQuery := uint64(float64(lowLoad) * firstPriorityMinHotRatio * 10 * statistics.ReadReportInterval)

// Case1: Before #6827, we only used minHotRatio, so a small hot region could not be scheduled in this case.
// Since 10000 is larger than the number of hot regions, `filterHotPeers` skips the topn calculation.
origin := topnPosition
topnPosition = 10000
ops := checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc)
re.Empty(ops)
topnPosition = origin

// Case2: After #6827, we use the top-10 hot peer as the threshold of minHotPeer.
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc)
re.Len(ops, 1)
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, lowLoad, highLoad, emptyFunc)
re.Len(ops, 0)

// Case3: If there is a larger hot region, we will schedule it.
hotRegionID := uint64(100)
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
})
re.Len(ops, 1)
re.Equal(hotRegionID, ops[0].RegionID())

// Case4: If there is a larger hot region but it needs to cool down, we will schedule a small hot region.
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
// just transfer leader
tc.AddRegionWithReadInfo(hotRegionID, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
})
re.Len(ops, 1)
re.NotEqual(hotRegionID, ops[0].RegionID())

// Case5: If there is a larger hot region but it is pending, we will schedule a small hot region.
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, hb *hotScheduler) {
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
hb.regionPendings[hotRegionID] = &pendingInfluence{}
})
re.Len(ops, 1)
re.NotEqual(hotRegionID, ops[0].RegionID())

// Case6: If there are more than topnPosition hot regions but they need to cool down,
// we would schedule a large hot region rather than a small one, so there is no operator.
topnPosition = 2
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
// just transfer leader
tc.AddRegionWithReadInfo(hotRegionID, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
// just transfer leader
tc.AddRegionWithReadInfo(hotRegionID+1, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddRegionWithReadInfo(hotRegionID+1, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
})
re.Len(ops, 0)
topnPosition = origin

// Case7: If there are more than topnPosition hot regions but they are pending,
// we would schedule a large hot region rather than a small one, so there is no operator.
topnPosition = 2
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, hb *hotScheduler) {
tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
hb.regionPendings[hotRegionID] = &pendingInfluence{}
tc.AddRegionWithReadInfo(hotRegionID+1, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
hb.regionPendings[hotRegionID+1] = &pendingInfluence{}
})
re.Len(ops, 0)
topnPosition = origin
}

func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLoad, lowLoad uint64,
addOtherRegions func(*mockcluster.Cluster, *hotScheduler)) []*operator.Operator {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
statistics.Denoising = false
sche, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.conf.SetSrcToleranceRatio(1)
hb.conf.SetDstToleranceRatio(1)
hb.conf.SetRankFormulaVersion("v2")
hb.conf.ReadPriorities = []string{statistics.QueryPriority, statistics.BytePriority}
tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 40)
tc.AddRegionStore(2, 10)
tc.AddRegionStore(3, 10)

tc.UpdateStorageReadQuery(1, highLoad*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadQuery(2, lowLoad*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadQuery(3, (highLoad+lowLoad)/2*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(1, highLoad*units.MiB*statistics.StoreHeartBeatReportInterval, 0)
tc.UpdateStorageReadStats(2, lowLoad*units.MiB*statistics.StoreHeartBeatReportInterval, 0)
tc.UpdateStorageReadStats(3, (highLoad+lowLoad)/2*units.MiB*statistics.StoreHeartBeatReportInterval, 0)

smallHotPeerQuery := float64(lowLoad) * firstPriorityMinHotRatio * 0.9 // a hot peer smaller than the firstPriorityMinHotRatio threshold
smallHotPeerByte := float64(lowLoad) * secondPriorityMinHotRatio * 0.9 * units.MiB // a hot peer smaller than the secondPriorityMinHotRatio threshold
regions := make([]testRegionInfo, 0)
for i := 10; i < 50; i++ {
regions = append(regions, testRegionInfo{uint64(i), []uint64{1, 2, 3}, smallHotPeerByte, 0, smallHotPeerQuery})
if i < 20 {
regions = append(regions, testRegionInfo{uint64(i), []uint64{2, 1, 3}, smallHotPeerByte, 0, smallHotPeerQuery})
regions = append(regions, testRegionInfo{uint64(i), []uint64{3, 1, 2}, smallHotPeerByte, 0, smallHotPeerQuery})
}
}
addRegionInfo(tc, statistics.Read, regions)
tc.SetHotRegionCacheHitsThreshold(1)
addOtherRegions(tc, hb)
ops, _ := hb.Schedule(tc, false)
return ops
}
