Skip to content

Commit

Permalink
unify the usage of independent service
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 9, 2024
1 parent 7741924 commit 3c5ee3a
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
for _, rule := range h.microserviceRedirectRules {
// Now we only support checking the scheduling service whether it is independent
if rule.targetServiceName == mcsutils.SchedulingServiceName {
if !h.s.IsServiceIndependent(mcsutils.SchedulingServiceName) {
if !h.s.GetRaftCluster().IsServiceIndependent(mcsutils.SchedulingServiceName) {
continue
}
}
Expand Down
36 changes: 20 additions & 16 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
return
}
rc.RemoveRegionIfExist(regionID)
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
msg := "The region is removed from server cache."
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
err = h.DeleteRegionCacheInSchedulingServer(regionID)
if err != nil {
msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())

Check warning on line 68 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L68

Added line #L68 was not covered by tests
}
}
msg := "The region is removed from server cache."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))
h.rd.JSON(w, http.StatusOK, msg)
}

// @Tags admin
Expand Down Expand Up @@ -101,11 +104,15 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques
}
// Remove region from cache.
rc.RemoveRegionIfExist(regionID)
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
msg := "The region is removed from server cache and region meta storage."
if rc.IsServiceIndependent(utils.SchedulingServiceName) {

Check warning on line 108 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L107-L108

Added lines #L107 - L108 were not covered by tests
err = h.DeleteRegionCacheInSchedulingServer(regionID)
if err != nil {
msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())

Check warning on line 111 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}
}
msg := "The region is removed from server cache and region meta storage."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))

h.rd.JSON(w, http.StatusOK, msg)

Check warning on line 115 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L115

Added line #L115 was not covered by tests
}

// @Tags admin
Expand All @@ -117,11 +124,15 @@ func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Reque
var err error
rc := getCluster(r)
rc.ResetRegionCache()
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
msg := "All regions are removed from server cache."
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
err = h.DeleteRegionCacheInSchedulingServer()
if err != nil {
msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())

Check warning on line 131 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L131

Added line #L131 was not covered by tests
}
}
msg := "All regions are removed from server cache."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))

h.rd.JSON(w, http.StatusOK, msg)
}

// Intentionally no swagger mark as it is supposed to be only used in
Expand Down Expand Up @@ -239,10 +250,3 @@ func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error {
}
return nil
}

func (h *adminHandler) buildMsg(msg string, err error) string {
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && err != nil {
return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
}
return msg
}
6 changes: 3 additions & 3 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newConfHandler(svr *server.Server, rd *render.Render) *confHandler {
// @Router /config [get]
func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
cfg := h.svr.GetConfig()
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) &&
if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
schedulingServerConfig, err := h.GetSchedulingServerConfig()
if err != nil {
Expand Down Expand Up @@ -336,7 +336,7 @@ func getConfigMap(cfg map[string]any, key []string, value any) map[string]any {
// @Success 200 {object} sc.ScheduleConfig
// @Router /config/schedule [get]
func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) &&
if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
cfg, err := h.GetSchedulingServerConfig()
if err != nil {
Expand Down Expand Up @@ -410,7 +410,7 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request)
// @Success 200 {object} sc.ReplicationConfig
// @Router /config/replicate [get]
func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) &&
if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
cfg, err := h.GetSchedulingServerConfig()
if err != nil {
Expand Down
28 changes: 22 additions & 6 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,18 @@ func (c *RaftCluster) checkServices() {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
c.UnsetServiceIndependent(mcsutils.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.independentServices.Store(mcsutils.SchedulingServiceName, true)
c.SetServiceIndependent(mcsutils.SchedulingServiceName)
}
}
} else {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
c.UnsetServiceIndependent(mcsutils.SchedulingServiceName)
}
}

