Skip to content

Commit

Permalink
controller: fix limiter cannot work well in high concurrency scenario (
Browse files Browse the repository at this point in the history
…#8436) (#8437)

close #8435

controller: Fix limiter not functioning well in high concurrency scenarios
- In high concurrency scenarios, time may appear rollback because the `now` value passed from outside. high mutext completion leading to more non-sequential execution orders.
- Time rollback allows advancing more tokens, which can cause the issue. even result in no limit for the controller.
- Fix the problem by avoiding time rollback; instead of acquiring time again within the lock to fix it, as this might incur high costs when frequently acquiring time.

Signed-off-by: nolouch <[email protected]>

Co-authored-by: nolouch <[email protected]>
  • Loading branch information
ti-chi-bot and nolouch authored Jul 24, 2024
1 parent 6974ec5 commit fcb34c9
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 4 deletions.
16 changes: 12 additions & 4 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
Expand All @@ -351,7 +351,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
opt(lim)
}
lim.maybeNotify()
logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
logControllerTrace("[resource group controller] after reconfigure", zap.String("name", lim.name), zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
}

// AvailableTokens decreases the amount of tokens currently available.
Expand All @@ -362,6 +362,14 @@ func (lim *Limiter) AvailableTokens(now time.Time) float64 {
return tokens
}

func (lim *Limiter) updateLast(t time.Time) {
// make sure lim.last is monotonic
// see issue: https://github.com/tikv/pd/issues/8435.
if lim.last.Before(t) {
lim.last = t
}
}

const reserveWarnLogInterval = 10 * time.Millisecond

// reserveN is a helper method for Reserve.
Expand Down Expand Up @@ -406,7 +414,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
}
// Update state
if ok {
lim.last = now
lim.updateLast(now)
lim.tokens = tokens
lim.maybeNotify()
} else {
Expand All @@ -424,7 +432,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
zap.String("name", lim.name))
}
lim.last = last
lim.updateLast(last)
if lim.limit == 0 {
lim.notify()
} else if lim.remainingNotifyTimes > 0 {
Expand Down
79 changes: 79 additions & 0 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package controller

import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -212,3 +214,80 @@ func TestCancelErrorOfReservation(t *testing.T) {
re.Error(err)
re.Contains(err.Error(), "context canceled")
}

func TestQPS(t *testing.T) {
re := require.New(t)
cases := []struct {
concurrency int
reserveN int64
ruPerSec int64
}{
{1000, 10, 400000},
}

for _, tc := range cases {
t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.ruPerSec), func(t *testing.T) {
qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.ruPerSec)
t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
re.LessOrEqual(math.Abs(float64(tc.ruPerSec)-ruSec), float64(100)*float64(tc.reserveN))
re.LessOrEqual(math.Abs(float64(tc.ruPerSec)/float64(tc.reserveN)-qps), float64(100))
})
}
}

const testCaseRunTime = 4 * time.Second

func testQPSCase(concurrency int, reserveN int64, limit int64) (qps float64, ru float64, needWait time.Duration) {
nc := make(chan notifyMsg, 1)
lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
var totalRequests int64
start := time.Now()

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
r := lim.Reserve(context.Background(), 30*time.Second, time.Now(), float64(reserveN))
if r.OK() {
delay := r.DelayFrom(time.Now())
<-time.After(delay)
} else {
panic("r not ok")
}
atomic.AddInt64(&totalRequests, 1)
}
}()
}
var vQPS atomic.Value
var wait time.Duration
ch := make(chan struct{})
go func() {
var windowRequests int64
for {
elapsed := time.Since(start)
if elapsed >= testCaseRunTime {
close(ch)
break
}
windowRequests = atomic.SwapInt64(&totalRequests, 0)
vQPS.Store(float64(windowRequests))
r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
wait = r.Delay()
time.Sleep(1 * time.Second)
}
}()
<-ch
cancel()
wg.Wait()
qps = vQPS.Load().(float64)
return qps, qps * float64(reserveN), wait
}

0 comments on commit fcb34c9

Please sign in to comment.