Skip to content

Commit

Permalink
Merge branch 'tikv:master' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
JackL9u authored Nov 8, 2024
2 parents 03b1e54 + c9e532c commit b9c721d
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 18 deletions.
7 changes: 6 additions & 1 deletion server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques
return
}
case types.GrantLeaderScheduler, types.EvictLeaderScheduler:
storeID, ok := input["store_id"].(float64)
_, ok := input["store_id"]
if !ok {
h.r.JSON(w, http.StatusBadRequest, "missing store id")
return
}
storeID, ok := input["store_id"].(float64)
if !ok {
h.r.JSON(w, http.StatusBadRequest, "please input a right store id")
return
}
var (
exist bool
err error
Expand Down
10 changes: 9 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,14 @@ func (c *RaftCluster) runServiceCheckJob() {
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
// ensure raft cluster is running
// avoid unexpected startTSOJobsIfNeeded when raft cluster is stopping
// ref: https://github.com/tikv/pd/issues/8781
c.RLock()
if c.running {
c.checkTSOService()
}
c.RUnlock()
}
}
}
Expand Down Expand Up @@ -488,6 +495,7 @@ func (c *RaftCluster) stopTSOJobsIfNeeded() error {
return err
}
if allocator.IsInitialize() {
log.Info("closing the global TSO allocator")
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
Expand Down
20 changes: 17 additions & 3 deletions tests/server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,24 @@ func (suite *scheduleTestSuite) checkOriginAPI(cluster *tests.TestCluster) {

input := make(map[string]any)
input["name"] = "evict-leader-scheduler"
input["store_id"] = 1
body, err := json.Marshal(input)
re.NoError(err)
re.NoError(tu.CheckPostJSON(tests.TestDialClient, urlPrefix, body, tu.StatusOK(re)))
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(tests.TestDialClient, urlPrefix, body,
tu.Status(re, http.StatusBadRequest),
tu.StringEqual(re, "missing store id")),
)
input["store_id"] = "abc" // bad case
body, err = json.Marshal(input)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(tests.TestDialClient, urlPrefix, body,
tu.Status(re, http.StatusBadRequest),
tu.StringEqual(re, "please input a right store id")),
)

input["store_id"] = 1
body, err = json.Marshal(input)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(tests.TestDialClient, urlPrefix, body, tu.StatusOK(re)))

suite.assertSchedulerExists(urlPrefix, "evict-leader-scheduler")
resp := make(map[string]any)
Expand Down
16 changes: 11 additions & 5 deletions tools/pd-ctl/pdctl/command/config_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,20 @@ func setServiceMiddlewareCommandFunc(cmd *cobra.Command, args []string) {
input := map[string]any{
"label": args[1],
}
value, err := strconv.ParseUint(args[3], 10, 64)
if err != nil {
cmd.Println(err)
return
}

if strings.ToLower(args[2]) == "qps" {
value, err := strconv.ParseFloat(args[3], 64)
if err != nil {
cmd.Println(err)
return
}
input["qps"] = value
} else if strings.ToLower(args[2]) == "concurrency" {
value, err := strconv.ParseUint(args[3], 10, 64)
if err != nil {
cmd.Println(err)
return
}
input["concurrency"] = value
} else {
cmd.Println("Input is invalid, should be qps or concurrency")
Expand Down
16 changes: 8 additions & 8 deletions tools/pd-ctl/tests/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,21 +1005,21 @@ func TestServiceMiddlewareConfig(t *testing.T) {
re.NoError(err)
conf.AuditConfig.EnableAudit = false
check()
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "rate-limit", "GetRegion", "qps", "100")
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "rate-limit", "GetRegion", "qps", "100.1")
re.NoError(err)
conf.RateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 100, QPSBurst: 100}
conf.RateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 100.1, QPSBurst: 100}
check()
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "grpc-rate-limit", "GetRegion", "qps", "101")
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "grpc-rate-limit", "GetRegion", "qps", "101.1")
re.NoError(err)
conf.GRPCRateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 101, QPSBurst: 101}
conf.GRPCRateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 101.1, QPSBurst: 101}
check()
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "rate-limit", "GetRegion", "concurrency", "10")
re.NoError(err)
conf.RateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 100, QPSBurst: 100, ConcurrencyLimit: 10}
conf.RateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 100.1, QPSBurst: 100, ConcurrencyLimit: 10}
check()
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "grpc-rate-limit", "GetRegion", "concurrency", "11")
re.NoError(err)
conf.GRPCRateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 101, QPSBurst: 101, ConcurrencyLimit: 11}
conf.GRPCRateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 101.1, QPSBurst: 101, ConcurrencyLimit: 11}
check()
output, err := tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "xxx", "GetRegion", "qps", "1000")
re.NoError(err)
Expand All @@ -1032,14 +1032,14 @@ func TestServiceMiddlewareConfig(t *testing.T) {
re.Contains(string(output), "Input is invalid")
output, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "grpc-rate-limit", "GetRegion", "qps", "xxx")
re.NoError(err)
re.Contains(string(output), "strconv.ParseUint")
re.Contains(string(output), "strconv.ParseFloat")
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "grpc-rate-limit", "enable-grpc-rate-limit", "false")
re.NoError(err)
conf.GRPCRateLimitConfig.EnableRateLimit = false
check()
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "rate-limit", "GetRegion", "concurrency", "0")
re.NoError(err)
conf.RateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 100, QPSBurst: 100}
conf.RateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 100.1, QPSBurst: 100}
check()
_, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "rate-limit", "GetRegion", "qps", "0")
re.NoError(err)
Expand Down

0 comments on commit b9c721d

Please sign in to comment.