Expand Down Expand Up @@ -2439,9 +2439,25 @@ func IsClientURL(addr string, etcdClient *clientv3.Client) bool {

// IsServiceIndependent returns whether the service is independent.
func (c *RaftCluster) IsServiceIndependent(name string) bool {
independent, exist := c.independentServices.Load(name)
if !exist {
if c == nil {
return false
}
return independent.(bool)
_, exist := c.independentServices.Load(name)
return exist
}

// SetServiceIndependent sets the service to be independent.
func (c *RaftCluster) SetServiceIndependent(name string) {
if c == nil {
return

Check warning on line 2452 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L2452

Added line #L2452 was not covered by tests
}
c.independentServices.Store(name, struct{}{})
}

// UnsetServiceIndependent unsets the service to be independent.
func (c *RaftCluster) UnsetServiceIndependent(name string) {
if c == nil {
return

Check warning on line 2460 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L2460

Added line #L2460 was not covered by tests
}
c.independentServices.Delete(name)
}
55 changes: 30 additions & 25 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear

s.handleDamagedStore(request.GetStats())
storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
if s.IsServiceIndependent(utils.SchedulingServiceName) {
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, _ := s.updateSchedulingClient(ctx)
cli := forwardCli.getClient()
if cli != nil {
Expand Down Expand Up @@ -1307,7 +1307,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()

if s.IsServiceIndependent(utils.SchedulingServiceName) {
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
if forwardErrCh != nil {
select {
case err, ok := <-forwardErrCh:
Expand Down Expand Up @@ -1786,7 +1786,13 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 1792 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1790-L1792

Added lines #L1790 - L1792 were not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {

Check warning on line 1795 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1795

Added line #L1795 was not covered by tests
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.AskBatchSplitResponse{
Expand Down Expand Up @@ -1821,11 +1827,6 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp
return rsp.(*pdpb.AskBatchSplitResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

if !versioninfo.IsFeatureSupported(rc.GetOpts().GetClusterVersion(), versioninfo.BatchSplit) {
return &pdpb.AskBatchSplitResponse{Header: s.incompatibleVersion("batch_split")}, nil
}
Expand Down Expand Up @@ -2015,7 +2016,13 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.ScatterRegionResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 2022 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2022

Added line #L2022 was not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.ScatterRegionResponse{
Expand Down Expand Up @@ -2067,11 +2074,6 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg
return rsp.(*pdpb.ScatterRegionResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.ScatterRegionResponse{Header: s.notBootstrappedHeader()}, nil
}

if len(request.GetRegionsId()) > 0 {
percentage, err := scatterRegions(rc, request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit())
if err != nil {
Expand Down Expand Up @@ -2292,7 +2294,13 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 2300 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2300

Added line #L2300 was not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.GetOperatorResponse{
Expand Down Expand Up @@ -2327,11 +2335,6 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
return rsp.(*pdpb.GetOperatorResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil
}

opController := rc.GetOperatorController()
requestID := request.GetRegionId()
r := opController.GetOperatorStatus(requestID)
Expand Down Expand Up @@ -2611,7 +2614,13 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 2620 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2618-L2620

Added lines #L2618 - L2620 were not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {

Check warning on line 2623 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2623

Added line #L2623 was not covered by tests
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.SplitRegionsResponse{
Expand Down Expand Up @@ -2648,10 +2657,6 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion
return rsp.(*pdpb.SplitRegionsResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
finishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
return &pdpb.SplitRegionsResponse{
Header: s.header(),
Expand Down
9 changes: 0 additions & 9 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1465,15 +1465,6 @@ func (s *Server) GetRegions() []*core.RegionInfo {
return nil
}

// IsServiceIndependent returns if the service is enabled
func (s *Server) IsServiceIndependent(name string) bool {
rc := s.GetRaftCluster()
if rc != nil {
return rc.IsServiceIndependent(name)
}
return false
}

// GetServiceLabels returns ApiAccessPaths by given service label
// TODO: this function will be used for updating api rate limit config
func (s *Server) GetServiceLabels(serviceLabel string) []apiutil.AccessPath {
Expand Down

0 comments on commit 3c5ee3a

Please sign in to comment.