diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/features/features.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/features/features.go index c55c74ba..496ba70b 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/features/features.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/features/features.go @@ -67,6 +67,12 @@ var ( ).Get() return parseSeLabelKeys(seLabelSelectorKeys) }() + + NacosClientHeaders = env.RegisterStringVar( + "NACOS_CLIENT_HEADERS", + "", + "specify the additional headers to send to nacos server", + ).Get() ) func parseEndpointRelabelItems(endpointRelabelItems string) map[string]string { diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/httpServer.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/httpServer.go index 1678d509..e7360d28 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/httpServer.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/httpServer.go @@ -22,7 +22,6 @@ type HttpServer struct { mux *http.ServeMux xdsReady bool sourceReady bool - sourceReadyMsg string httpPathHandler common.PathHandler sources cmap.ConcurrentMap[string, bool] lock sync.Mutex diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/patchtable.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/patchtable.go deleted file mode 100644 index 1126eb24..00000000 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/patchtable.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright Istio Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -// import ( -// "istio.io/libistio/galley/pkg/config/mesh" -// "istio.io/libistio/galley/pkg/config/source/kube" -// "istio.io/libistio/galley/pkg/config/source/kube/fs" -// "istio.io/libistio/pkg/config/event" -// ) - -// The patch table for external dependencies for code in components. 
-var ( -// newInterfaces = kube.NewInterfacesFromConfigFile - -// meshcfgNewFS = func(path string) (event.Source, error) { return mesh.NewMeshConfigFS(path) } -// fsNew = fs.New -) diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/processing.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/processing.go index 780c2790..bf972860 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/processing.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/processing.go @@ -23,11 +23,7 @@ import ( "time" cmap "github.com/orcaman/concurrent-map/v2" - "google.golang.org/grpc" "istio.io/libistio/pkg/config/event" - "istio.io/libistio/pkg/config/schema/collection" - "istio.io/libistio/pkg/config/schema/collections" - "istio.io/libistio/pkg/config/schema/resource" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -37,10 +33,7 @@ import ( "slime.io/slime/modules/meshregistry/pkg/mcpoverxds" "slime.io/slime/modules/meshregistry/pkg/monitoring" "slime.io/slime/modules/meshregistry/pkg/multicluster" - "slime.io/slime/modules/meshregistry/pkg/source/eureka" - "slime.io/slime/modules/meshregistry/pkg/source/k8s/fs" - "slime.io/slime/modules/meshregistry/pkg/source/nacos" - "slime.io/slime/modules/meshregistry/pkg/source/zookeeper" + "slime.io/slime/modules/meshregistry/pkg/source" utilcache "slime.io/slime/modules/meshregistry/pkg/util/cache" ) @@ -57,12 +50,16 @@ type Processing struct { listener net.Listener stopCh chan struct{} httpServer *HttpServer - - kubeSourceSchemas *collection.Schemas } // NewProcessing returns a new processing component. func NewProcessing(args *Args) *Processing { + p := &Processing{ + regArgs: args.RegistryArgs, + addOnRegArgs: args.AddOnRegArgs, + stopCh: make(chan struct{}), + localCLusterID: args.RegistryArgs.K8S.ClusterID, + } hs := &HttpServer{ addr: args.RegistryArgs.HTTPServerAddr, mux: http.NewServeMux(), @@ -70,140 +67,51 @@ func NewProcessing(args *Args) *Processing { sources: cmap.New[bool](), httpPathHandler: args.SlimeEnv.HttpPathHandler, } - - if rm := args.SlimeEnv.ReadyManager; rm != nil { - rm.AddReadyChecker("ready", hs.ready) - } - + hs.HandleFunc("/args", p.cacheRegArgs) hs.HandleFunc("/ready", hs.handleReadyProbe) hs.HandleFunc("/clients", hs.handleClientsInfo) hs.HandleFunc("/pc", hs.pc) hs.HandleFunc("/nc", hs.nc) - - ret := &Processing{ - regArgs: args.RegistryArgs, - addOnRegArgs: args.AddOnRegArgs, - stopCh: make(chan struct{}), - httpServer: hs, - localCLusterID: args.RegistryArgs.K8S.ClusterID, + if rm := args.SlimeEnv.ReadyManager; rm != nil { + rm.AddReadyChecker("ready", hs.ready) } - - return ret + p.httpServer = hs + return p } // Start implements process.Component func (p *Processing) Start() (err error) { - csrc := make([]event.Source, 0, 5) - var ( - kubeFsSrc event.Source - zookeeperSrc event.Source - eurekaSrc event.Source - nacosSrc event.Source - - srcPreStartHooks []func() - - httpHandle func(http.ResponseWriter, *http.Request) - simpleHttpHandle func(http.ResponseWriter, *http.Request) - ) - - if p.regArgs.K8SSource.Enabled { - if p.regArgs.K8SSource.EnableConfigFile && p.regArgs.K8SSource.ConfigPath != "" { - schemas := p.getKubeSourceSchemas() - log.Warnf("watching config files in %s", p.regArgs.K8SSource.ConfigPath) - log.Warnf("watching config schemas %v", schemas.Kinds()) - kubeFsSrc, err = fs.New(p.regArgs.K8SSource.ConfigPath, schemas, p.regArgs.K8SSource.WatchConfigFiles) - if err != nil { - return - } - } - } - + var 
srcPreStartHooks []func() clusterCache := false - - if srcArgs := p.regArgs.ZookeeperSource.SourceArgs; srcArgs.Enabled { - if zookeeperSrc, httpHandle, simpleHttpHandle, err = zookeeper.New( - p.regArgs.ZookeeperSource, time.Duration(p.regArgs.RegistryStartDelay), - p.httpServer.SourceReadyCallBack, zookeeper.WithDynamicConfigOption(func(onZookeeperArgs func(*bootstrap.ZookeeperSourceArgs)) { - if p.addOnRegArgs != nil { - p.addOnRegArgs(func(args *bootstrap.RegistryArgs) { - onZookeeperArgs(args.ZookeeperSource) - }) - } - })); err != nil { - return - } - p.httpServer.HandleFunc(zookeeper.ZkPath, httpHandle) - p.httpServer.HandleFunc(zookeeper.ZkSimplePath, simpleHttpHandle) - if zkSrc, ok := zookeeperSrc.(*zookeeper.Source); ok { - p.httpServer.HandleFunc(zookeeper.DubboCallModelPath, zkSrc.HandleDubboCallModel) - p.httpServer.HandleFunc(zookeeper.SidecarDubboCallModelPath, zkSrc.HandleSidecarDubboCallModel) - } - p.httpServer.SourceRegistry(zookeeper.SourceName) - if srcArgs.WaitTime > 0 { - p.httpServer.SourceReadyLater(zookeeper.SourceName, time.Duration(srcArgs.WaitTime)) - } - clusterCache = clusterCache || p.regArgs.ZookeeperSource.LabelPatch - } - - if srcArgs := p.regArgs.EurekaSource; srcArgs.Enabled { - if eurekaSrc, httpHandle, err = eureka.New(p.regArgs.EurekaSource, time.Duration(p.regArgs.RegistryStartDelay), p.httpServer.SourceReadyCallBack); err != nil { - return - } - p.httpServer.HandleFunc(eureka.HttpPath, httpHandle) - p.httpServer.SourceRegistry(eureka.SourceName) - if srcArgs.WaitTime > 0 { - p.httpServer.SourceReadyLater(eureka.SourceName, time.Duration(srcArgs.WaitTime)) + csrc := make([]event.Source, 0, len(source.RegistrySources())) + for registryID, initlizer := range source.RegistrySources() { + src, handlers, cacheCluster, skip, err := initlizer(p.regArgs, p.httpServer.SourceReadyCallBack, p.addOnRegArgs) + if err != nil { + log.Errorf("init registry source %s failed: %v", registryID, err) + return err } - clusterCache = clusterCache || p.regArgs.EurekaSource.LabelPatch - } - - if srcArgs := p.regArgs.NacosSource; srcArgs.Enabled { - if nacosSrc, httpHandle, err = nacos.New( - p.regArgs.NacosSource, time.Duration(p.regArgs.RegistryStartDelay), - p.httpServer.SourceReadyCallBack, nacos.WithDynamicConfigOption(func(onNacosArgs func(*bootstrap.NacosSourceArgs)) { - if p.addOnRegArgs != nil { - p.addOnRegArgs(func(args *bootstrap.RegistryArgs) { - onNacosArgs(args.NacosSource) - }) - } - })); err != nil { - return + if skip { + continue } - p.httpServer.HandleFunc(nacos.HttpPath, httpHandle) - p.httpServer.SourceRegistry(nacos.SourceName) - if srcArgs.WaitTime > 0 { - p.httpServer.SourceReadyLater(nacos.SourceName, time.Duration(srcArgs.WaitTime)) + p.httpServer.SourceRegistry(registryID) + for path, handler := range handlers { + p.httpServer.HandleFunc(path, handler) } - clusterCache = clusterCache || p.regArgs.EurekaSource.LabelPatch + clusterCache = clusterCache || cacheCluster + csrc = append(csrc, src) } if clusterCache { srcPreStartHooks = append(srcPreStartHooks, p.initMulticluster()) } - p.httpServer.HandleFunc("/args", p.cacheRegArgs) if p.addOnRegArgs != nil { p.addOnRegArgs(func(args *bootstrap.RegistryArgs) { p.regArgs = args }) } - if kubeFsSrc != nil { - csrc = append(csrc, kubeFsSrc) - } - if zookeeperSrc != nil { - csrc = append(csrc, zookeeperSrc) - } - if eurekaSrc != nil { - csrc = append(csrc, eurekaSrc) - } - if nacosSrc != nil { - csrc = append(csrc, nacosSrc) - } - p.httpServer.start() - grpc.EnableTracing = p.regArgs.EnableGRPCTracing - 
// TODO start sources mcpController, err := mcpoverxds.NewController(p.regArgs) if err != nil { @@ -268,7 +176,7 @@ func (p *Processing) getDeployKubeClient() (k kubernetes.Interface, err error) { func (p *Processing) getKubeClient(config *rest.Config, masterUrl, kubeconfigPath string) (k kubernetes.Interface, err error) { if config == nil { - config, err = clientcmd.BuildConfigFromFlags(p.regArgs.K8S.ApiServerUrl, p.regArgs.K8S.KubeConfig) + config, err = clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath) if err != nil { return nil, err } @@ -276,53 +184,6 @@ func (p *Processing) getKubeClient(config *rest.Config, masterUrl, kubeconfigPat return kubernetes.NewForConfig(config) } -func (p *Processing) getKubeSourceSchemas() collection.Schemas { - if p.kubeSourceSchemas == nil { - builder := collection.NewSchemasBuilder() - - colMap := make(map[string]struct{}) - for _, col := range p.regArgs.K8SSource.Collections { - colMap[col] = struct{}{} - } - for _, col := range p.regArgs.Snapshots { - colMap[col] = struct{}{} - } - excludeKindMap := make(map[string]struct{}) - for _, k := range p.regArgs.K8SSource.ExcludedResourceKinds { - excludeKindMap[k] = struct{}{} - } - for _, col := range p.regArgs.ExcludedResourceKinds { - excludeKindMap[col] = struct{}{} - } - schemaMap := make(map[resource.Schema]struct{}) - for col := range colMap { - var schemas collection.Schemas - switch col { - case bootstrap.CollectionsAll: - schemas = collections.All - case bootstrap.CollectionsIstio: - schemas = collections.PilotGatewayAPI() - case bootstrap.CollectionsLegacyDefault: - schemas = collections.LegacyDefault - case bootstrap.CollectionsLegacyLocal: - schemas = collections.LegacyLocalAnalysis - } - for _, s := range schemas.All() { - if _, ok := excludeKindMap[s.Kind()]; ok { - continue - } - schemaMap[s] = struct{}{} - } - } - for s := range schemaMap { - builder.MustAdd(s) - } - schemas := builder.Build() - p.kubeSourceSchemas = &schemas - } - return *p.kubeSourceSchemas -} - // Stop implements process.Component func (p *Processing) Stop() { if p.stopCh != nil { diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/server.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/server.go index 34cfd033..a587b3d1 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/server.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/server.go @@ -6,13 +6,12 @@ package server import ( - "os" + "google.golang.org/grpc" slimebootstrap "slime.io/slime/framework/bootstrap" - "slime.io/slime/modules/meshregistry/pkg/bootstrap" - frameworkmodel "slime.io/slime/framework/model" "slime.io/slime/modules/meshregistry/model" + "slime.io/slime/modules/meshregistry/pkg/bootstrap" ) var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "server") @@ -29,11 +28,8 @@ type Args struct { } func NewServer(args *Args) (*Server, error) { - os.Setenv("istio-revision", args.RegistryArgs.Revision) - os.Setenv("rev-crds", args.RegistryArgs.RevCrds) - + grpc.EnableTracing = args.RegistryArgs.EnableGRPCTracing proc := NewProcessing(args) - return &Server{ p: proc, }, nil diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/server/sources.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/sources.go new file mode 100644 index 00000000..62e1f795 --- /dev/null +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/server/sources.go @@ -0,0 +1,8 @@ +package server + +import ( + _ "slime.io/slime/modules/meshregistry/pkg/source/eureka" + _ 
"slime.io/slime/modules/meshregistry/pkg/source/k8s/fs" + _ "slime.io/slime/modules/meshregistry/pkg/source/nacos" + _ "slime.io/slime/modules/meshregistry/pkg/source/zookeeper" +) diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/conversion.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/conversion.go index 0cd614aa..46dbc449 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/conversion.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/conversion.go @@ -12,18 +12,6 @@ import ( "slime.io/slime/modules/meshregistry/pkg/util" ) -const ( - ProjectCode = "projectCode" -) - -type Error struct { - msg string -} - -func (e Error) Error() string { - return e.msg -} - type convertOptions struct { patchLabel bool enableProjectCode bool @@ -53,7 +41,10 @@ func ConvertServiceEntryMap(apps []*application, opts *convertOptions) (map[stri var projectCodes []string if opts.enableProjectCode { - projectCodes = getProjectCodeArr(app) + projectCodes = source.GetProjectCodeArr(app, + func(a *application) []*instance { return a.Instances }, + func(i *instance) map[string]string { return i.Metadata }, + ) } else { projectCodes = append(projectCodes, "") } @@ -165,8 +156,8 @@ func convertEndpointsWithNs(instances []*instance, projectCode string, opts *con log.Errorf("instance port illegal %v", ins) continue } - // 与要求projectCode不同的服务实例跳过,实现eureka服务实例项目隔离 - if projectCode != "" && ins.Metadata[ProjectCode] != projectCode { + // if the project code is not empty, and the project code of the instance is not equal to the project code, skip it + if projectCode != "" && ins.Metadata[source.ProjectCode] != projectCode { continue } @@ -224,20 +215,3 @@ func convertEndpointsWithNs(instances []*instance, projectCode string, opts *con return endpointsMap, svcPortsMap, useDNSMap } - -// 获取每一个应用的projectCode -func getProjectCodeArr(app *application) []string { - projectCodes := make([]string, 0) - projectCodeMap := make(map[string]struct{}) - // 获取服务中所有实例的projectCode标签,并去重 - for _, instance := range app.Instances { - if v, ok := instance.Metadata[ProjectCode]; ok { - if _, ok = projectCodeMap[v]; !ok { - projectCodes = append(projectCodes, v) - projectCodeMap[v] = struct{}{} - } - } - } - - return projectCodes -} diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/source.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/source.go index dcc9a610..b20ee7dc 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/source.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/eureka/source.go @@ -21,8 +21,18 @@ import ( "slime.io/slime/modules/meshregistry/pkg/util" ) +const ( + SourceName = "eureka" + + HttpPath = "/eureka" +) + var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "eureka") +func init() { + source.RegisterSourceInitlizer(SourceName, source.RegistrySourceInitlizer(New)) +} + type Source struct { args *bootstrap.EurekaSourceArgs @@ -43,12 +53,16 @@ type Source struct { client Client } -const ( - SourceName = "eureka" - HttpPath = "/eureka" -) +func New( + moduleArgs *bootstrap.RegistryArgs, + readyCallback func(string), + _ func(func(*bootstrap.RegistryArgs)), +) (event.Source, map[string]http.HandlerFunc, bool, bool, error) { + args := moduleArgs.EurekaSource + if !args.Enabled { + return nil, nil, false, true, nil + } -func New(args *bootstrap.EurekaSourceArgs, delay time.Duration, readyCallback func(string)) 
(event.Source, func(http.ResponseWriter, *http.Request), error) { var argsCopy *bootstrap.EurekaSourceArgs if args.GatewayModel || args.NsfEureka { cp := *args @@ -63,21 +77,18 @@ func New(args *bootstrap.EurekaSourceArgs, delay time.Duration, readyCallback fu argsCopy.AppSuffix = ".nsf" } } - if argsCopy != nil { args = argsCopy } - serviers := args.Servers - if len(serviers) == 0 { - serviers = []bootstrap.EurekaServer{args.EurekaServer} + if !args.InstancePortAsSvcPort && args.SvcPort == 0 { + return nil, nil, false, false, fmt.Errorf("SvcPort == 0 while InstancePortAsSvcPort false is not permitted") } - client := NewClients(serviers) var svcMocker *source.ServiceEntryMergePortMocker if args.MockServiceEntryName != "" { if args.MockServiceName == "" { - return nil, nil, fmt.Errorf("args MockServiceName empty but MockServiceEntryName %s", args.MockServiceEntryName) + return nil, nil, false, false, fmt.Errorf("args MockServiceName empty but MockServiceEntryName %s", args.MockServiceEntryName) } svcMocker = source.NewServiceEntryMergePortMocker( args.MockServiceEntryName, args.ResourceNs, args.MockServiceName, @@ -87,41 +98,40 @@ func New(args *bootstrap.EurekaSourceArgs, delay time.Duration, readyCallback fu }) } - if !args.InstancePortAsSvcPort && args.SvcPort == 0 { - return nil, nil, fmt.Errorf("SvcPort == 0 while InstancePortAsSvcPort false is not permitted") - } - - ret := &Source{ - args: args, - delay: delay, - started: false, - - initedCallback: readyCallback, - - cache: make(map[string]*networkingapi.ServiceEntry), - - stop: make(chan struct{}), - seInitCh: make(chan struct{}), - - client: client, + src := &Source{ + args: args, + delay: time.Duration(moduleArgs.RegistryStartDelay), + started: false, + initedCallback: readyCallback, + cache: make(map[string]*networkingapi.ServiceEntry), + stop: make(chan struct{}), + seInitCh: make(chan struct{}), seMergePortMocker: svcMocker, } - ret.initWg.Add(1) // service entry init-sync - if svcMocker != nil { - ret.handlers = append(ret.handlers, ret.seMergePortMocker) + serviers := args.Servers + if len(serviers) == 0 { + serviers = []bootstrap.EurekaServer{args.EurekaServer} + } + src.client = NewClients(serviers) - svcMocker.SetDispatcher(func(meta resource.Metadata, item *networkingapi.ServiceEntry) { + src.initWg.Add(1) // service entry init-sync + if src.seMergePortMocker != nil { + src.handlers = append(src.handlers, src.seMergePortMocker) + src.seMergePortMocker.SetDispatcher(func(meta resource.Metadata, item *networkingapi.ServiceEntry) { ev := source.BuildServiceEntryEvent(event.Updated, item, meta) - for _, h := range ret.handlers { + for _, h := range src.handlers { h.Handle(ev) } }) + src.initWg.Add(1) + } - ret.initWg.Add(1) + debugHandler := map[string]http.HandlerFunc{ + HttpPath: src.handleHttp, } - return ret, ret.handleHttp, nil + return src, debugHandler, args.LabelPatch, false, nil } func (s *Source) cacheShallowCopy() map[string]*networkingapi.ServiceEntry { @@ -314,6 +324,14 @@ func (s *Source) Start() { monitoring.RecordReady(SourceName, t0, time.Now()) s.initedCallback(SourceName) }() + + // If wait time is set, we will call the initedCallback after wait time. 
+ if s.args.WaitTime > 0 { + go func() { + time.Sleep(time.Duration(s.args.WaitTime)) + s.initedCallback(SourceName) + }() + } } go func() { diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/fs/source.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/fs/source.go index 265d35f8..c85b1e34 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/fs/source.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/fs/source.go @@ -1,5 +1,63 @@ package fs -import "istio.io/libistio/pkg/config/source/kube/file" +import ( + "net/http" -var New = file.New + "istio.io/libistio/pkg/config/event" + "istio.io/libistio/pkg/config/source/kube/file" + + frameworkmodel "slime.io/slime/framework/model" + "slime.io/slime/modules/meshregistry/model" + "slime.io/slime/modules/meshregistry/pkg/bootstrap" + "slime.io/slime/modules/meshregistry/pkg/source" + "slime.io/slime/modules/meshregistry/pkg/source/k8s" +) + +const ( + SourceName = "kubefs" +) + +var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "zk") + +func init() { + source.RegisterSourceInitlizer(SourceName, source.RegistrySourceInitlizer(New)) +} + +type Source struct { + event.Source + + initedCallback func(string) +} + +func New( + moduleArgs *bootstrap.RegistryArgs, + readyCallback func(string), + _ func(func(*bootstrap.RegistryArgs)), +) (event.Source, map[string]http.HandlerFunc, bool, bool, error) { + args := moduleArgs.K8SSource + if !args.Enabled || !args.EnableConfigFile || args.ConfigPath == "" { + return nil, nil, false, true, nil + } + collections := append([]string(nil), args.Collections...) + collections = append(collections, moduleArgs.Snapshots...) + excludeKinds := append([]string(nil), args.ExcludedResourceKinds...) + excludeKinds = append(excludeKinds, moduleArgs.ExcludedResourceKinds...) 
+ schemas := k8s.BuildKubeSourceSchemas(collections, excludeKinds) + kubeFsSrc, err := file.New(args.ConfigPath, schemas, args.WatchConfigFiles) + if err != nil { + log.Errorf("Failed to create kube fs source: %v", err) + return nil, nil, false, false, err + } + src := &Source{ + Source: kubeFsSrc, + initedCallback: readyCallback, + } + + return src, nil, false, false, nil +} + +func (s *Source) Start() { + // we can't control the concurrency of the kube fs source, so we need to start it in a goroutine, then mark it ready + go s.Source.Start() + s.initedCallback(SourceName) +} diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/source.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/source.go index 959b2329..20bc3151 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/source.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/k8s/source.go @@ -1,5 +1,46 @@ package k8s -const ( - K8S = "K8S" +import ( + "istio.io/libistio/pkg/config/schema/collection" + "istio.io/libistio/pkg/config/schema/collections" + "istio.io/libistio/pkg/config/schema/resource" + + "slime.io/slime/modules/meshregistry/pkg/bootstrap" ) + +func BuildKubeSourceSchemas(cols, excludedKinds []string) collection.Schemas { + builder := collection.NewSchemasBuilder() + + colMap := make(map[string]struct{}) + for _, col := range cols { + colMap[col] = struct{}{} + } + excludeKindMap := make(map[string]struct{}) + for _, col := range excludedKinds { + excludeKindMap[col] = struct{}{} + } + schemaMap := make(map[resource.Schema]struct{}) + for col := range colMap { + var schemas collection.Schemas + switch col { + case bootstrap.CollectionsAll: + schemas = collections.All + case bootstrap.CollectionsIstio: + schemas = collections.PilotGatewayAPI() + case bootstrap.CollectionsLegacyDefault: + schemas = collections.LegacyDefault + case bootstrap.CollectionsLegacyLocal: + schemas = collections.LegacyLocalAnalysis + } + for _, s := range schemas.All() { + if _, ok := excludeKindMap[s.Kind()]; ok { + continue + } + schemaMap[s] = struct{}{} + } + } + for s := range schemaMap { + builder.MustAdd(s) + } + return builder.Build() +} diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/conversion.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/conversion.go index bf120fd8..f05455f1 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/conversion.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/conversion.go @@ -11,18 +11,6 @@ import ( "slime.io/slime/modules/meshregistry/pkg/util" ) -const ( - ProjectCode = "projectCode" -) - -type Error struct { - msg string -} - -func (e Error) Error() string { - return e.msg -} - type convertOptions struct { patchLabel bool enableProjectCode bool @@ -55,7 +43,10 @@ func ConvertServiceEntryMap(instances []*instanceResp, opts *convertOptions) (ma var projectCodes []string if opts.enableProjectCode { - projectCodes = getProjectCodeArr(ins) + projectCodes = source.GetProjectCodeArr(ins, + func(ir *instanceResp) []*instance { return ir.Hosts }, + func(i *instance) map[string]string { return i.Metadata }, + ) } else { projectCodes = append(projectCodes, "") } @@ -170,8 +161,8 @@ func convertEndpointsWithNs(instances []*instance, projectCode string, opts *con if !ins.Healthy { // nacos-spec continue } - // 与要求projectCode不同的服务实例跳过,实现服务实例项目隔离 - if projectCode != "" && ins.Metadata[ProjectCode] != projectCode { + // if the project code 
is not empty, and the project code of the instance is not equal to the project code, skip it + if projectCode != "" && ins.Metadata[source.ProjectCode] != projectCode { continue } @@ -244,18 +235,3 @@ func convertInstanceId(labels map[string]string) { labels["instanceId"] = strings.ReplaceAll(v, ":", "_") } } - -func getProjectCodeArr(ins *instanceResp) []string { - projectCodes := make([]string, 0) - projectCodeMap := make(map[string]struct{}) - for _, instance := range ins.Hosts { - if v, ok := instance.Metadata[ProjectCode]; ok { - if _, ok := projectCodeMap[v]; !ok { - projectCodes = append(projectCodes, v) - projectCodeMap[v] = struct{}{} - } - } - } - - return projectCodes -} diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go index c04ca380..68b6104f 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/nacos/source.go @@ -4,13 +4,11 @@ import ( "encoding/json" "fmt" "net/http" - "os" "reflect" "strings" "sync" "time" - cmap "github.com/orcaman/concurrent-map/v2" networkingapi "istio.io/api/networking/v1alpha3" "istio.io/libistio/pkg/config/event" "istio.io/libistio/pkg/config/resource" @@ -18,13 +16,26 @@ import ( frameworkmodel "slime.io/slime/framework/model" "slime.io/slime/modules/meshregistry/model" "slime.io/slime/modules/meshregistry/pkg/bootstrap" + "slime.io/slime/modules/meshregistry/pkg/features" "slime.io/slime/modules/meshregistry/pkg/monitoring" "slime.io/slime/modules/meshregistry/pkg/source" "slime.io/slime/modules/meshregistry/pkg/util" ) +const ( + SourceName = "nacos" + + HttpPath = "/nacos" + + defaultServiceFilter = "" +) + var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "nacos") +func init() { + source.RegisterSourceInitlizer(SourceName, source.RegistrySourceInitlizer(New)) +} + type Source struct { args *bootstrap.NacosSourceArgs // should only be accessed in `onConfig` @@ -36,9 +47,8 @@ type Source struct { delay time.Duration // source cache - cache map[string]*networkingapi.ServiceEntry - namingServiceList cmap.ConcurrentMap[string, bool] - handlers []event.Handler + cache map[string]*networkingapi.ServiceEntry + handlers []event.Handler mut sync.RWMutex @@ -64,26 +74,16 @@ type Source struct { seMetaModifierFactory func(string) func(*resource.Metadata) } -const ( - SourceName = "nacos" - HttpPath = "/nacos" - POLLING = "polling" - WATCHING = "watching" - clientHeadersEnv = "NACOS_CLIENT_HEADERS" - - defaultServiceFilter = "" -) - -type Option func(s *Source) error - -func WithDynamicConfigOption(addCb func(func(*bootstrap.NacosSourceArgs))) Option { - return func(s *Source) error { - addCb(s.onConfig) - return nil +func New( + moduleArgs *bootstrap.RegistryArgs, + readyCallback func(string), + addOnReArgs func(onReArgsCallback func(args *bootstrap.RegistryArgs)), +) (event.Source, map[string]http.HandlerFunc, bool, bool, error) { + args := moduleArgs.NacosSource + if !args.Enabled { + return nil, nil, false, true, nil } -} -func New(args *bootstrap.NacosSourceArgs, delay time.Duration, readyCallback func(string), options ...Option) (event.Source, func(http.ResponseWriter, *http.Request), error) { var argsCopy *bootstrap.NacosSourceArgs if args.GatewayModel || args.NsfNacos { cp := *args @@ -98,15 +98,22 @@ func New(args *bootstrap.NacosSourceArgs, delay time.Duration, readyCallback fun argsCopy.DomSuffix = ".nsf" } } - if 
argsCopy != nil { args = argsCopy } + if !args.InstancePortAsSvcPort && args.SvcPort == 0 { + return nil, nil, false, false, fmt.Errorf("SvcPort == 0 while InstancePortAsSvcPort false is not permitted") + } + + if args.Mode != source.ModePolling { + log.Warningf("nacos source only support polling mode, but got %s, will use polling mode", args.Mode) + } + var svcMocker *source.ServiceEntryMergePortMocker if args.MockServiceEntryName != "" { if args.MockServiceName == "" { - return nil, nil, fmt.Errorf("args MockServiceName empty but MockServiceEntryName %s", args.MockServiceEntryName) + return nil, nil, false, false, fmt.Errorf("args MockServiceName empty but MockServiceEntryName %s", args.MockServiceEntryName) } svcMocker = source.NewServiceEntryMergePortMocker( args.MockServiceEntryName, args.ResourceNs, args.MockServiceName, @@ -116,21 +123,23 @@ func New(args *bootstrap.NacosSourceArgs, delay time.Duration, readyCallback fun }) } - ret := &Source{ + src := &Source{ args: args, - delay: delay, + delay: time.Duration(moduleArgs.RegistryStartDelay), started: false, initedCallback: readyCallback, cache: make(map[string]*networkingapi.ServiceEntry), - namingServiceList: cmap.New[bool](), stop: make(chan struct{}), seInitCh: make(chan struct{}), seMergePortMocker: svcMocker, } + servers := args.Servers + if len(servers) == 0 { + servers = []bootstrap.NacosServer{args.NacosServer} + } headers := make(map[string]string) - nacosHeaders := os.Getenv(clientHeadersEnv) - if nacosHeaders != "" { + if nacosHeaders := features.NacosClientHeaders; nacosHeaders != "" { for _, header := range strings.Split(nacosHeaders, ",") { items := strings.SplitN(header, "=", 2) if len(items) == 2 { @@ -138,42 +147,36 @@ func New(args *bootstrap.NacosSourceArgs, delay time.Duration, readyCallback fun } } } + src.client = NewClients(servers, args.MetaKeyNamespace, args.MetaKeyGroup, headers) - if args.Mode != POLLING { - log.Warningf("nacos source only support polling mode, but got %s, will use polling mode", args.Mode) - } - servers := args.Servers - if len(servers) == 0 { - servers = []bootstrap.NacosServer{args.NacosServer} - } - ret.client = NewClients(servers, args.MetaKeyNamespace, args.MetaKeyGroup, headers) - - ret.initWg.Add(1) - if ret.seMergePortMocker != nil { - ret.handlers = append(ret.handlers, ret.seMergePortMocker) - - svcMocker.SetDispatcher(func(meta resource.Metadata, item *networkingapi.ServiceEntry) { + src.initWg.Add(1) // // service entry init-sync + if src.seMergePortMocker != nil { + src.handlers = append(src.handlers, src.seMergePortMocker) + src.seMergePortMocker.SetDispatcher(func(meta resource.Metadata, item *networkingapi.ServiceEntry) { ev := source.BuildServiceEntryEvent(event.Updated, item, meta) - for _, h := range ret.handlers { + for _, h := range src.handlers { h.Handle(ev) } }) - - ret.initWg.Add(1) + src.initWg.Add(1) } - ret.instanceFilter = generateInstanceFilter(args.ServicedEndpointSelectors, args.EndpointSelectors, !args.EmptyEpSelectorsExcludeAll, args.AlwaysUseSourceScopedEpSelectors) - ret.serviceHostAliases = generateServiceHostAliases(args.ServiceHostAliases) - ret.seMetaModifierFactory = generateSeMetaModifierFactory(args.ServiceAdditionalMetas) - ret.reGroupInstances = reGroupInstances(args.InstanceMetaRelabel, args.ServiceNaming) + src.instanceFilter = generateInstanceFilter(args.ServicedEndpointSelectors, args.EndpointSelectors, !args.EmptyEpSelectorsExcludeAll, args.AlwaysUseSourceScopedEpSelectors) + src.serviceHostAliases = 
generateServiceHostAliases(args.ServiceHostAliases) + src.seMetaModifierFactory = generateSeMetaModifierFactory(args.ServiceAdditionalMetas) + src.reGroupInstances = reGroupInstances(args.InstanceMetaRelabel, args.ServiceNaming) - for _, op := range options { - if err := op(ret); err != nil { - return nil, nil, err - } + if addOnReArgs != nil { + addOnReArgs(func(reArgs *bootstrap.RegistryArgs) { + src.onConfig(reArgs.NacosSource) + }) + } + + debugHandler := map[string]http.HandlerFunc{ + HttpPath: src.handleHttp, } - return ret, ret.handleHttp, nil + return src, debugHandler, args.LabelPatch, false, nil } func (s *Source) cacheShallowCopy() map[string]*networkingapi.ServiceEntry { @@ -327,7 +330,7 @@ func (s *Source) handleHttp(w http.ResponseWriter, req *http.Request) { s.cacheJson(w, req) } -func (s *Source) dumpClients(w http.ResponseWriter, r *http.Request) { +func (s *Source) dumpClients(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte(s.client.RegistryInfo())) } @@ -378,10 +381,18 @@ func (s *Source) Start() { monitoring.RecordReady(SourceName, t0, time.Now()) s.initedCallback(SourceName) }() + + // If wait time is set, we will call the initedCallback after wait time. + if s.args.WaitTime > 0 { + go func() { + time.Sleep(time.Duration(s.args.WaitTime)) + s.initedCallback(SourceName) + }() + } } go func() { - if s.args.Mode == POLLING { + if s.args.Mode == source.ModePolling { go s.Polling() } else { log.Warningf("nacos source only support polling mode, but got %s, will use polling mode", s.args.Mode) diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/source.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/source.go index 5683aed6..4facc563 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/source.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/source.go @@ -1,40 +1,72 @@ package source import ( + "net/http" "strings" "time" + resource2 "istio.io/istio-mcp/pkg/config/schema/resource" + "istio.io/libistio/pkg/config/event" + "istio.io/libistio/pkg/config/resource" + "istio.io/libistio/pkg/config/schema/collections" + frameworkmodel "slime.io/slime/framework/model" "slime.io/slime/modules/meshregistry/model" + "slime.io/slime/modules/meshregistry/pkg/bootstrap" "slime.io/slime/modules/meshregistry/pkg/features" +) - resource2 "istio.io/istio-mcp/pkg/config/schema/resource" - "istio.io/libistio/pkg/config/resource" - "istio.io/libistio/pkg/config/schema/collection" - "istio.io/libistio/pkg/config/schema/collections" +const ( + ModePolling = "polling" + ModeWatching = "watching" + + ProjectCode = "projectCode" + + CacheRegistryInfoQueryKey = "registries" ) -const CacheRegistryInfoQueryKey = "registries" +var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "source") type RegistryInfo struct { RegistryID string `json:"registry_id,omitempty"` Addresses []string `json:"addresses,omitempty"` } -var ( - kindServiceEntry = collections.ServiceEntry.Kind() - kindSidecar = collections.Sidecar.Kind() - IstioRevisionKey = "istio.io/rev" +// RegistrySourceInitlizer is the initlizer of registry source, it will be called when the registry source is needed created. +type RegistrySourceInitlizer func( + // args is the args of the meshregistry, it is used to initialize the registry source. + args *bootstrap.RegistryArgs, + // sourceReadyCallback is the callback function that will be called by the registry source when it is ready. 
+ sourceReadyCallback func(string), + // addOnReArgs is used to register a callback function that will be called when the args of the meshregistry is changed. + addOnReArgs func(onReArgsCallback func(args *bootstrap.RegistryArgs)), +) ( + // source is the event.Source implementation of the registry source. + source event.Source, + // debugHandler is the map of http.HandlerFunc that will be used to handle the debug requests. + debugHandler map[string]http.HandlerFunc, + // cacheCluster is the flag that indicates whether the meshregistry should cache the cluster info. + cacheCluster bool, + // skip is the flag that indicates whether the meshregistry should skip the initialization process for the registry source. + skip bool, + // err is the error that may occur during the initialization process. + err error, +) - log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "source") +var registrySources map[string]RegistrySourceInitlizer - ServiceEntry = collection.Builder{ - Resource: collections.ServiceEntry, - }.MustBuild() - Sidecar = collection.Builder{ - Resource: collections.Sidecar, - }.MustBuild() -) +func RegistrySources() map[string]RegistrySourceInitlizer { + return registrySources +} + +// RegisterSourceInitlizer registers the initlizer of registry source. +// It is not concurrent safe, and recommend to be called in init function. +func RegisterSourceInitlizer(sourceName string, initlizer RegistrySourceInitlizer) { + if registrySources == nil { + registrySources = make(map[string]RegistrySourceInitlizer) + } + registrySources[sourceName] = initlizer +} func GenVersion() resource.Version { return resource.Version(time.Now().String()) @@ -45,15 +77,30 @@ func IsInternalVersion(ver resource.Version) bool { } func IsInternalResource(gvk resource2.GroupVersionKind) bool { - return gvk.Kind == kindServiceEntry || gvk.Kind == kindSidecar + return gvk.Kind == collections.ServiceEntry.Kind() || gvk.Kind == collections.Sidecar.Kind() } func FillRevision(meta *resource.Metadata) bool { if features.IstioRevision != "" { - exist, ok := meta.Labels[IstioRevisionKey] - meta.Labels[IstioRevisionKey] = features.IstioRevision + exist, ok := meta.Labels[frameworkmodel.IstioRevLabel] + meta.Labels[frameworkmodel.IstioRevLabel] = features.IstioRevision return !ok || exist != features.IstioRevision } return false } + +func GetProjectCodeArr[T, I any](t T, instances func(T) []I, meta func(I) map[string]string) []string { + projectCodes := make([]string, 0) + projectCodeMap := make(map[string]struct{}) + for _, instance := range instances(t) { + if v, ok := meta(instance)[ProjectCode]; ok { + if _, ok := projectCodeMap[v]; !ok { + projectCodes = append(projectCodes, v) + projectCodeMap[v] = struct{}{} + } + } + } + + return projectCodes +} diff --git a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/zookeeper/source.go b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/zookeeper/source.go index 0c1bebdc..125a8618 100644 --- a/staging/src/slime.io/slime/modules/meshregistry/pkg/source/zookeeper/source.go +++ b/staging/src/slime.io/slime/modules/meshregistry/pkg/source/zookeeper/source.go @@ -35,31 +35,29 @@ import ( "slime.io/slime/modules/meshregistry/pkg/util" ) -var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "zk") - const ( - SourceName = "zookeeper" + SourceName = "zookeeper" + ZkPath = "/zk" ZkSimplePath = "/zks" DubboCallModelPath = "/dubboCallModel" SidecarDubboCallModelPath = "/sidecarDubboCallModel" - ConsumerNode = "consumers" - ProviderNode = 
"providers" - ConfiguratorNode = "configurators" - Polling = "polling" - - AttachmentDubboCallModel = "ATTACHMENT_DUBBO_CALL_MODEL" - - defaultServiceFilter = "" -) -const ( + ConsumerNode = "consumers" + ProviderNode = "providers" + ConfiguratorNode = "configurators" providerPathSuffix = "/" + ProviderNode consumerPathSuffix = "/" + ConsumerNode disableConsumerPath = "-" configuratorSuffix = "/" + ConfiguratorNode + + AttachmentDubboCallModel = "ATTACHMENT_DUBBO_CALL_MODEL" + + defaultServiceFilter = "" ) +var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "zk") + type zkConn struct { conn atomic.Value // store *zk.Conn } @@ -84,6 +82,10 @@ func (z *zkConn) ChildrenW(path string) ([]string, <-chan zk.Event, error) { return children, c, err } +func init() { + source.RegisterSourceInitlizer(SourceName, source.RegistrySourceInitlizer(New)) +} + type Source struct { args *bootstrap.ZookeeperSourceArgs @@ -125,16 +127,16 @@ type Source struct { forceUpdateTrigger *atomic.Value // store chan struct{} } -type Option func(s *Source) error - -func WithDynamicConfigOption(addCb func(func(*bootstrap.ZookeeperSourceArgs))) Option { - return func(s *Source) error { - addCb(s.onConfig) - return nil +func New( + moduleArgs *bootstrap.RegistryArgs, + readyCallback func(string), + addOnReArgs func(onReArgsCallback func(args *bootstrap.RegistryArgs)), +) (event.Source, map[string]http.HandlerFunc, bool, bool, error) { + args := moduleArgs.ZookeeperSource + if !args.Enabled { + return nil, nil, false, true, nil } -} -func New(args *bootstrap.ZookeeperSourceArgs, delay time.Duration, readyCallback func(string), options ...Option) (event.Source, func(http.ResponseWriter, *http.Request), func(http.ResponseWriter, *http.Request), error) { // XXX refactor to config if args.GatewayModel { args.SvcPort = 80 @@ -158,7 +160,7 @@ func New(args *bootstrap.ZookeeperSourceArgs, delay time.Duration, readyCallback var svcMocker *source.ServiceEntryMergePortMocker if args.MockServiceEntryName != "" { if args.MockServiceName == "" { - return nil, nil, nil, fmt.Errorf("args MockServiceName empty but MockServiceEntryName %s", args.MockServiceEntryName) + return nil, nil, false, false, fmt.Errorf("args MockServiceName empty but MockServiceEntryName %s", args.MockServiceEntryName) } svcMocker = source.NewServiceEntryMergePortMocker( args.MockServiceEntryName, args.ResourceNs, args.MockServiceName, @@ -169,55 +171,58 @@ func New(args *bootstrap.ZookeeperSourceArgs, delay time.Duration, readyCallback }) } - ret := &Source{ - args: args, - ignoreLabelsMap: ignoreLabels, - - initedCallback: readyCallback, - - serviceMethods: map[string]string{}, - registryServiceCache: cmap.New[cmap.ConcurrentMap[string, []dubboInstance]](), - cache: cmap.New[cmap.ConcurrentMap[string, *ServiceEntryWithMeta]](), - seDubboCallModels: map[resource.FullName]map[string]DubboCallModel{}, - appSidecarUpdateTime: map[string]time.Time{}, - dubboPortsCache: map[uint32]*networkingapi.Port{}, - + src := &Source{ + args: args, + ignoreLabelsMap: ignoreLabels, + initedCallback: readyCallback, + serviceMethods: map[string]string{}, + registryServiceCache: cmap.New[cmap.ConcurrentMap[string, []dubboInstance]](), + cache: cmap.New[cmap.ConcurrentMap[string, *ServiceEntryWithMeta]](), + seDubboCallModels: map[resource.FullName]map[string]DubboCallModel{}, + appSidecarUpdateTime: map[string]time.Time{}, + dubboPortsCache: map[uint32]*networkingapi.Port{}, seInitCh: make(chan struct{}), stop: make(chan struct{}), watchingRoot: false, 
refreshSidecarNotifyCh: make(chan struct{}, 1), - - Con: &zkConn{}, - seMergePortMocker: svcMocker, - forceUpdateTrigger: &atomic.Value{}, + Con: &zkConn{}, + seMergePortMocker: svcMocker, + forceUpdateTrigger: &atomic.Value{}, } - ret.forceUpdateTrigger.Store(make(chan struct{})) + src.forceUpdateTrigger.Store(make(chan struct{})) - ret.handlers = append( - ret.handlers, - event.HandlerFromFn(ret.serviceEntryHandlerRefreshSidecar), + src.handlers = append( + src.handlers, + event.HandlerFromFn(src.serviceEntryHandlerRefreshSidecar), ) - ret.initWg.Add(1) // ServiceEntry init-sync ready + src.initWg.Add(1) // ServiceEntry init-sync ready if args.EnableDubboSidecar { - ret.initWg.Add(1) // Sidecar init-sync ready + src.initWg.Add(1) // Sidecar init-sync ready } - if ret.seMergePortMocker != nil { - ret.handlers = append(ret.handlers, ret.seMergePortMocker) - svcMocker.SetDispatcher(ret.dispatchMergePortsServiceEntry) - ret.initWg.Add(1) // merge ports se init-sync ready + if src.seMergePortMocker != nil { + src.handlers = append(src.handlers, src.seMergePortMocker) + svcMocker.SetDispatcher(src.dispatchMergePortsServiceEntry) + src.initWg.Add(1) // merge ports se init-sync ready } - ret.instanceFilter = generateInstanceFilter(args.ServicedEndpointSelectors, args.EndpointSelectors, !args.EmptyEpSelectorsExcludeAll, args.AlwaysUseSourceScopedEpSelectors) - ret.methodLBChecker = generateMethodLBChecker(args.MethodLBServiceSelectors) + src.instanceFilter = generateInstanceFilter(args.ServicedEndpointSelectors, args.EndpointSelectors, !args.EmptyEpSelectorsExcludeAll, args.AlwaysUseSourceScopedEpSelectors) + src.methodLBChecker = generateMethodLBChecker(args.MethodLBServiceSelectors) - for _, op := range options { - if err := op(ret); err != nil { - return nil, nil, nil, err - } + if addOnReArgs != nil { + addOnReArgs(func(reArgs *bootstrap.RegistryArgs) { + src.onConfig(reArgs.ZookeeperSource) + }) } - return ret, ret.cacheJson, ret.simpleCacheJson, nil + debugHandler := map[string]http.HandlerFunc{ + ZkPath: src.cacheJson, + ZkSimplePath: src.simpleCacheJson, + DubboCallModelPath: src.HandleDubboCallModel, + SidecarDubboCallModelPath: src.HandleSidecarDubboCallModel, + } + + return src, debugHandler, args.LabelPatch, false, nil } func (s *Source) dispatchMergePortsServiceEntry(meta resource.Metadata, se *networkingapi.ServiceEntry) { @@ -418,7 +423,7 @@ func (s *Source) cacheJson(w http.ResponseWriter, req *http.Request) { } func (s *Source) isPollingMode() bool { - return s.args.Mode == Polling + return s.args.Mode == source.ModePolling } func (s *Source) Start() { @@ -429,6 +434,14 @@ func (s *Source) Start() { monitoring.RecordReady(SourceName, t0, time.Now()) s.initedCallback(SourceName) }() + + // If wait time is set, we will call the initedCallback after wait time. + if s.args.WaitTime > 0 { + go func() { + time.Sleep(time.Duration(s.args.WaitTime)) + s.initedCallback(SourceName) + }() + } } go func() { // do recon @@ -445,9 +458,6 @@ func (s *Source) Start() { s.reConFunc(reconCh) starter.Do(func() { log.Infof("zk connected, will start fetch-data logic") - // s.initPush() - // TODO 服务自省模式 - // go s.doWatchAppication(s.ApplicationRegisterRootNode) if s.isPollingMode() { go s.Polling() } else {
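The core of this refactor is the registry-source plugin table in pkg/source: each source package registers a RegistrySourceInitlizer from its init(), pkg/server/sources.go blank-imports the source packages so those init functions run, and Processing.Start simply ranges over source.RegistrySources(). Below is a minimal sketch of how one more source could hook in, assuming the same contract as this patch; the "demo" package, its import path, and the source name are hypothetical.

package demo

import (
	"net/http"

	"istio.io/libistio/pkg/config/event"

	"slime.io/slime/modules/meshregistry/pkg/bootstrap"
	"slime.io/slime/modules/meshregistry/pkg/source"
)

const SourceName = "demo" // hypothetical registry ID

func init() {
	// Runs once the package is blank-imported, e.g. from pkg/server/sources.go.
	source.RegisterSourceInitlizer(SourceName, source.RegistrySourceInitlizer(New))
}

// New follows the RegistrySourceInitlizer contract: it returns the event source,
// its debug HTTP handlers, whether cluster info should be cached, whether the
// source should be skipped, and an error.
func New(
	moduleArgs *bootstrap.RegistryArgs,
	readyCallback func(string),
	_ func(func(*bootstrap.RegistryArgs)),
) (event.Source, map[string]http.HandlerFunc, bool, bool, error) {
	// A disabled source reports skip=true so Processing.Start ignores it.
	return nil, nil, false, true, nil
}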
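The schema assembly that used to live in Processing.getKubeSourceSchemas is now the reusable k8s.BuildKubeSourceSchemas(collections, excludedKinds), fed by the merged K8SSource and global lists in the kubefs source. A usage sketch follows; the chosen collection constant and the excluded kind are only examples.

package main

import (
	"fmt"

	"slime.io/slime/modules/meshregistry/pkg/bootstrap"
	"slime.io/slime/modules/meshregistry/pkg/source/k8s"
)

func main() {
	schemas := k8s.BuildKubeSourceSchemas(
		[]string{bootstrap.CollectionsIstio}, // collections to include
		[]string{"Gateway"},                  // resource kinds to drop, matched by Kind()
	)
	fmt.Println(schemas.Kinds()) // the kinds the kube fs source will watch
}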
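The duplicated getProjectCodeArr helpers in the eureka and nacos converters are replaced by the generic source.GetProjectCodeArr, which is parameterized by accessor functions for the instance list and the instance metadata. A sketch with stand-in types; app and inst are illustrative, not the real eureka application or nacos instanceResp types.

package main

import (
	"fmt"

	"slime.io/slime/modules/meshregistry/pkg/source"
)

type inst struct{ Metadata map[string]string }

type app struct{ Instances []*inst }

func main() {
	a := &app{Instances: []*inst{
		{Metadata: map[string]string{source.ProjectCode: "p1"}},
		{Metadata: map[string]string{source.ProjectCode: "p1"}}, // duplicate code, de-duplicated
		{Metadata: map[string]string{"other": "x"}},             // no projectCode label, ignored
	}}
	codes := source.GetProjectCodeArr(a,
		func(a *app) []*inst { return a.Instances },
		func(i *inst) map[string]string { return i.Metadata },
	)
	fmt.Println(codes) // [p1]
}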
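The new NACOS_CLIENT_HEADERS feature variable replaces the direct os.Getenv lookup in the nacos source; nacos.New still splits the value on ',' and then on the first '='. A self-contained sketch of the expected "k1=v1,k2=v2" format; parseClientHeaders and the sample values are illustrative only.

package main

import (
	"fmt"
	"strings"
)

func parseClientHeaders(raw string) map[string]string {
	headers := make(map[string]string)
	for _, header := range strings.Split(raw, ",") {
		// Split on the first '=' only, so values may themselves contain '='.
		items := strings.SplitN(header, "=", 2)
		if len(items) == 2 {
			headers[items[0]] = items[1]
		}
	}
	return headers
}

func main() {
	// e.g. NACOS_CLIENT_HEADERS="X-Tenant=foo,Authorization=Basic dXNlcjpwYXNz"
	fmt.Println(parseClientHeaders("X-Tenant=foo,Authorization=Basic dXNlcjpwYXNz"))
}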
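With SourceReadyLater gone from Processing.Start, each source's Start now schedules its own early readiness: when args.WaitTime is set, initedCallback fires after that duration even if the initial sync has not finished. A rough stand-alone sketch of that pattern; startWithDeadline and markReady are illustrative names, not part of the patch.

package main

import (
	"fmt"
	"time"
)

func startWithDeadline(name string, waitTime time.Duration, markReady func(string)) {
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // stands in for the initial cache sync
		markReady(name)                   // normal path: ready once the sync completes
		close(done)
	}()
	if waitTime > 0 {
		go func() {
			time.Sleep(waitTime)
			markReady(name) // deadline path: report ready after waitTime regardless
		}()
	}
	<-done
}

func main() {
	startWithDeadline("eureka", 10*time.Millisecond, func(n string) { fmt.Println(n, "ready") })
}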