Skip to content

Commit

Permalink
Merge branch 'master' into add_integration_test
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Nov 14, 2023
2 parents 52e9a4c + 86831ce commit 3e0f203
Show file tree
Hide file tree
Showing 12 changed files with 793 additions and 695 deletions.
5 changes: 3 additions & 2 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
kvs := resp.GetKvs()
if len(kvs) == 0 {
log.Warn("[resource group controller] server does not save config, load config failed")
return DefaultConfig(), nil
}
config := &Config{}
err = json.Unmarshal(resp.Kvs[0].GetValue(), config)
err = json.Unmarshal(kvs[0].GetValue(), config)
if err != nil {
return nil, err
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,4 @@ func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRe
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
}
if !isPrepared && isNew {
c.GetCoordinator().GetPrepareChecker().Collect(region)
}
}
18 changes: 15 additions & 3 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,11 +1340,23 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterNotFromStorageRegionsCnt gets the `NotFromStorageRegionsCnt` count of
// regions that are not loaded from storage anymore (i.e. regions observed via
// heartbeat rather than restored from persisted meta).
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
	r.t.RLock()
	defer r.t.RUnlock()
	// Delegate to the nil-safe tree accessor instead of touching the field
	// directly, so an uninitialized tree reports zero.
	return r.tree.notFromStorageRegionsCount()
}

// GetNotFromStorageRegionsCntByStore gets the `NotFromStorageRegionsCnt` count of
// a store's leader, follower and learner regions by storeID.
func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
	r.st.RLock()
	cnt := r.getNotFromStorageRegionsCntByStoreLocked(storeID)
	r.st.RUnlock()
	return cnt
}

// getNotFromStorageRegionsCntByStoreLocked sums the `NotFromStorageRegionsCnt`
// counts of the store's leader, follower and learner trees. Caller must hold r.st.
func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
	total := 0
	// Each tree accessor is nil-safe, so missing roles contribute zero.
	for _, tree := range []*regionTree{r.leaders[storeID], r.followers[storeID], r.learners[storeID]} {
		total += tree.notFromStorageRegionsCount()
	}
	return total
}

// GetMetaRegions gets a set of metapb.Region from regionMap
Expand Down Expand Up @@ -1380,7 +1392,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {
return r.getStoreRegionCountLocked(storeID)
}

// getStoreRegionCountLocked gets the total count of a store's leader, follower
// and learner RegionInfo by storeID. Caller must hold the store lock.
func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int {
	return r.leaders[storeID].length() + r.followers[storeID].length() + r.learners[storeID].length()
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (t *regionTree) length() int {
return t.tree.Len()
}

// notFromStorageRegionsCount reports how many regions in this tree were not
// loaded from storage. A nil tree safely yields zero.
func (t *regionTree) notFromStorageRegionsCount() int {
	if t != nil {
		return t.notFromStorageRegionsCnt
	}
	return 0
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// note that Find() gets the last item that is less or equal than the item.
Expand Down
35 changes: 6 additions & 29 deletions pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ import (

type prepareChecker struct {
syncutil.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
prepared bool
start time.Time
prepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
start: time.Now(),
reactiveRegions: make(map[uint64]int),
start: time.Now(),
}
}

Expand All @@ -51,13 +48,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
log.Info("meta not loaded from region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) {
return false
}
for _, store := range c.GetStores() {
Expand All @@ -66,23 +58,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
}
storeID := store.GetID()
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
return false
}
}
log.Info("not loaded from storage region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
checker.prepared = true
return true
}

// Collect records one observed region: it increments the per-store active
// counter for every peer of the region and bumps the overall sum.
func (checker *prepareChecker) Collect(region *core.RegionInfo) {
	checker.Lock()
	defer checker.Unlock()
	peers := region.GetPeers()
	for i := range peers {
		checker.reactiveRegions[peers[i].GetStoreId()]++
	}
	checker.sum++
}

func (checker *prepareChecker) IsPrepared() bool {
checker.RLock()
defer checker.RUnlock()
Expand All @@ -95,10 +79,3 @@ func (checker *prepareChecker) SetPrepared() {
defer checker.Unlock()
checker.prepared = true
}

// GetSum returns the total number of regions collected so far. For test purpose.
func (checker *prepareChecker) GetSum() int {
	checker.RLock()
	sum := checker.sum
	checker.RUnlock()
	return sum
}
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
Expand Down
18 changes: 8 additions & 10 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2384,7 +2384,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er
peer, _ := c.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}
return c.putRegion(core.NewRegionInfo(region, nil))
return c.putRegion(core.NewRegionInfo(region, nil, core.SetSource(core.Storage)))
}

func TestBasic(t *testing.T) {
Expand Down Expand Up @@ -2469,7 +2469,7 @@ func TestDispatch(t *testing.T) {

func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error {
co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream)
if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone()); err != nil {
if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone(core.SetSource(core.Heartbeat))); err != nil {
return err
}
co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil)
Expand Down Expand Up @@ -2943,14 +2943,14 @@ func TestShouldRun(t *testing.T) {

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(nr))
re.Equal(testCase.ShouldRun, co.ShouldRun())
}
nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(newRegion))
re.Equal(7, co.GetPrepareChecker().GetSum())
re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt())
}

func TestShouldRunWithNonLeaderRegions(t *testing.T) {
Expand Down Expand Up @@ -2986,14 +2986,14 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) {

for _, testCase := range testCases {
r := tc.GetRegion(testCase.regionID)
nr := r.Clone(core.WithLeader(r.GetPeers()[0]))
nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat))
re.NoError(tc.processRegionHeartbeat(nr))
re.Equal(testCase.ShouldRun, co.ShouldRun())
}
nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(newRegion))
re.Equal(9, co.GetPrepareChecker().GetSum())
re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt())

// Now, after server is prepared, there exist some regions with no leader.
re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
Expand Down Expand Up @@ -3263,7 +3263,6 @@ func TestRestart(t *testing.T) {
re.NoError(tc.addRegionStore(3, 3))
re.NoError(tc.addLeaderRegion(1, 1))
region := tc.GetRegion(1)
co.GetPrepareChecker().Collect(region)

// Add 1 replica on store 2.
stream := mockhbstream.NewHeartbeatStream()
Expand All @@ -3277,7 +3276,6 @@ func TestRestart(t *testing.T) {

// Recreate coordinator then add another replica on store 3.
co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams)
co.GetPrepareChecker().Collect(region)
co.Run()
re.NoError(dispatchHeartbeat(co, region, stream))
region = waitAddLearner(re, stream, region, 3)
Expand Down
Loading

0 comments on commit 3e0f203

Please sign in to comment.