Skip to content

Commit

Permalink
add grpc rate limit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 18, 2024
1 parent 71f6f96 commit 07243f3
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 2 deletions.
7 changes: 5 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,8 +1414,11 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque
} else if rsp != nil {
return rsp.(*pdpb.GetRegionResponse), nil
}
var rc *cluster.RaftCluster
var region *core.RegionInfo
failpoint.Inject("delayProcess", nil)
var (
rc *cluster.RaftCluster
region *core.RegionInfo
)
if *followerHandle {
rc = s.cluster
if !rc.GetRegionSyncer().IsRunning() {
Expand Down
120 changes: 120 additions & 0 deletions tests/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ package server_test

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -127,3 +132,118 @@ func TestLeader(t *testing.T) {
return cluster.GetLeader() != leader1
})
}

func TestGRPCRateLimit(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 1)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)

leader := cluster.WaitLeader()
re.NotEmpty(leader)
leaderServer := cluster.GetServer(leader)
clusterID := leaderServer.GetClusterID()
addr := leaderServer.GetAddr()
grpcPDClient := testutil.MustNewGrpcClient(re, addr)
leaderServer.BootstrapCluster()
for i := 0; i < 100; i++ {
resp, err := grpcPDClient.GetRegion(context.Background(), &pdpb.GetRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: clusterID},
RegionKey: []byte(""),
})
re.NoError(err)
re.Empty(resp.GetHeader().GetError())
}

// test rate limit
urlPrefix := fmt.Sprintf("%s/pd/api/v1/service-middleware/config/grpc-rate-limit", addr)
input := make(map[string]interface{})
input["label"] = "GetRegion"
input["qps"] = 1
jsonBody, err := json.Marshal(input)
re.NoError(err)
err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix, jsonBody,
testutil.StatusOK(re), testutil.StringContain(re, "QPS rate limiter is changed"))
re.NoError(err)
for i := 0; i < 2; i++ {
resp, err := grpcPDClient.GetRegion(context.Background(), &pdpb.GetRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()},
RegionKey: []byte(""),
})
re.NoError(err)
if i == 0 {
re.Empty(resp.GetHeader().GetError())
} else {
re.Contains(resp.GetHeader().GetError().GetMessage(), "rate limit exceeded")
}
}

input["label"] = "GetRegion"
input["qps"] = 0
jsonBody, err = json.Marshal(input)
re.NoError(err)
err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix, jsonBody,
testutil.StatusOK(re), testutil.StringContain(re, "QPS rate limiter is deleted"))
re.NoError(err)
for i := 0; i < 100; i++ {
resp, err := grpcPDClient.GetRegion(context.Background(), &pdpb.GetRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()},
RegionKey: []byte(""),
})
re.NoError(err)
re.Empty(resp.GetHeader().GetError())
}

// test concurrency limit
input["concurrency"] = 1
jsonBody, err = json.Marshal(input)
re.NoError(err)
var (
okCh = make(chan struct{})
errCh = make(chan string)
)
err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix, jsonBody,
testutil.StatusOK(re), testutil.StringContain(re, "Concurrency limiter is changed"))
re.NoError(err)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayProcess", `pause`))
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
resp, err := grpcPDClient.GetRegion(context.Background(), &pdpb.GetRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()},
RegionKey: []byte(""),
})
re.NoError(err)
if resp.GetHeader().GetError() != nil {
errCh <- resp.GetHeader().GetError().GetMessage()
} else {
okCh <- struct{}{}
}
}()

grpcPDClient1 := testutil.MustNewGrpcClient(re, addr)
go func() {
defer wg.Done()
resp, err := grpcPDClient1.GetRegion(context.Background(), &pdpb.GetRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()},
RegionKey: []byte(""),
})
re.NoError(err)
if resp.GetHeader().GetError() != nil {
errCh <- resp.GetHeader().GetError().GetMessage()
} else {
okCh <- struct{}{}
}
}()
errStr := <-errCh
re.Contains(errStr, "rate limit exceeded")
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayProcess"))
<-okCh
wg.Wait()
}

0 comments on commit 07243f3

Please sign in to comment.