From 0452c632c71c5d5d7252f23eebc132bfcc478405 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 27 Oct 2023 19:12:13 +0800 Subject: [PATCH 01/14] mcs: support region label http interface in scheduling server Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 160 ++++++++++++++++++ pkg/schedule/handler/handler.go | 19 +++ pkg/utils/apiutil/serverapi/middleware.go | 23 ++- pkg/utils/testutil/api_check.go | 2 +- server/api/region_label.go | 2 +- server/api/server.go | 16 ++ tests/integrations/mcs/scheduling/api_test.go | 23 +++ 7 files changed, 236 insertions(+), 9 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 39be00ef9a0..ed998b9b62d 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -17,6 +17,7 @@ package apis import ( "fmt" "net/http" + "net/url" "strconv" "sync" @@ -26,6 +27,7 @@ import ( "github.com/gin-gonic/gin" "github.com/joho/godotenv" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" sche "github.com/tikv/pd/pkg/schedule/core" @@ -114,6 +116,8 @@ func NewService(srv *scheserver.Service) *Service { s.RegisterSchedulersRouter() s.RegisterCheckersRouter() s.RegisterHotspotRouter() + s.RegisterRegionsRouter() + s.RegisterRegionLabelRouter() return s } @@ -160,6 +164,21 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +// RegisterRegionsRouter registers the router of the regions handler. +func (s *Service) RegisterRegionsRouter() { + router := s.root.Group("regions") + router.GET("/:id/label/:key", getRegionLabelByKey) + router.GET("/:id/labels", getRegionLabels) +} + +// RegisterRegionLabelRouter registers the router of the region label handler. +func (s *Service) RegisterRegionLabelRouter() { + router := s.root.Group("config/region-label") + router.GET("rules", getAllRegionLabelRules) + router.GET("rules/ids", getRegionLabelRulesByIDs) + router.GET("rule/:id", getRegionLabelRuleByID) +} + func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) var level string @@ -548,3 +567,144 @@ func getHistoryHotRegions(c *gin.Context) { var res storage.HistoryHotRegions c.IndentedJSON(http.StatusOK, res) } + +// @Tags region_label +// @Summary Get label of a region. +// @Param id path integer true "Region Id" +// @Param key path string true "Label key" +// @Produce json +// @Success 200 {string} string +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The region does not exist." +// @Router /regions/{id}/label/{key} [get] +func getRegionLabelByKey(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + idStr := c.Param("id") + labelKey := c.Param("key") // TODO: test https://github.com/tikv/pd/pull/4004 + + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + region := handler.GetRegion(id) + if region == nil { + c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error()) + return + } + + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + labelValue := l.GetRegionLabel(region, labelKey) + c.IndentedJSON(http.StatusOK, labelValue) +} + +// @Tags region_label +// @Summary Get labels of a region. 
+// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {string} string +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The region does not exist." +// @Router /regions/{id}/labels [get] +func getRegionLabels(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + idStr := c.Param("id") + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + region := handler.GetRegion(id) + if region == nil { + c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error()) + return + } + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + labels := l.GetRegionLabels(region) + c.IndentedJSON(http.StatusOK, labels) +} + +// @Tags region_label +// @Summary List all label rules of cluster. +// @Produce json +// @Success 200 {array} labeler.LabelRule +// @Router /config/region-label/rules [get] +func getAllRegionLabelRules(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + rules := l.GetAllLabelRules() + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags region_label +// @Summary Get label rules of cluster by ids. +// @Param body body []string true "IDs of query rules" +// @Produce json +// @Success 200 {array} labeler.LabelRule +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/region-label/rules/ids [get] +func getRegionLabelRulesByIDs(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + var ids []string + if err := c.BindJSON(&ids); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + rules, err := l.GetLabelRules(ids) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags region_label +// @Summary Get label rule of cluster by id. +// @Param id path string true "Rule Id" +// @Produce json +// @Success 200 {object} labeler.LabelRule +// @Failure 404 {string} string "The rule does not exist." 
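+// @Failure 400 {string} string "The input is invalid."
+// @Failure 500 {string} string "PD server failed to proceed the request."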
+// @Router /config/region-label/rule/{id} [get]
+func getRegionLabelRuleByID(c *gin.Context) {
+	handler := c.MustGet(handlerKey).(*handler.Handler)
+
+	id, err := url.PathUnescape(c.Param("id"))
+	if err != nil {
+		c.String(http.StatusBadRequest, err.Error())
+		return
+	}
+
+	l, err := handler.GetRegionLabeler()
+	if err != nil {
+		c.String(http.StatusInternalServerError, err.Error())
+		return
+	}
+	rule := l.GetLabelRule(id)
+	if rule == nil {
+		c.String(http.StatusNotFound, errs.ErrRegionRuleNotFound.FastGenByArgs().Error())
+		return
+	}
+	c.IndentedJSON(http.StatusOK, rule)
+}
diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go
index fca43f3eeeb..c0cee81d27e 100644
--- a/pkg/schedule/handler/handler.go
+++ b/pkg/schedule/handler/handler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/tikv/pd/pkg/schedule"
 	sche "github.com/tikv/pd/pkg/schedule/core"
 	"github.com/tikv/pd/pkg/schedule/filter"
+	"github.com/tikv/pd/pkg/schedule/labeler"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/schedule/scatter"
@@ -1040,3 +1041,21 @@ func (h *Handler) GetHotBuckets(regionIDs ...uint64) (HotBucketsResponse, error)
 	}
 	return ret, nil
 }
+
+// GetRegion returns the region info with the given region id.
+func (h *Handler) GetRegion(id uint64) *core.RegionInfo {
+	c := h.GetCluster()
+	if c == nil {
+		return nil
+	}
+	return c.GetRegion(id)
+}
+
+// GetRegionLabeler returns the region labeler.
+func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) {
+	c := h.GetCluster()
+	if c == nil || c.GetRegionLabeler() == nil {
+		return nil, errs.ErrNotBootstrapped
+	}
+	return c.GetRegionLabeler(), nil
+}
diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go
index 19438ad0f91..061c65329c9 100644
--- a/pkg/utils/apiutil/serverapi/middleware.go
+++ b/pkg/utils/apiutil/serverapi/middleware.go
@@ -79,6 +79,7 @@ type microserviceRedirectRule struct {
 	targetPath        string
 	targetServiceName string
 	matchMethods      []string
+	filter            func(*http.Request) bool
 }
 
 // NewRedirector redirects request to the leader if needs to be handled in the leader.
@@ -94,14 +95,19 @@ func NewRedirector(s *server.Server, opts ...RedirectorOption) negroni.Handler { type RedirectorOption func(*redirector) // MicroserviceRedirectRule new a microservice redirect rule option -func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, methods []string) RedirectorOption { +func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, + methods []string, filters ...func(*http.Request) bool) RedirectorOption { return func(s *redirector) { - s.microserviceRedirectRules = append(s.microserviceRedirectRules, µserviceRedirectRule{ - matchPath, - targetPath, - targetServiceName, - methods, - }) + rule := µserviceRedirectRule{ + matchPath: matchPath, + targetPath: targetPath, + targetServiceName: targetServiceName, + matchMethods: methods, + } + if len(filters) > 0 { + rule.filter = filters[0] + } + s.microserviceRedirectRules = append(s.microserviceRedirectRules, rule) } } @@ -117,6 +123,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { + if rule.filter != nil && !rule.filter(r) { + continue + } addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) if !ok || addr == "" { log.Warn("failed to get the service primary addr when trying to match redirect rules", diff --git a/pkg/utils/testutil/api_check.go b/pkg/utils/testutil/api_check.go index 84af97f828d..786530b1567 100644 --- a/pkg/utils/testutil/api_check.go +++ b/pkg/utils/testutil/api_check.go @@ -88,7 +88,7 @@ func ReadGetJSON(re *require.Assertions, client *http.Client, url string, data i } // ReadGetJSONWithBody is used to do get request with input and check whether given data can be extracted successfully. -func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}) error { +func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}, checkOpts ...func([]byte, int, http.Header)) error { resp, err := apiutil.GetJSON(client, url, input) if err != nil { return err diff --git a/server/api/region_label.go b/server/api/region_label.go index 003dfb1132f..7958bacd371 100644 --- a/server/api/region_label.go +++ b/server/api/region_label.go @@ -83,7 +83,7 @@ func (h *regionLabelHandler) PatchRegionLabelRules(w http.ResponseWriter, r *htt // @Success 200 {array} labeler.LabelRule // @Failure 400 {string} string "The input is invalid." // @Failure 500 {string} string "PD server failed to proceed the request." 
-// @Router /config/region-label/rule/ids [get] +// @Router /config/region-label/rules/ids [get] func (h *regionLabelHandler) GetRegionLabelRulesByIDs(w http.ResponseWriter, r *http.Request) { cluster := getCluster(r) var ids []string diff --git a/server/api/server.go b/server/api/server.go index ee301ea54c8..992cd42d796 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -17,6 +17,7 @@ package api import ( "context" "net/http" + "strings" "github.com/gorilla/mux" scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" @@ -78,6 +79,21 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/checkers", mcs.SchedulingServiceName, []string{http.MethodPost, http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/region/id", + scheapi.APIPathPrefix+"/regions", + mcs.SchedulingServiceName, + []string{http.MethodGet}, + func(r *http.Request) bool { + // The original code uses the path "/region/id" to get the region id. + // However, the path "/region/id" is used to get the region by id, which is not what we want. + return strings.Contains(r.URL.Path, "label") + }), + serverapi.MicroserviceRedirectRule( + prefix+"/config/region-label", + scheapi.APIPathPrefix+"/config/region-label", + mcs.SchedulingServiceName, + []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( prefix+"/hotspot", scheapi.APIPathPrefix+"/hotspot", diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 5284913813c..3873ca0f96d 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/suite" _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" "github.com/tikv/pd/pkg/schedule/handler" + "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/apiutil" @@ -217,4 +218,26 @@ func (suite *apiTestSuite) TestAPIForward() { err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/regions/history"), &history, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) + + // Test region label + var rules []*labeler.LabelRule + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.ReadGetJSONWithBody(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules/ids"), []byte(`["rule1", "rule3"]`), + &rules, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rule/rule1"), nil, + testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1"), nil, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/label/key"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader,"true")) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/labels"), nil, + testutil.WithHeader(re, 
apiutil.ForwardToMicroServiceHeader,"true")) + re.NoError(err) } From 37c3c766b8427f8a1245193c8049e6cc2ca68492 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 1 Nov 2023 21:55:35 +0800 Subject: [PATCH 02/14] mcs: support region http interface in scheduling server Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 199 ++++++++++++++ pkg/schedule/handler/handler.go | 207 ++++++++++++++ server/api/region.go | 252 +++++------------- server/api/region_test.go | 10 +- server/api/server.go | 27 +- tests/integrations/mcs/scheduling/api_test.go | 29 +- 6 files changed, 527 insertions(+), 197 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index ed998b9b62d..9ec9145ba28 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -15,10 +15,12 @@ package apis import ( + "errors" "fmt" "net/http" "net/url" "strconv" + "strings" "sync" "github.com/gin-contrib/cors" @@ -38,6 +40,7 @@ import ( "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/unrolled/render" ) @@ -169,6 +172,11 @@ func (s *Service) RegisterRegionsRouter() { router := s.root.Group("regions") router.GET("/:id/label/:key", getRegionLabelByKey) router.GET("/:id/labels", getRegionLabels) + router.POST("/accelerate-schedule", accelerateRegionsScheduleInRange) + router.POST("/accelerate-schedule/batch", accelerateRegionsScheduleInRanges) + router.POST("/scatter", scatterRegions) + router.POST("/split", splitRegions) + router.GET("/replicated", checkRegionsReplicated) } // RegisterRegionLabelRouter registers the router of the region label handler. @@ -708,3 +716,194 @@ func getRegionLabelRuleByID(c *gin.Context) { } c.IndentedJSON(http.StatusOK, rule) } + +// @Tags region +// @Summary Accelerate regions scheduling a in given range, only receive hex format for keys +// @Accept json +// @Param body body object true "json params" +// @Param limit query integer false "Limit count" default(256) +// @Produce json +// @Success 200 {string} string "Accelerate regions scheduling in a given range [startKey, endKey)" +// @Failure 400 {string} string "The input is invalid." 
+// @Router /regions/accelerate-schedule [post] +func accelerateRegionsScheduleInRange(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + if !ok1 || !ok2 { + c.String(http.StatusBadRequest, "start_key or end_key is not string") + return + } + + limitStr, _ := c.GetQuery("limit") + limit, err := handler.AdjustLimit(limitStr, 256 /*default limit*/) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + err = handler.AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey, limit) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.String(http.StatusOK, fmt.Sprintf("Accelerate regions scheduling in a given range [%s,%s)", rawStartKey, rawEndKey)) +} + +// @Tags region +// @Summary Accelerate regions scheduling in given ranges, only receive hex format for keys +// @Accept json +// @Param body body object true "json params" +// @Param limit query integer false "Limit count" default(256) +// @Produce json +// @Success 200 {string} string "Accelerate regions scheduling in given ranges [startKey1, endKey1), [startKey2, endKey2), ..." +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/accelerate-schedule/batch [post] +func accelerateRegionsScheduleInRanges(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input []map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + limitStr, _ := c.GetQuery("limit") + limit, err := handler.AdjustLimit(limitStr, 256 /*default limit*/) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + var msgBuilder strings.Builder + msgBuilder.Grow(128) + msgBuilder.WriteString("Accelerate regions scheduling in given ranges: ") + var startKeys, endKeys [][]byte + for _, rg := range input { + startKey, rawStartKey, err := apiutil.ParseKey("start_key", rg) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + endKey, rawEndKey, err := apiutil.ParseKey("end_key", rg) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + startKeys = append(startKeys, startKey) + endKeys = append(endKeys, endKey) + msgBuilder.WriteString(fmt.Sprintf("[%s,%s), ", rawStartKey, rawEndKey)) + } + err = handler.AccelerateRegionsScheduleInRanges(startKeys, endKeys, limit) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.String(http.StatusOK, msgBuilder.String()) +} + +// @Tags region +// @Summary Scatter regions by given key ranges or regions id distributed by given group with given retry limit +// @Accept json +// @Param body body object true "json params" +// @Produce json +// @Success 200 {string} string "Scatter regions by given key ranges or regions id distributed by given group with given retry limit" +// @Failure 400 {string} string "The input is invalid." 
+// @Router /regions/scatter [post] +func scatterRegions(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + group, _ := input["group"].(string) + retryLimit := 5 + if rl, ok := input["retry_limit"].(float64); ok { + retryLimit = int(rl) + } + + opsCount, failures, err := func() (int, map[uint64]error, error) { + if ok1 && ok2 { + return handler.ScatterRegionsByRange(rawStartKey, rawEndKey, group, retryLimit) + } + ids, ok := typeutil.JSONToUint64Slice(input["regions_id"]) + if !ok { + return 0, nil, errors.New("regions_id is invalid") + } + return handler.ScatterRegionsByID(ids, group, retryLimit, false) + }() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + s := handler.BuildScatterRegionsResp(opsCount, failures) + c.IndentedJSON(http.StatusOK, &s) +} + +// @Tags region +// @Summary Split regions with given split keys +// @Accept json +// @Param body body object true "json params" +// @Produce json +// @Success 200 {string} string "Split regions with given split keys" +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/split [post] +func splitRegions(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + var input map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + s, ok := input["split_keys"] + if !ok { + c.String(http.StatusBadRequest, "split_keys should be provided.") + return + } + rawSplitKeys := s.([]string) + if len(rawSplitKeys) < 1 { + c.String(http.StatusBadRequest, "empty split keys.") + return + } + fmt.Println(rawSplitKeys) + retryLimit := 5 + if rl, ok := input["retry_limit"].(float64); ok { + retryLimit = int(rl) + } + s, err := handler.SplitRegions(c.Request.Context(), rawSplitKeys, retryLimit) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, &s) +} + +// @Tags region +// @Summary Check if regions in the given key ranges are replicated. Returns 'REPLICATED', 'INPROGRESS', or 'PENDING'. 'PENDING' means that there is at least one region pending for scheduling. Similarly, 'INPROGRESS' means there is at least one region in scheduling. +// @Param startKey query string true "Regions start key, hex encoded" +// @Param endKey query string true "Regions end key, hex encoded" +// @Produce plain +// @Success 200 {string} string "INPROGRESS" +// @Failure 400 {string} string "The input is invalid." 
+// @Router /regions/replicated [get] +func checkRegionsReplicated(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + rawStartKey, _ := c.GetQuery("start_key") + rawEndKey, _ := c.GetQuery("end_key") + state, err := handler.CheckRegionsReplicated(rawStartKey, rawEndKey) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + c.String(http.StatusOK, state) +} diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index c0cee81d27e..c8fb9e38a9f 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -16,12 +16,15 @@ package handler import ( "bytes" + "context" "encoding/hex" "net/http" + "strconv" "strings" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -42,6 +45,11 @@ import ( "go.uber.org/zap" ) +const ( + defaultRegionLimit = 16 + maxRegionLimit = 10240 +) + // Server is the interface for handler about schedule. // TODO: remove it after GetCluster is unified between PD server and Scheduling server. type Server interface { @@ -1059,3 +1067,202 @@ func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) { } return c.GetRegionLabeler(), nil } + +// AccelerateRegionsScheduleInRange accelerates regions scheduling in a given range. +func (h *Handler) AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey string, limit int) error { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return err + } + c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regions := c.ScanRegions(startKey, endKey, limit) + if len(regions) > 0 { + regionsIDList := make([]uint64, 0, len(regions)) + for _, region := range regions { + regionsIDList = append(regionsIDList, region.GetID()) + } + co.GetCheckerController().AddSuspectRegions(regionsIDList...) + } + return nil +} + +// AccelerateRegionsScheduleInRanges accelerates regions scheduling in given ranges. +func (h *Handler) AccelerateRegionsScheduleInRanges(startKeys [][]byte, endKeys [][]byte, limit int) error { + c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if len(startKeys) != len(endKeys) { + return errors.New("startKeys and endKeys should have the same length") + } + var regions []*core.RegionInfo + for i := range startKeys { + regions = append(regions, c.ScanRegions(startKeys[i], endKeys[i], limit)...) + } + if len(regions) > 0 { + regionsIDList := make([]uint64, 0, len(regions)) + for _, region := range regions { + regionsIDList = append(regionsIDList, region.GetID()) + } + co.GetCheckerController().AddSuspectRegions(regionsIDList...) + } + return nil +} + +// AdjustLimit adjusts the limit of regions to schedule. 
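+// An empty limit string falls back to the caller-supplied default (or
+// defaultRegionLimit), and any parsed value is capped at maxRegionLimit.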
+func (h *Handler) AdjustLimit(limitStr string, defaultLimits ...int) (int, error) {
+	limit := defaultRegionLimit
+	if len(defaultLimits) > 0 {
+		limit = defaultLimits[0]
+	}
+	if limitStr != "" {
+		var err error
+		limit, err = strconv.Atoi(limitStr)
+		if err != nil {
+			return 0, err
+		}
+	}
+	if limit > maxRegionLimit {
+		limit = maxRegionLimit
+	}
+	return limit, nil
+}
+
+// ScatterRegionsResponse is the response for scatter regions.
+type ScatterRegionsResponse struct {
+	ProcessedPercentage int `json:"processed-percentage"`
+}
+
+// BuildScatterRegionsResp builds ScatterRegionsResponse.
+func (h *Handler) BuildScatterRegionsResp(opsCount int, failures map[uint64]error) *ScatterRegionsResponse {
+	// If any operator failed to be added to the operator controller, count its region as unprocessed.
+	percentage := 100
+	if len(failures) > 0 {
+		percentage = 100 - 100*len(failures)/(opsCount+len(failures))
+		log.Debug("scatter regions", zap.Errors("failures", func() []error {
+			r := make([]error, 0, len(failures))
+			for _, err := range failures {
+				r = append(r, err)
+			}
+			return r
+		}()))
+	}
+	return &ScatterRegionsResponse{
+		ProcessedPercentage: percentage,
+	}
+}
+
+// ScatterRegionsByRange scatters regions by range.
+func (h *Handler) ScatterRegionsByRange(rawStartKey, rawEndKey string, group string, retryLimit int) (int, map[uint64]error, error) {
+	startKey, err := hex.DecodeString(rawStartKey)
+	if err != nil {
+		return 0, nil, err
+	}
+	endKey, err := hex.DecodeString(rawEndKey)
+	if err != nil {
+		return 0, nil, err
+	}
+	co := h.GetCoordinator()
+	if co == nil {
+		return 0, nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
+	}
+	return co.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit)
+}
+
+// ScatterRegionsByID scatters regions by id.
+func (h *Handler) ScatterRegionsByID(ids []uint64, group string, retryLimit int, skipStoreLimit bool) (int, map[uint64]error, error) {
+	co := h.GetCoordinator()
+	if co == nil {
+		return 0, nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
+	}
+	return co.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, skipStoreLimit)
+}
+
+// SplitRegionsResponse is the response for split regions.
+type SplitRegionsResponse struct {
+	ProcessedPercentage int      `json:"processed-percentage"`
+	NewRegionsID        []uint64 `json:"regions-id"`
+}
+
+// SplitRegions splits regions by split keys.
+func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []string, retryLimit int) (*SplitRegionsResponse, error) {
+	co := h.GetCoordinator()
+	if co == nil {
+		return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
+	}
+	splitKeys := make([][]byte, 0, len(rawSplitKeys))
+	for _, rawKey := range rawSplitKeys {
+		key, err := hex.DecodeString(rawKey)
+		if err != nil {
+			return nil, err
+		}
+		splitKeys = append(splitKeys, key)
+	}
+
+	percentage, newRegionsID := co.GetRegionSplitter().SplitRegions(ctx, splitKeys, retryLimit)
+	s := &SplitRegionsResponse{
+		ProcessedPercentage: percentage,
+		NewRegionsID:        newRegionsID,
+	}
+	failpoint.Inject("splitResponses", func(val failpoint.Value) {
+		rawID, ok := val.(int)
+		if ok {
+			s.ProcessedPercentage = 100
+			s.NewRegionsID = []uint64{uint64(rawID)}
+		}
+	})
+	return s, nil
+}
+
+// CheckRegionsReplicated checks if regions are replicated.
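+// It scans [startKey, endKey) and returns REPLICATED only when every region
+// in the range satisfies its placement rules; otherwise it returns INPROGRESS,
+// or PENDING once any under-replicated region is still waiting to be scheduled.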
+func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string, error) { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return "", err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return "", err + } + c := h.GetCluster() + if c == nil { + return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regions := c.ScanRegions(startKey, endKey, -1) + state := "REPLICATED" + for _, region := range regions { + if !filter.IsRegionReplicated(c, region) { + state = "INPROGRESS" + if co.IsPendingRegion(region.GetID()) { + state = "PENDING" + break + } + } + } + failpoint.Inject("mockPending", func(val failpoint.Value) { + aok, ok := val.(bool) + if ok && aok { + state = "PENDING" + } + }) + return state, nil +} diff --git a/server/api/region.go b/server/api/region.go index 68e280f610c..a80e3d0d48a 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -27,21 +27,18 @@ import ( "github.com/gorilla/mux" jwriter "github.com/mailru/easyjson/jwriter" - "github.com/pingcap/failpoint" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" - "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" - "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" "github.com/unrolled/render" - "go.uber.org/zap" ) // MetaPeer is api compatible with *metapb.Peer. @@ -301,51 +298,28 @@ func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) { // @Failure 400 {string} string "The input is invalid." 
// @Router /regions/replicated [get] func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) - vars := mux.Vars(r) - startKeyHex := vars["startKey"] - startKey, err := hex.DecodeString(startKeyHex) + rawStartKey := vars["startKey"] + rawEndKey := vars["endKey"] + state, err := h.Handler.CheckRegionsReplicated(rawStartKey, rawEndKey) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - endKeyHex := vars["endKey"] - endKey, err := hex.DecodeString(endKeyHex) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - - regions := rc.ScanRegions(startKey, endKey, -1) - state := "REPLICATED" - for _, region := range regions { - if !filter.IsRegionReplicated(rc, region) { - state = "INPROGRESS" - if rc.GetCoordinator().IsPendingRegion(region.GetID()) { - state = "PENDING" - break - } - } - } - failpoint.Inject("mockPending", func(val failpoint.Value) { - aok, ok := val.(bool) - if ok && aok { - state = "PENDING" - } - }) h.rd.JSON(w, http.StatusOK, state) } type regionsHandler struct { + *server.Handler svr *server.Server rd *render.Render } func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler { return ®ionsHandler{ - svr: svr, - rd: rd, + Handler: svr.GetHandler(), + svr: svr, + rd: rd, } } @@ -422,19 +396,12 @@ func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) startKey := r.URL.Query().Get("key") endKey := r.URL.Query().Get("end_key") - - limit := defaultRegionLimit - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit")) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } + regions := rc.ScanRegions([]byte(startKey), []byte(endKey), limit) b, err := marshalRegionsInfoJSON(r.Context(), regions) if err != nil { @@ -509,16 +476,10 @@ func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Reque return } - limit := defaultRegionLimit - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit")) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } regionBound := keyspace.MakeRegionBound(keyspaceID) regions := rc.ScanRegions(regionBound.RawLeftBound, regionBound.RawRightBound, limit) @@ -789,8 +750,6 @@ func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Reques } const ( - defaultRegionLimit = 16 - maxRegionLimit = 10240 minRegionHistogramSize = 1 minRegionHistogramKeys = 1000 ) @@ -892,43 +851,27 @@ func (h *regionsHandler) GetTopCPURegions(w http.ResponseWriter, r *http.Request // @Failure 400 {string} string "The input is invalid." 
// @Router /regions/accelerate-schedule [post] func (h *regionsHandler) AccelerateRegionsScheduleInRange(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - startKey, rawStartKey, err := apiutil.ParseKey("start_key", input) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + if !ok1 || !ok2 { + h.rd.JSON(w, http.StatusBadRequest, "start_key or end_key is not string") return } - endKey, rawEndKey, err := apiutil.ParseKey("end_key", input) + limit, err := h.AdjustLimit(r.URL.Query().Get("limit"), 256 /*default limit*/) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - limit := 256 - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit - } - - regions := rc.ScanRegions(startKey, endKey, limit) - if len(regions) > 0 { - regionsIDList := make([]uint64, 0, len(regions)) - for _, region := range regions { - regionsIDList = append(regionsIDList, region.GetID()) - } - rc.AddSuspectRegions(regionsIDList...) + err = h.Handler.AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey, limit) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } h.rd.Text(w, http.StatusOK, fmt.Sprintf("Accelerate regions scheduling in a given range [%s,%s)", rawStartKey, rawEndKey)) } @@ -943,27 +886,20 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRange(w http.ResponseWriter, // @Failure 400 {string} string "The input is invalid." // @Router /regions/accelerate-schedule/batch [post] func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input []map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - limit := 256 - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit"), 256 /*default limit*/) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } + var msgBuilder strings.Builder msgBuilder.Grow(128) msgBuilder.WriteString("Accelerate regions scheduling in given ranges: ") - var regions []*core.RegionInfo + var startKeys, endKeys [][]byte for _, rg := range input { startKey, rawStartKey, err := apiutil.ParseKey("start_key", rg) if err != nil { @@ -975,32 +911,24 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - regions = append(regions, rc.ScanRegions(startKey, endKey, limit)...) + startKeys = append(startKeys, startKey) + endKeys = append(endKeys, endKey) msgBuilder.WriteString(fmt.Sprintf("[%s,%s), ", rawStartKey, rawEndKey)) } - if len(regions) > 0 { - regionsIDList := make([]uint64, 0, len(regions)) - for _, region := range regions { - regionsIDList = append(regionsIDList, region.GetID()) - } - rc.AddSuspectRegions(regionsIDList...) 
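+	// Delegate to the shared schedule handler so that the PD server and the
+	// scheduling microservice run the same implementation.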
+ err = h.Handler.AccelerateRegionsScheduleInRanges(startKeys, endKeys, limit) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } h.rd.Text(w, http.StatusOK, msgBuilder.String()) } func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) { rc := getCluster(r) - limit := defaultRegionLimit - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - } - if limit > maxRegionLimit { - limit = maxRegionLimit + limit, err := h.AdjustLimit(r.URL.Query().Get("limit")) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } regions := TopNRegions(rc.GetRegions(), less, limit) b, err := marshalRegionsInfoJSON(r.Context(), regions) @@ -1020,69 +948,33 @@ func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, // @Failure 400 {string} string "The input is invalid." // @Router /regions/scatter [post] func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - _, ok1 := input["start_key"].(string) - _, ok2 := input["end_key"].(string) - group, ok := input["group"].(string) - if !ok { - group = "" - } + rawStartKey, ok1 := input["start_key"].(string) + rawEndKey, ok2 := input["end_key"].(string) + group, _ := input["group"].(string) retryLimit := 5 if rl, ok := input["retry_limit"].(float64); ok { retryLimit = int(rl) } - opsCount := 0 - var failures map[uint64]error - var err error - if ok1 && ok2 { - startKey, _, err := apiutil.ParseKey("start_key", input) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - endKey, _, err := apiutil.ParseKey("end_key", input) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + + opsCount, failures, err := func() (int, map[uint64]error, error) { + if ok1 && ok2 { + return h.ScatterRegionsByRange(rawStartKey, rawEndKey, group, retryLimit) } - } else { ids, ok := typeutil.JSONToUint64Slice(input["regions_id"]) if !ok { - h.rd.JSON(w, http.StatusBadRequest, "regions_id is invalid") - return + return 0, nil, errors.New("regions_id is invalid") } - opsCount, failures, err = rc.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, false) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - } - // If there existed any operator failed to be added into Operator Controller, add its regions into unProcessedRegions - percentage := 100 - if len(failures) > 0 { - percentage = 100 - 100*len(failures)/(opsCount+len(failures)) - log.Debug("scatter regions", zap.Errors("failures", func() []error { - r := make([]error, 0, len(failures)) - for _, err := range failures { - r = append(r, err) - } - return r - }())) - } - s := struct { - ProcessedPercentage int `json:"processed-percentage"` - }{ - ProcessedPercentage: percentage, + return h.ScatterRegionsByID(ids, group, retryLimit, false) + }() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } + s := h.BuildScatterRegionsResp(opsCount, failures) 
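+	// The response only carries the processed percentage; per-region scatter
+	// failures are logged by the handler rather than returned.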
h.rd.JSON(w, http.StatusOK, &s) } @@ -1095,16 +987,16 @@ func (h *regionsHandler) ScatterRegions(w http.ResponseWriter, r *http.Request) // @Failure 400 {string} string "The input is invalid." // @Router /regions/split [post] func (h *regionsHandler) SplitRegions(w http.ResponseWriter, r *http.Request) { - rc := getCluster(r) var input map[string]interface{} if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } - rawSplitKeys, ok := input["split_keys"].([]interface{}) + s, ok := input["split_keys"] if !ok { h.rd.JSON(w, http.StatusBadRequest, "split_keys should be provided.") return } + rawSplitKeys := s.([]string) if len(rawSplitKeys) < 1 { h.rd.JSON(w, http.StatusBadRequest, "empty split keys.") return @@ -1113,29 +1005,11 @@ func (h *regionsHandler) SplitRegions(w http.ResponseWriter, r *http.Request) { if rl, ok := input["retry_limit"].(float64); ok { retryLimit = int(rl) } - splitKeys := make([][]byte, 0, len(rawSplitKeys)) - for _, rawKey := range rawSplitKeys { - key, err := hex.DecodeString(rawKey.(string)) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - splitKeys = append(splitKeys, key) - } - s := struct { - ProcessedPercentage int `json:"processed-percentage"` - NewRegionsID []uint64 `json:"regions-id"` - }{} - percentage, newRegionsID := rc.GetRegionSplitter().SplitRegions(r.Context(), splitKeys, retryLimit) - s.ProcessedPercentage = percentage - s.NewRegionsID = newRegionsID - failpoint.Inject("splitResponses", func(val failpoint.Value) { - rawID, ok := val.(int) - if ok { - s.ProcessedPercentage = 100 - s.NewRegionsID = []uint64{uint64(rawID)} - } - }) + s, err := h.Handler.SplitRegions(r.Context(), rawSplitKeys, retryLimit) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } h.rd.JSON(w, http.StatusOK, &s) } diff --git a/server/api/region_test.go b/server/api/region_test.go index a39a1e5c5fd..3e794b0c412 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -424,9 +424,9 @@ func (suite *regionTestSuite) TestSplitRegions() { suite.Equal(100, s.ProcessedPercentage) suite.Equal([]uint64{newRegionID}, s.NewRegionsID) } - suite.NoError(failpoint.Enable("github.com/tikv/pd/server/api/splitResponses", fmt.Sprintf("return(%v)", newRegionID))) + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/splitResponses", fmt.Sprintf("return(%v)", newRegionID))) err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/split", suite.urlPrefix), []byte(body), checkOpt) - suite.NoError(failpoint.Disable("github.com/tikv/pd/server/api/splitResponses")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/splitResponses")) suite.NoError(err) } @@ -716,6 +716,8 @@ func (suite *regionsReplicatedTestSuite) TestCheckRegionsReplicated() { // correct test url = fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, suite.urlPrefix, hex.EncodeToString(r1.GetStartKey()), hex.EncodeToString(r1.GetEndKey())) + err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re)) + suite.NoError(err) // test one rule data, err := json.Marshal(bundle) @@ -727,11 +729,11 @@ func (suite *regionsReplicatedTestSuite) TestCheckRegionsReplicated() { suite.NoError(err) suite.Equal("REPLICATED", status) - suite.NoError(failpoint.Enable("github.com/tikv/pd/server/api/mockPending", "return(true)")) + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/mockPending", "return(true)")) err = tu.ReadGetJSON(re, testDialClient, url, 
&status) suite.NoError(err) suite.Equal("PENDING", status) - suite.NoError(failpoint.Disable("github.com/tikv/pd/server/api/mockPending")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/mockPending")) // test multiple rules r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}) diff --git a/server/api/server.go b/server/api/server.go index 992cd42d796..83e8908f1f3 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -89,6 +89,31 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP // However, the path "/region/id" is used to get the region by id, which is not what we want. return strings.Contains(r.URL.Path, "label") }), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/accelerate-schedule", + scheapi.APIPathPrefix+"/regions/accelerate-schedule", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/accelerate-schedule/batch", + scheapi.APIPathPrefix+"/regions/accelerate-schedule/batch", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/scatter", + scheapi.APIPathPrefix+"/regions/scatter", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/split", + scheapi.APIPathPrefix+"/regions/split", + mcs.SchedulingServiceName, + []string{http.MethodPost}), + serverapi.MicroserviceRedirectRule( + prefix+"/regions/replicated", + scheapi.APIPathPrefix+"/regions/replicated", + mcs.SchedulingServiceName, + []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( prefix+"/config/region-label", scheapi.APIPathPrefix+"/config/region-label", @@ -111,8 +136,6 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/schedulers", mcs.SchedulingServiceName, []string{http.MethodPost}), - // TODO: we need to consider the case that v1 api not support restful api. - // we might change the previous path parameters to query parameters. 
), negroni.Wrap(r)), ) diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 3873ca0f96d..9b04f09dabf 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -2,6 +2,7 @@ package scheduling_test import ( "context" + "encoding/hex" "encoding/json" "fmt" "net/http" @@ -235,9 +236,33 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) re.NoError(err) err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/label/key"), nil, - testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader,"true")) + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/labels"), nil, - testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader,"true")) + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + + // Test Region + body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/accelerate-schedule"), []byte(body), + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + body = fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/accelerate-schedule/batch"), []byte(body), + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) + body = fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("b1")), hex.EncodeToString([]byte("b3"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/scatter"), []byte(body), + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) + body = fmt.Sprintf(`{"retry_limit":%v, "split_keys": ["%s","%s","%s"]}`, 3, + hex.EncodeToString([]byte("bbb")), + hex.EncodeToString([]byte("ccc")), + hex.EncodeToString([]byte("ddd"))) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "regions/split"), []byte(body), + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, urlPrefix, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a2"))), nil, + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) } From 2b06e2ffd90cd12cd6324ed3c1b4a32a6f305f05 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 20 Nov 2023 16:23:51 +0800 Subject: [PATCH 03/14] update api.go Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 878d2214f85..74ac161eb99 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -176,8 +176,6 @@ func (s *Service) 
RegisterOperatorsRouter() { // RegisterRegionsRouter registers the router of the regions handler. func (s *Service) RegisterRegionsRouter() { router := s.root.Group("regions") - router.GET("/:id/label/:key", getRegionLabelByKey) - router.GET("/:id/labels", getRegionLabels) router.POST("/accelerate-schedule", accelerateRegionsScheduleInRange) router.POST("/accelerate-schedule/batch", accelerateRegionsScheduleInRanges) router.POST("/scatter", scatterRegions) @@ -1144,6 +1142,7 @@ func getRegionLabelRuleByID(c *gin.Context) { // @Produce json // @Success 200 {string} string "Accelerate regions scheduling in a given range [startKey, endKey)" // @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/accelerate-schedule [post] func accelerateRegionsScheduleInRange(c *gin.Context) { handler := c.MustGet(handlerKey).(*handler.Handler) @@ -1183,6 +1182,7 @@ func accelerateRegionsScheduleInRange(c *gin.Context) { // @Produce json // @Success 200 {string} string "Accelerate regions scheduling in given ranges [startKey1, endKey1), [startKey2, endKey2), ..." // @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/accelerate-schedule/batch [post] func accelerateRegionsScheduleInRanges(c *gin.Context) { handler := c.MustGet(handlerKey).(*handler.Handler) @@ -1233,6 +1233,7 @@ func accelerateRegionsScheduleInRanges(c *gin.Context) { // @Produce json // @Success 200 {string} string "Scatter regions by given key ranges or regions id distributed by given group with given retry limit" // @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/scatter [post] func scatterRegions(c *gin.Context) { handler := c.MustGet(handlerKey).(*handler.Handler) @@ -1275,6 +1276,7 @@ func scatterRegions(c *gin.Context) { // @Produce json // @Success 200 {string} string "Split regions with given split keys" // @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /regions/split [post] func splitRegions(c *gin.Context) { handler := c.MustGet(handlerKey).(*handler.Handler) From 07e96f02af7a53911df478823618e25d321b012e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 21 Nov 2023 17:05:11 +0800 Subject: [PATCH 04/14] fix test Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 3 +-- pkg/schedule/handler/handler.go | 4 ++-- pkg/utils/apiutil/serverapi/middleware.go | 4 +++- server/api/region.go | 2 +- server/api/region_test.go | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 74ac161eb99..de613f86044 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -1291,12 +1291,11 @@ func splitRegions(c *gin.Context) { c.String(http.StatusBadRequest, "split_keys should be provided.") return } - rawSplitKeys := s.([]string) + rawSplitKeys := s.([]interface{}) if len(rawSplitKeys) < 1 { c.String(http.StatusBadRequest, "empty split keys.") return } - fmt.Println(rawSplitKeys) retryLimit := 5 if rl, ok := input["retry_limit"].(float64); ok { retryLimit = int(rl) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 23487dfa107..353e2bb60e2 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1221,14 +1221,14 @@ type SplitRegionsResponse struct { } // SplitRegions splits regions by split keys. -func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []string, retryLimit int) (*SplitRegionsResponse, error) { +func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []interface{}, retryLimit int) (*SplitRegionsResponse, error) { co := h.GetCoordinator() if co == nil { return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() } splitKeys := make([][]byte, 0, len(rawSplitKeys)) for _, rawKey := range rawSplitKeys { - key, err := hex.DecodeString(rawKey) + key, err := hex.DecodeString(rawKey.(string)) if err != nil { return nil, err } diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index e26327cb3ff..f257efb34f9 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -122,7 +122,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri // It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}" r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { - if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { + if h.s.IsServiceIndependent(rule.targetServiceName) && + strings.HasPrefix(r.URL.Path, rule.matchPath) && + slice.Contains(rule.matchMethods, r.Method) { if rule.filter != nil && !rule.filter(r) { continue } diff --git a/server/api/region.go b/server/api/region.go index a80e3d0d48a..62713cb6dcd 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -996,7 +996,7 @@ func (h *regionsHandler) SplitRegions(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusBadRequest, "split_keys should be provided.") return } - rawSplitKeys := s.([]string) + rawSplitKeys := s.([]interface{}) if len(rawSplitKeys) < 1 { h.rd.JSON(w, http.StatusBadRequest, "empty split keys.") return diff --git a/server/api/region_test.go b/server/api/region_test.go index 17cb26fded3..4f6c80f228e 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -406,7 
+406,7 @@ func (suite *regionTestSuite) TestScatterRegions() { func (suite *regionTestSuite) TestSplitRegions() { re := suite.Require() r1 := core.NewTestRegionInfo(601, 13, []byte("aaa"), []byte("ggg")) - r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 13}, &metapb.Peer{Id: 6, StoreId: 13}) + r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) mustRegionHeartbeat(re, suite.svr, r1) mustPutStore(re, suite.svr, 13, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{}) newRegionID := uint64(11) From 0dfc784e55e2d299b5150ae6f6f4d54a934c19dd Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 21 Nov 2023 18:24:49 +0800 Subject: [PATCH 05/14] move test Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 9 +- pkg/utils/apiutil/serverapi/middleware.go | 4 +- server/api/region_test.go | 222 ----------------- tests/server/api/operator_test.go | 4 +- tests/server/api/region_test.go | 287 ++++++++++++++++++++++ 5 files changed, 297 insertions(+), 229 deletions(-) create mode 100644 tests/server/api/region_test.go diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index de613f86044..b59780b7a61 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -1318,8 +1318,13 @@ func splitRegions(c *gin.Context) { // @Router /regions/replicated [get] func checkRegionsReplicated(c *gin.Context) { handler := c.MustGet(handlerKey).(*handler.Handler) - rawStartKey, _ := c.GetQuery("start_key") - rawEndKey, _ := c.GetQuery("end_key") + rawStartKey, ok1 := c.GetQuery("startKey") + rawEndKey, ok2 := c.GetQuery("endKey") + if !ok1 || !ok2 { + c.String(http.StatusBadRequest, "there is no start_key or end_key") + return + } + state, err := handler.CheckRegionsReplicated(rawStartKey, rawEndKey) if err != nil { c.String(http.StatusBadRequest, err.Error()) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index f257efb34f9..335680d7da0 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -151,8 +151,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri } else { r.URL.Path = rule.targetPath } - log.Debug("redirect to micro service", zap.String("path", r.URL.Path), zap.String("origin-path", origin), - zap.String("target", addr), zap.String("method", r.Method)) + log.Info("redirect to micro service", zap.String("path", r.URL.Path), zap.String("origin-path", origin), + zap.String("target", addr), zap.String("method", r.Method), zap.Any("query", r.URL.Query())) return true, addr } } diff --git a/server/api/region_test.go b/server/api/region_test.go index 4f6c80f228e..83d9dff402b 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -28,13 +28,11 @@ import ( "time" "github.com/docker/go-units" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/apiutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" @@ -337,99 +335,6 @@ func (suite *regionTestSuite) TestTop() { suite.checkTopRegions(fmt.Sprintf("%s/regions/cpu", suite.urlPrefix), []uint64{3, 2, 1}) } -func (suite *regionTestSuite) 
TestAccelerateRegionsScheduleInRange() { - re := suite.Require() - r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) - r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) - r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) - mustRegionHeartbeat(re, suite.svr, r1) - mustRegionHeartbeat(re, suite.svr, r2) - mustRegionHeartbeat(re, suite.svr, r3) - body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3"))) - - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule", suite.urlPrefix), []byte(body), tu.StatusOK(re)) - suite.NoError(err) - idList := suite.svr.GetRaftCluster().GetSuspectRegions() - suite.Len(idList, 2) -} - -func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() { - re := suite.Require() - r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) - r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) - r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) - r4 := core.NewTestRegionInfo(560, 16, []byte("a4"), []byte("a5")) - r5 := core.NewTestRegionInfo(561, 17, []byte("a5"), []byte("a6")) - mustRegionHeartbeat(re, suite.svr, r1) - mustRegionHeartbeat(re, suite.svr, r2) - mustRegionHeartbeat(re, suite.svr, r3) - mustRegionHeartbeat(re, suite.svr, r4) - mustRegionHeartbeat(re, suite.svr, r5) - body := fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) - - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule/batch", suite.urlPrefix), []byte(body), tu.StatusOK(re)) - suite.NoError(err) - idList := suite.svr.GetRaftCluster().GetSuspectRegions() - suite.Len(idList, 4) -} - -func (suite *regionTestSuite) TestScatterRegions() { - re := suite.Require() - r1 := core.NewTestRegionInfo(601, 13, []byte("b1"), []byte("b2")) - r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) - r2 := core.NewTestRegionInfo(602, 13, []byte("b2"), []byte("b3")) - r2.GetMeta().Peers = append(r2.GetMeta().Peers, &metapb.Peer{Id: 7, StoreId: 14}, &metapb.Peer{Id: 8, StoreId: 15}) - r3 := core.NewTestRegionInfo(603, 13, []byte("b4"), []byte("b4")) - r3.GetMeta().Peers = append(r3.GetMeta().Peers, &metapb.Peer{Id: 9, StoreId: 14}, &metapb.Peer{Id: 10, StoreId: 15}) - mustRegionHeartbeat(re, suite.svr, r1) - mustRegionHeartbeat(re, suite.svr, r2) - mustRegionHeartbeat(re, suite.svr, r3) - mustPutStore(re, suite.svr, 13, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{}) - mustPutStore(re, suite.svr, 14, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{}) - mustPutStore(re, suite.svr, 15, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{}) - mustPutStore(re, suite.svr, 16, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{}) - body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("b1")), hex.EncodeToString([]byte("b3"))) - - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", suite.urlPrefix), []byte(body), tu.StatusOK(re)) - suite.NoError(err) - op1 := suite.svr.GetRaftCluster().GetOperatorController().GetOperator(601) - op2 := suite.svr.GetRaftCluster().GetOperatorController().GetOperator(602) - op3 := 
suite.svr.GetRaftCluster().GetOperatorController().GetOperator(603) - // At least one operator used to scatter region - suite.True(op1 != nil || op2 != nil || op3 != nil) - - body = `{"regions_id": [601, 602, 603]}` - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", suite.urlPrefix), []byte(body), tu.StatusOK(re)) - suite.NoError(err) -} - -func (suite *regionTestSuite) TestSplitRegions() { - re := suite.Require() - r1 := core.NewTestRegionInfo(601, 13, []byte("aaa"), []byte("ggg")) - r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) - mustRegionHeartbeat(re, suite.svr, r1) - mustPutStore(re, suite.svr, 13, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{}) - newRegionID := uint64(11) - body := fmt.Sprintf(`{"retry_limit":%v, "split_keys": ["%s","%s","%s"]}`, 3, - hex.EncodeToString([]byte("bbb")), - hex.EncodeToString([]byte("ccc")), - hex.EncodeToString([]byte("ddd"))) - checkOpt := func(res []byte, code int, _ http.Header) { - s := &struct { - ProcessedPercentage int `json:"processed-percentage"` - NewRegionsID []uint64 `json:"regions-id"` - }{} - err := json.Unmarshal(res, s) - suite.NoError(err) - suite.Equal(100, s.ProcessedPercentage) - suite.Equal([]uint64{newRegionID}, s.NewRegionsID) - } - suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/splitResponses", fmt.Sprintf("return(%v)", newRegionID))) - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/split", suite.urlPrefix), []byte(body), checkOpt) - suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/splitResponses")) - suite.NoError(err) -} - func (suite *regionTestSuite) checkTopRegions(url string, regionIDs []uint64) { regions := &RegionsInfo{} err := tu.ReadGetJSON(suite.Require(), testDialClient, url, regions) @@ -652,133 +557,6 @@ func (suite *getRegionRangeHolesTestSuite) TestRegionRangeHoles() { }, *rangeHoles) } -type regionsReplicatedTestSuite struct { - suite.Suite - svr *server.Server - cleanup tu.CleanupFunc - urlPrefix string -} - -func TestRegionsReplicatedTestSuite(t *testing.T) { - suite.Run(t, new(regionsReplicatedTestSuite)) -} - -func (suite *regionsReplicatedTestSuite) SetupSuite() { - re := suite.Require() - suite.svr, suite.cleanup = mustNewServer(re) - server.MustWaitLeader(re, []*server.Server{suite.svr}) - - addr := suite.svr.GetAddr() - suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) - - mustBootstrapCluster(re, suite.svr) -} - -func (suite *regionsReplicatedTestSuite) TearDownSuite() { - suite.cleanup() -} - -func (suite *regionsReplicatedTestSuite) TestCheckRegionsReplicated() { - re := suite.Require() - // enable placement rule - suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-placement-rules":"true"}`), tu.StatusOK(re))) - defer func() { - suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config", []byte(`{"enable-placement-rules":"false"}`), tu.StatusOK(re))) - }() - - // add test region - r1 := core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) - mustRegionHeartbeat(re, suite.svr, r1) - - // set the bundle - bundle := []placement.GroupBundle{ - { - ID: "5", - Index: 5, - Rules: []*placement.Rule{ - { - ID: "foo", Index: 1, Role: placement.Voter, Count: 1, - }, - }, - }, - } - - status := "" - - // invalid url - url := fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, suite.urlPrefix, "_", "t") - err := tu.CheckGetJSON(testDialClient, url, nil, 
tu.Status(re, http.StatusBadRequest)) - suite.NoError(err) - - url = fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, suite.urlPrefix, hex.EncodeToString(r1.GetStartKey()), "_") - err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusBadRequest)) - suite.NoError(err) - - // correct test - url = fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, suite.urlPrefix, hex.EncodeToString(r1.GetStartKey()), hex.EncodeToString(r1.GetEndKey())) - err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re)) - suite.NoError(err) - - // test one rule - data, err := json.Marshal(bundle) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) - suite.NoError(err) - - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) - - suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/mockPending", "return(true)")) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("PENDING", status) - suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/mockPending")) - // test multiple rules - r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) - r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}) - mustRegionHeartbeat(re, suite.svr, r1) - - bundle[0].Rules = append(bundle[0].Rules, &placement.Rule{ - ID: "bar", Index: 1, Role: placement.Voter, Count: 1, - }) - data, err = json.Marshal(bundle) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) - suite.NoError(err) - - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) - - // test multiple bundles - bundle = append(bundle, placement.GroupBundle{ - ID: "6", - Index: 6, - Rules: []*placement.Rule{ - { - ID: "foo", Index: 1, Role: placement.Voter, Count: 2, - }, - }, - }) - data, err = json.Marshal(bundle) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) - suite.NoError(err) - - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("INPROGRESS", status) - - r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) - r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1}) - mustRegionHeartbeat(re, suite.svr, r1) - - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) -} - func TestRegionsInfoMarshal(t *testing.T) { re := require.New(t) regionWithNilPeer := core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1}) diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 14b8618f6a6..c27ebbe7ee8 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -477,9 +477,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te suite.NoError(err) err = tu.CheckDelete(testDialClient, regionURL, tu.StatusOK(re)) } else { - // FIXME: we should check the delete result, which should be failed, - // but the delete operator may be success because the cluster create a new operator to remove ophan peer. 
- err = tu.CheckDelete(testDialClient, regionURL) + err = tu.CheckDelete(testDialClient, regionURL, tu.StatusNotOK(re)) } suite.NoError(err) } diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go new file mode 100644 index 00000000000..452ef365a6d --- /dev/null +++ b/tests/server/api/region_test.go @@ -0,0 +1,287 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule/placement" + tu "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server/config" + "github.com/tikv/pd/tests" +) + +type regionTestSuite struct { + suite.Suite +} + +func TestRegionTestSuite(t *testing.T) { + suite.Run(t, new(regionTestSuite)) +} +func (suite *regionTestSuite) TestSplitRegions() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkSplitRegions) +} + +func (suite *regionTestSuite) checkSplitRegions(cluster *tests.TestCluster) { + leader := cluster.GetLeaderServer() + urlPrefix := leader.GetAddr() + "/pd/api/v1" + re := suite.Require() + r1 := core.NewTestRegionInfo(601, 13, []byte("aaa"), []byte("ggg")) + r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) + tests.MustPutRegionInfo(re, cluster, r1) + s1 := &metapb.Store{ + Id: 13, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + } + tests.MustPutStore(re, cluster, s1) + newRegionID := uint64(11) + body := fmt.Sprintf(`{"retry_limit":%v, "split_keys": ["%s","%s","%s"]}`, 3, + hex.EncodeToString([]byte("bbb")), + hex.EncodeToString([]byte("ccc")), + hex.EncodeToString([]byte("ddd"))) + checkOpt := func(res []byte, code int, _ http.Header) { + s := &struct { + ProcessedPercentage int `json:"processed-percentage"` + NewRegionsID []uint64 `json:"regions-id"` + }{} + err := json.Unmarshal(res, s) + suite.NoError(err) + suite.Equal(100, s.ProcessedPercentage) + suite.Equal([]uint64{newRegionID}, s.NewRegionsID) + } + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/splitResponses", fmt.Sprintf("return(%v)", newRegionID))) + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/split", urlPrefix), []byte(body), checkOpt) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/splitResponses")) + suite.NoError(err) +} + +func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRange() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkAccelerateRegionsScheduleInRange) +} + +func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRange(cluster *tests.TestCluster) { + leader := cluster.GetLeaderServer() + urlPrefix := leader.GetAddr() + "/pd/api/v1" + re := suite.Require() + r1 := 
core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) + r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) + r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) + tests.MustPutRegionInfo(re, cluster, r1) + tests.MustPutRegionInfo(re, cluster, r2) + tests.MustPutRegionInfo(re, cluster, r3) + body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3"))) + + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule", urlPrefix), []byte(body), tu.StatusOK(re)) + suite.NoError(err) + idList := leader.GetRaftCluster().GetSuspectRegions() + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + idList = sche.GetCluster().GetCoordinator().GetCheckerController().GetSuspectRegions() + } + suite.Len(idList, 2) +} + +func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkAccelerateRegionsScheduleInRanges) +} + +func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRanges(cluster *tests.TestCluster) { + leader := cluster.GetLeaderServer() + urlPrefix := leader.GetAddr() + "/pd/api/v1" + re := suite.Require() + r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) + r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) + r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) + r4 := core.NewTestRegionInfo(560, 16, []byte("a4"), []byte("a5")) + r5 := core.NewTestRegionInfo(561, 17, []byte("a5"), []byte("a6")) + tests.MustPutRegionInfo(re, cluster, r1) + tests.MustPutRegionInfo(re, cluster, r2) + tests.MustPutRegionInfo(re, cluster, r3) + tests.MustPutRegionInfo(re, cluster, r4) + tests.MustPutRegionInfo(re, cluster, r5) + body := fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) + + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule/batch", urlPrefix), []byte(body), tu.StatusOK(re)) + suite.NoError(err) + idList := leader.GetRaftCluster().GetSuspectRegions() + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + idList = sche.GetCluster().GetCoordinator().GetCheckerController().GetSuspectRegions() + } + suite.Len(idList, 4) +} + +func (suite *regionTestSuite) TestScatterRegions() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkScatterRegions) +} + +func (suite *regionTestSuite) checkScatterRegions(cluster *tests.TestCluster) { + leader := cluster.GetLeaderServer() + urlPrefix := leader.GetAddr() + "/pd/api/v1" + re := suite.Require() + r1 := core.NewTestRegionInfo(601, 13, []byte("b1"), []byte("b2")) + r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) + r2 := core.NewTestRegionInfo(602, 13, []byte("b2"), []byte("b3")) + r2.GetMeta().Peers = append(r2.GetMeta().Peers, &metapb.Peer{Id: 7, StoreId: 14}, &metapb.Peer{Id: 8, StoreId: 15}) + r3 := core.NewTestRegionInfo(603, 13, []byte("b4"), []byte("b4")) + r3.GetMeta().Peers = append(r3.GetMeta().Peers, &metapb.Peer{Id: 9, StoreId: 14}, &metapb.Peer{Id: 10, StoreId: 15}) + tests.MustPutRegionInfo(re, cluster, r1) + tests.MustPutRegionInfo(re, cluster, r2) + tests.MustPutRegionInfo(re, cluster, r3) + for i := 13; i <= 16; i++ { + s1 := &metapb.Store{ + 
Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + } + tests.MustPutStore(re, cluster, s1) + } + body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("b1")), hex.EncodeToString([]byte("b3"))) + + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", urlPrefix), []byte(body), tu.StatusOK(re)) + suite.NoError(err) + oc := leader.GetRaftCluster().GetOperatorController() + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + oc = sche.GetCoordinator().GetOperatorController() + } + + op1 := oc.GetOperator(601) + op2 := oc.GetOperator(602) + op3 := oc.GetOperator(603) + // At least one operator used to scatter region + suite.True(op1 != nil || op2 != nil || op3 != nil) + + body = `{"regions_id": [601, 602, 603]}` + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", urlPrefix), []byte(body), tu.StatusOK(re)) + suite.NoError(err) +} + +func (suite *regionTestSuite) TestCheckRegionsReplicated() { + env := tests.NewSchedulingTestEnvironment(suite.T(), + func(conf *config.Config, serverName string) { + conf.Replication.EnablePlacementRules = true + }) + // FIXME: enable this test in two modes. + env.RunTestInPDMode(suite.checkRegionsReplicated) +} + +func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) { + leader := cluster.GetLeaderServer() + urlPrefix := leader.GetAddr() + "/pd/api/v1" + re := suite.Require() + + // add test region + r1 := core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) + tests.MustPutRegionInfo(re, cluster, r1) + + // set the bundle + bundle := []placement.GroupBundle{ + { + ID: "5", + Index: 5, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, + }, + }, + }, + } + + status := "" + + // invalid url + url := fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, urlPrefix, "_", "t") + err := tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusBadRequest)) + suite.NoError(err) + + url = fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, urlPrefix, hex.EncodeToString(r1.GetStartKey()), "_") + err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusBadRequest)) + suite.NoError(err) + + // correct test + url = fmt.Sprintf(`%s/regions/replicated?startKey=%s&endKey=%s`, urlPrefix, hex.EncodeToString(r1.GetStartKey()), hex.EncodeToString(r1.GetEndKey())) + err = tu.CheckGetJSON(testDialClient, url, nil, tu.StatusOK(re)) + suite.NoError(err) + + // test one rule + data, err := json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + suite.Equal("REPLICATED", status) + + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/mockPending", "return(true)")) + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + suite.Equal("PENDING", status) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/handler/mockPending")) + // test multiple rules + r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) + r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}) + tests.MustPutRegionInfo(re, cluster, r1) + + bundle[0].Rules = append(bundle[0].Rules, &placement.Rule{ + ID: "bar", Index: 1, Role: placement.Voter, Count: 1, + }) + data, err = json.Marshal(bundle) + suite.NoError(err) + err = 
tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + suite.Equal("REPLICATED", status) + + // test multiple bundles + bundle = append(bundle, placement.GroupBundle{ + ID: "6", + Index: 6, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 2, + }, + }, + }) + data, err = json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + suite.Equal("INPROGRESS", status) + + r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) + r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1}) + tests.MustPutRegionInfo(re, cluster, r1) + + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + suite.Equal("REPLICATED", status) +} From 11c72ac79fd46345d2b39abf865ff6e440eecfe0 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 01:24:58 +0800 Subject: [PATCH 06/14] make TestGetAllByKey stable Signed-off-by: lhy1024 --- pkg/utils/apiutil/serverapi/middleware.go | 4 ++-- tests/pdctl/scheduler/scheduler_test.go | 14 ++++++++------ tests/server/api/rule_test.go | 6 ++++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 335680d7da0..f257efb34f9 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -151,8 +151,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri } else { r.URL.Path = rule.targetPath } - log.Info("redirect to micro service", zap.String("path", r.URL.Path), zap.String("origin-path", origin), - zap.String("target", addr), zap.String("method", r.Method), zap.Any("query", r.URL.Query())) + log.Debug("redirect to micro service", zap.String("path", r.URL.Path), zap.String("origin-path", origin), + zap.String("target", addr), zap.String("method", r.Method)) return true, addr } } diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 7098637c84a..56d7b1cd078 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -646,18 +646,20 @@ func TestForwardSchedulerRequest(t *testing.T) { server := cluster.GetLeaderServer() re.NoError(server.BootstrapCluster()) backendEndpoints := server.GetAddr() - tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) + tc, err := tests.NewTestSchedulingCluster(ctx, 1, backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) cmd := pdctlCmd.GetRootCmd() args := []string{"-u", backendEndpoints, "scheduler", "show"} - var slice []string - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - re.NoError(json.Unmarshal(output, &slice)) - re.Contains(slice, "balance-leader-scheduler") + var sches []string + testutil.Eventually(re, func() bool { + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &sches)) + return slice.Contains(sches, "balance-leader-scheduler") + }) mustUsage := func(args []string) { output, err := pdctl.ExecuteCommand(cmd, args...) 
diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index ffdf56b6567..ac52362df4e 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -565,8 +565,10 @@ func (suite *ruleTestSuite) checkGetAllByKey(cluster *tests.TestCluster) { var resp []*placement.Rule url := fmt.Sprintf("%s/rules/key/%s", urlPrefix, testCase.key) if testCase.success { - err = tu.ReadGetJSON(re, testDialClient, url, &resp) - suite.Len(resp, testCase.respSize) + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &resp) + return len(resp) == testCase.respSize + }) } else { err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, testCase.code)) } From 45f1541e5f8662e67b83be7337e8aba285ac3410 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 02:02:28 +0800 Subject: [PATCH 07/14] fix test Signed-off-by: lhy1024 --- pkg/schedule/schedulers/split_bucket.go | 4 ++-- tests/integrations/mcs/scheduling/api_test.go | 7 +++++-- tests/integrations/mcs/tso/api_test.go | 3 +++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index a08c84372b5..70027ee4c04 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -53,7 +53,7 @@ var ( splitBucketOperatorExistCounter = schedulerCounter.WithLabelValues(SplitBucketName, "operator-exist") splitBucketKeyRangeNotMatchCounter = schedulerCounter.WithLabelValues(SplitBucketName, "key-range-not-match") splitBucketNoSplitKeysCounter = schedulerCounter.WithLabelValues(SplitBucketName, "no-split-keys") - splitBucketCreateOpeartorFailCounter = schedulerCounter.WithLabelValues(SplitBucketName, "create-operator-fail") + splitBucketCreateOperatorFailCounter = schedulerCounter.WithLabelValues(SplitBucketName, "create-operator-fail") splitBucketNewOperatorCounter = schedulerCounter.WithLabelValues(SplitBucketName, "new-operator") ) @@ -275,7 +275,7 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op op, err := operator.CreateSplitRegionOperator(SplitBucketType, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, splitKey) if err != nil { - splitBucketCreateOpeartorFailCounter.Inc() + splitBucketCreateOperatorFailCounter.Inc() return nil } splitBucketNewOperatorCounter.Inc() diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index e1564f05f88..0607b1dee9a 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -15,6 +15,7 @@ import ( "github.com/tikv/pd/pkg/core" _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" @@ -127,8 +128,10 @@ func (suite *apiTestSuite) TestAPIForward() { urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints) var slice []string var resp map[string]interface{} - - // Test opeartor + testutil.Eventually(re, func() bool { + return suite.cluster.GetLeaderServer().GetServer().GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) + }) + // Test operators err := testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) diff --git 
a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index 81cc798851f..504b3e02e6d 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -110,6 +110,9 @@ func (suite *tsoAPITestSuite) TestForwardResetTS() { re.NotNil(primary) url := suite.backendEndpoints + "/pd/api/v1/admin/reset-ts" + testutil.Eventually(re, func() bool { + return suite.pdCluster.GetLeaderServer().GetServer().GetRaftCluster().IsServiceIndependent(mcsutils.TSOServiceName) + }) // Test reset ts input := []byte(`{"tso":"121312", "force-use-larger":true}`) err := testutil.CheckPostJSON(dialClient, url, input, From e682aa4a9b0533a3a9640f6d808d835b355b5786 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 02:13:48 +0800 Subject: [PATCH 08/14] fix test Signed-off-by: lhy1024 --- tests/testutil.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/testutil.go b/tests/testutil.go index 2ccf6fb76be..1bdd7ae10dd 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -34,6 +34,7 @@ import ( scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" sc "github.com/tikv/pd/pkg/mcs/scheduling/server/config" tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" @@ -272,11 +273,13 @@ func (s *SchedulingTestEnvironment) RunTestInPDMode(test func(*TestCluster)) { func (s *SchedulingTestEnvironment) RunTestInAPIMode(test func(*TestCluster)) { s.t.Log("start to run test in api mode") re := require.New(s.t) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) s.startCluster(apiMode) test(s.cluster) s.cleanup() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) s.t.Log("finish to run test in api mode") } @@ -299,7 +302,6 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) { leaderServer := s.cluster.GetServer(s.cluster.GetLeader()) re.NoError(leaderServer.BootstrapCluster()) case apiMode: - re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) s.cluster, err = NewTestAPICluster(s.ctx, 1, s.opts...) 
re.NoError(err) err = s.cluster.RunInitialServers() @@ -314,6 +316,8 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) { tc.WaitForPrimaryServing(re) s.cluster.SetSchedulingCluster(tc) time.Sleep(200 * time.Millisecond) // wait for scheduling cluster to update member - re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) + testutil.Eventually(re, func() bool { + return s.cluster.GetLeaderServer().GetServer().GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) + }) } } From 4bf6c9f9238c39b01f74e8840d10343ca93ac0fe Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 03:03:35 +0800 Subject: [PATCH 09/14] address comments Signed-off-by: lhy1024 --- pkg/utils/apiutil/serverapi/middleware.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index f257efb34f9..28e8a018567 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -122,19 +122,19 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri // It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}" r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { - if h.s.IsServiceIndependent(rule.targetServiceName) && - strings.HasPrefix(r.URL.Path, rule.matchPath) && + if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { if rule.filter != nil && !rule.filter(r) { continue } - origin := r.URL.Path + // we check the service primary addr here, so no need to check independently again. addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) if !ok || addr == "" { log.Warn("failed to get the service primary addr when trying to match redirect rules", zap.String("path", r.URL.Path)) } // If the URL contains escaped characters, use RawPath instead of Path + origin := r.URL.Path path := r.URL.Path if r.URL.RawPath != "" { path = r.URL.RawPath From c55753bf498b0a10c6972d2d954740f1a2739692 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 05:58:00 +0800 Subject: [PATCH 10/14] fix Signed-off-by: lhy1024 --- server/api/server.go | 5 ----- tests/integrations/mcs/tso/api_test.go | 3 --- 2 files changed, 8 deletions(-) diff --git a/server/api/server.go b/server/api/server.go index 4befab67a51..83e01d65321 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -95,11 +95,6 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/regions/accelerate-schedule", mcs.SchedulingServiceName, []string{http.MethodPost}), - serverapi.MicroserviceRedirectRule( - prefix+"/regions/accelerate-schedule/batch", - scheapi.APIPathPrefix+"/regions/accelerate-schedule/batch", - mcs.SchedulingServiceName, - []string{http.MethodPost}), serverapi.MicroserviceRedirectRule( prefix+"/regions/scatter", scheapi.APIPathPrefix+"/regions/scatter", diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index 504b3e02e6d..81cc798851f 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -110,9 +110,6 @@ func (suite *tsoAPITestSuite) TestForwardResetTS() { re.NotNil(primary) url := suite.backendEndpoints + "/pd/api/v1/admin/reset-ts" - testutil.Eventually(re, func() bool { - return 
suite.pdCluster.GetLeaderServer().GetServer().GetRaftCluster().IsServiceIndependent(mcsutils.TSOServiceName) - }) // Test reset ts input := []byte(`{"tso":"121312", "force-use-larger":true}`) err := testutil.CheckPostJSON(dialClient, url, input, From 68b58a9b7a4e4e33057a283d299e71404d4e8e60 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 17:36:19 +0800 Subject: [PATCH 11/14] remove redundant route Signed-off-by: lhy1024 --- server/api/server.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/api/server.go b/server/api/server.go index 83e01d65321..ad614593b2f 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -110,11 +110,6 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/regions/replicated", mcs.SchedulingServiceName, []string{http.MethodGet}), - serverapi.MicroserviceRedirectRule( - prefix+"/config/region-label", - scheapi.APIPathPrefix+"/config/region-label", - mcs.SchedulingServiceName, - []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( prefix+"/config/region-label/rules", scheapi.APIPathPrefix+"/config/region-label/rules", From c10643169f6f32d86f21596dc095b7c091dd8d93 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 22 Nov 2023 18:28:07 +0800 Subject: [PATCH 12/14] test Signed-off-by: lhy1024 --- tests/server/api/region_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index 452ef365a6d..a30c3159fd4 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/testutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -100,7 +101,9 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRange(cluster *tes if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { idList = sche.GetCluster().GetCoordinator().GetCheckerController().GetSuspectRegions() } - suite.Len(idList, 2) + testutil.Eventually(re, func() bool { + return len(idList) == 2 + }) } func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() { @@ -130,7 +133,9 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRanges(cluster *te if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { idList = sche.GetCluster().GetCoordinator().GetCheckerController().GetSuspectRegions() } - suite.Len(idList, 4) + testutil.Eventually(re, func() bool { + return len(idList) == 4 + }) } func (suite *regionTestSuite) TestScatterRegions() { From 8a6f56df37507e6f7dbf09eda0a2c1dd79b0d8e9 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 23 Nov 2023 02:10:43 +0800 Subject: [PATCH 13/14] make test stable Signed-off-by: lhy1024 --- tests/pdctl/scheduler/scheduler_test.go | 58 +++++++--------- tests/server/api/region_test.go | 88 ++++++++++++++++++------- 2 files changed, 88 insertions(+), 58 deletions(-) diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index e06419f6f6d..7e9aeef16ee 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -362,6 +362,14 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "rank-formula-version": "v2", "split-thresholds": 0.2, } + checkHotSchedulerConfig := func(expect map[string]interface{}) { + 
testutil.Eventually(re, func() bool { + var conf1 map[string]interface{} + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + return reflect.DeepEqual(expect, conf1) + }) + } + var conf map[string]interface{} mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) re.Equal(expected1, conf) @@ -370,72 +378,58 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) re.Contains(echo, "Success!") expected1["src-tolerance-ratio"] = 1.02 - var conf1 map[string]interface{} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) re.Contains(echo, "Success!") expected1["read-priorities"] = []interface{}{"byte", "key"} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) re.Contains(echo, "Success!") expected1["read-priorities"] = []interface{}{"key", "byte"} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", 
"balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) // write-priorities is divided into write-leader-priorities and write-peer-priorities echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) re.Contains(echo, "Failed!") re.Contains(echo, "Config item is not found.") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) re.Contains(echo, "Failed!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + checkHotSchedulerConfig(expected1) expected1["rank-formula-version"] = "v2" echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) re.Contains(echo, "Success!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) expected1["rank-formula-version"] = "v1" echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) re.Contains(echo, "Success!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) expected1["forbid-rw-type"] = "read" echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) re.Contains(echo, "Success!") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) + checkHotSchedulerConfig(expected1) // test compatibility re.Equal("2.0.0", leaderServer.GetClusterVersion().String()) @@ -446,13 +440,11 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { } re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) // After upgrading, we should not use query. 
- mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"}) + checkHotSchedulerConfig(expected1) // cannot set qps as write-peer-priorities echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"}) + checkHotSchedulerConfig(expected1) // test remove and add echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) @@ -462,7 +454,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { // test balance leader config conf = make(map[string]interface{}) - conf1 = make(map[string]interface{}) + conf1 := make(map[string]interface{}) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) re.Equal(4., conf["batch"]) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index a30c3159fd4..bd60d059edd 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" - "github.com/tikv/pd/pkg/utils/testutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -39,6 +38,7 @@ type regionTestSuite struct { func TestRegionTestSuite(t *testing.T) { suite.Run(t, new(regionTestSuite)) } + func (suite *regionTestSuite) TestSplitRegions() { env := tests.NewSchedulingTestEnvironment(suite.T()) env.RunTestInTwoModes(suite.checkSplitRegions) @@ -48,15 +48,17 @@ func (suite *regionTestSuite) checkSplitRegions(cluster *tests.TestCluster) { leader := cluster.GetLeaderServer() urlPrefix := leader.GetAddr() + "/pd/api/v1" re := suite.Require() - r1 := core.NewTestRegionInfo(601, 13, []byte("aaa"), []byte("ggg")) - r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) - tests.MustPutRegionInfo(re, cluster, r1) s1 := &metapb.Store{ Id: 13, State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, } tests.MustPutStore(re, cluster, s1) + r1 := core.NewTestRegionInfo(601, 13, []byte("aaa"), []byte("ggg")) + r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) + tests.MustPutRegionInfo(re, cluster, r1) + suite.checkRegionCount(cluster, 1) + newRegionID := uint64(11) body := fmt.Sprintf(`{"retry_limit":%v, "split_keys": ["%s","%s","%s"]}`, 3, hex.EncodeToString([]byte("bbb")), @@ -87,23 +89,30 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRange(cluster *tes leader := cluster.GetLeaderServer() urlPrefix := leader.GetAddr() + "/pd/api/v1" re := suite.Require() + for i := 13; i <= 15; i++ { + s1 := &metapb.Store{ + Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + } + tests.MustPutStore(re, cluster, s1) + } r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) r2 := 
core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) tests.MustPutRegionInfo(re, cluster, r1) tests.MustPutRegionInfo(re, cluster, r2) tests.MustPutRegionInfo(re, cluster, r3) - body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3"))) + suite.checkRegionCount(cluster, 3) + body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3"))) err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule", urlPrefix), []byte(body), tu.StatusOK(re)) suite.NoError(err) idList := leader.GetRaftCluster().GetSuspectRegions() if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { idList = sche.GetCluster().GetCoordinator().GetCheckerController().GetSuspectRegions() } - testutil.Eventually(re, func() bool { - return len(idList) == 2 - }) + re.Len(idList, 2, len(idList)) } func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() { @@ -115,6 +124,14 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRanges(cluster *te leader := cluster.GetLeaderServer() urlPrefix := leader.GetAddr() + "/pd/api/v1" re := suite.Require() + for i := 13; i <= 17; i++ { + s1 := &metapb.Store{ + Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + } + tests.MustPutStore(re, cluster, s1) + } r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) @@ -125,17 +142,17 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRanges(cluster *te tests.MustPutRegionInfo(re, cluster, r3) tests.MustPutRegionInfo(re, cluster, r4) tests.MustPutRegionInfo(re, cluster, r5) - body := fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) + suite.checkRegionCount(cluster, 5) + body := fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, + hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule/batch", urlPrefix), []byte(body), tu.StatusOK(re)) suite.NoError(err) idList := leader.GetRaftCluster().GetSuspectRegions() if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { idList = sche.GetCluster().GetCoordinator().GetCheckerController().GetSuspectRegions() } - testutil.Eventually(re, func() bool { - return len(idList) == 4 - }) + re.Len(idList, 4) } func (suite *regionTestSuite) TestScatterRegions() { @@ -147,6 +164,14 @@ func (suite *regionTestSuite) checkScatterRegions(cluster *tests.TestCluster) { leader := cluster.GetLeaderServer() urlPrefix := leader.GetAddr() + "/pd/api/v1" re := suite.Require() + for i := 13; i <= 16; i++ { + s1 := &metapb.Store{ + Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + } + tests.MustPutStore(re, cluster, s1) + } r1 := core.NewTestRegionInfo(601, 13, []byte("b1"), []byte("b2")) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15}) r2 := core.NewTestRegionInfo(602, 13, []byte("b2"), []byte("b3")) @@ 
-156,16 +181,9 @@ func (suite *regionTestSuite) checkScatterRegions(cluster *tests.TestCluster) { tests.MustPutRegionInfo(re, cluster, r1) tests.MustPutRegionInfo(re, cluster, r2) tests.MustPutRegionInfo(re, cluster, r3) - for i := 13; i <= 16; i++ { - s1 := &metapb.Store{ - Id: uint64(i), - State: metapb.StoreState_Up, - NodeState: metapb.NodeState_Serving, - } - tests.MustPutStore(re, cluster, s1) - } - body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("b1")), hex.EncodeToString([]byte("b3"))) + suite.checkRegionCount(cluster, 3) + body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("b1")), hex.EncodeToString([]byte("b3"))) err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", urlPrefix), []byte(body), tu.StatusOK(re)) suite.NoError(err) oc := leader.GetRaftCluster().GetOperatorController() @@ -189,7 +207,6 @@ func (suite *regionTestSuite) TestCheckRegionsReplicated() { func(conf *config.Config, serverName string) { conf.Replication.EnablePlacementRules = true }) - // FIXME: enable this test in two modes. env.RunTestInPDMode(suite.checkRegionsReplicated) } @@ -199,8 +216,15 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) re := suite.Require() // add test region + s1 := &metapb.Store{ + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + } + tests.MustPutStore(re, cluster, s1) r1 := core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) tests.MustPutRegionInfo(re, cluster, r1) + suite.checkRegionCount(cluster, 1) // set the bundle bundle := []placement.GroupBundle{ @@ -237,9 +261,11 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/handler/mockPending", "return(true)")) err = tu.ReadGetJSON(re, testDialClient, url, &status) @@ -290,3 +316,15 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) suite.NoError(err) suite.Equal("REPLICATED", status) } + +func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count int) { + leader := cluster.GetLeaderServer() + tu.Eventually(suite.Require(), func() bool { + return leader.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count == count + }) + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + tu.Eventually(suite.Require(), func() bool { + return sche.GetCluster().GetRegionCount([]byte{}, []byte{}) == count + }) + } +} From b777d28895cca46f3138e2ac724251ee7a506a30 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 24 Nov 2023 01:55:54 +0800 Subject: [PATCH 14/14] make test stable Signed-off-by: lhy1024 --- scripts/ci-subtask.sh | 1 + tests/server/api/region_test.go | 38 +++++++++++++++++++++------------ 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/scripts/ci-subtask.sh b/scripts/ci-subtask.sh index 389d7f43341..a2e396088d6 100755 --- a/scripts/ci-subtask.sh +++ b/scripts/ci-subtask.sh @@ -29,6 +29,7 @@ else weight() { [[ $1 == "github.com/tikv/pd/server/api" ]] && return 30 [[ $1 == "github.com/tikv/pd/pkg/schedule" ]] && return 30 + [[ $1 =~ 
"pd/tests/server/api" ]] && return 30 [[ $1 =~ "pd/tests" ]] && return 5 return 1 } diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index bd60d059edd..dcd31d6462d 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -81,7 +81,11 @@ func (suite *regionTestSuite) checkSplitRegions(cluster *tests.TestCluster) { } func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRange() { - env := tests.NewSchedulingTestEnvironment(suite.T()) + env := tests.NewSchedulingTestEnvironment(suite.T(), func(conf *config.Config, serverName string) { + // FIXME: enable placement rules + conf.Replication.EnablePlacementRules = false + conf.Replication.MaxReplicas = 1 + }) env.RunTestInTwoModes(suite.checkAccelerateRegionsScheduleInRange) } @@ -89,7 +93,7 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRange(cluster *tes leader := cluster.GetLeaderServer() urlPrefix := leader.GetAddr() + "/pd/api/v1" re := suite.Require() - for i := 13; i <= 15; i++ { + for i := 1; i <= 3; i++ { s1 := &metapb.Store{ Id: uint64(i), State: metapb.StoreState_Up, @@ -97,16 +101,17 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRange(cluster *tes } tests.MustPutStore(re, cluster, s1) } - r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) - r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) - r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) + r1 := core.NewTestRegionInfo(557, 1, []byte("a1"), []byte("a2")) + r2 := core.NewTestRegionInfo(558, 2, []byte("a2"), []byte("a3")) + r3 := core.NewTestRegionInfo(559, 3, []byte("a3"), []byte("a4")) tests.MustPutRegionInfo(re, cluster, r1) tests.MustPutRegionInfo(re, cluster, r2) tests.MustPutRegionInfo(re, cluster, r3) suite.checkRegionCount(cluster, 3) body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3"))) - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule", urlPrefix), []byte(body), tu.StatusOK(re)) + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule", urlPrefix), []byte(body), + tu.StatusOK(re)) suite.NoError(err) idList := leader.GetRaftCluster().GetSuspectRegions() if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { @@ -116,7 +121,11 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRange(cluster *tes } func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() { - env := tests.NewSchedulingTestEnvironment(suite.T()) + env := tests.NewSchedulingTestEnvironment(suite.T(), func(conf *config.Config, serverName string) { + // FIXME: enable placement rules + conf.Replication.EnablePlacementRules = false + conf.Replication.MaxReplicas = 1 + }) env.RunTestInTwoModes(suite.checkAccelerateRegionsScheduleInRanges) } @@ -124,7 +133,7 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRanges(cluster *te leader := cluster.GetLeaderServer() urlPrefix := leader.GetAddr() + "/pd/api/v1" re := suite.Require() - for i := 13; i <= 17; i++ { + for i := 1; i <= 5; i++ { s1 := &metapb.Store{ Id: uint64(i), State: metapb.StoreState_Up, @@ -132,11 +141,11 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRanges(cluster *te } tests.MustPutStore(re, cluster, s1) } - r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2")) - r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3")) - r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4")) - 
r4 := core.NewTestRegionInfo(560, 16, []byte("a4"), []byte("a5")) - r5 := core.NewTestRegionInfo(561, 17, []byte("a5"), []byte("a6")) + r1 := core.NewTestRegionInfo(557, 1, []byte("a1"), []byte("a2")) + r2 := core.NewTestRegionInfo(558, 2, []byte("a2"), []byte("a3")) + r3 := core.NewTestRegionInfo(559, 3, []byte("a3"), []byte("a4")) + r4 := core.NewTestRegionInfo(560, 4, []byte("a4"), []byte("a5")) + r5 := core.NewTestRegionInfo(561, 5, []byte("a5"), []byte("a6")) tests.MustPutRegionInfo(re, cluster, r1) tests.MustPutRegionInfo(re, cluster, r2) tests.MustPutRegionInfo(re, cluster, r3) @@ -146,7 +155,8 @@ func (suite *regionTestSuite) checkAccelerateRegionsScheduleInRanges(cluster *te body := fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6"))) - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule/batch", urlPrefix), []byte(body), tu.StatusOK(re)) + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule/batch", urlPrefix), []byte(body), + tu.StatusOK(re)) suite.NoError(err) idList := leader.GetRaftCluster().GetSuspectRegions() if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {