Skip to content

Commit

Permalink
meshreg: refactoring source initialisation code to improve extensibility
Browse files Browse the repository at this point in the history
  • Loading branch information
believening authored and code committed Apr 25, 2024
1 parent 89e2996 commit 5b43d6b
Show file tree
Hide file tree
Showing 14 changed files with 418 additions and 443 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ var (
).Get()
return parseSeLabelKeys(seLabelSelectorKeys)
}()

NacosClientHeaders = env.RegisterStringVar(
"NACOS_CLIENT_HEADERS",
"",
"specify the additional headers to send to nacos server",
).Get()
)

func parseEndpointRelabelItems(endpointRelabelItems string) map[string]string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type HttpServer struct {
mux *http.ServeMux
xdsReady bool
sourceReady bool
sourceReadyMsg string
httpPathHandler common.PathHandler
sources cmap.ConcurrentMap[string, bool]
lock sync.Mutex
Expand Down

This file was deleted.

193 changes: 27 additions & 166 deletions staging/src/slime.io/slime/modules/meshregistry/pkg/server/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ import (
"time"

cmap "github.com/orcaman/concurrent-map/v2"
"google.golang.org/grpc"
"istio.io/libistio/pkg/config/event"
"istio.io/libistio/pkg/config/schema/collection"
"istio.io/libistio/pkg/config/schema/collections"
"istio.io/libistio/pkg/config/schema/resource"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -37,10 +33,7 @@ import (
"slime.io/slime/modules/meshregistry/pkg/mcpoverxds"
"slime.io/slime/modules/meshregistry/pkg/monitoring"
"slime.io/slime/modules/meshregistry/pkg/multicluster"
"slime.io/slime/modules/meshregistry/pkg/source/eureka"
"slime.io/slime/modules/meshregistry/pkg/source/k8s/fs"
"slime.io/slime/modules/meshregistry/pkg/source/nacos"
"slime.io/slime/modules/meshregistry/pkg/source/zookeeper"
"slime.io/slime/modules/meshregistry/pkg/source"
utilcache "slime.io/slime/modules/meshregistry/pkg/util/cache"
)

Expand All @@ -57,153 +50,68 @@ type Processing struct {
listener net.Listener
stopCh chan struct{}
httpServer *HttpServer

kubeSourceSchemas *collection.Schemas
}

// NewProcessing returns a new processing component.
func NewProcessing(args *Args) *Processing {
p := &Processing{
regArgs: args.RegistryArgs,
addOnRegArgs: args.AddOnRegArgs,
stopCh: make(chan struct{}),
localCLusterID: args.RegistryArgs.K8S.ClusterID,
}
hs := &HttpServer{
addr: args.RegistryArgs.HTTPServerAddr,
mux: http.NewServeMux(),
sourceReady: true,
sources: cmap.New[bool](),
httpPathHandler: args.SlimeEnv.HttpPathHandler,
}

if rm := args.SlimeEnv.ReadyManager; rm != nil {
rm.AddReadyChecker("ready", hs.ready)
}

hs.HandleFunc("/args", p.cacheRegArgs)
hs.HandleFunc("/ready", hs.handleReadyProbe)
hs.HandleFunc("/clients", hs.handleClientsInfo)
hs.HandleFunc("/pc", hs.pc)
hs.HandleFunc("/nc", hs.nc)

ret := &Processing{
regArgs: args.RegistryArgs,
addOnRegArgs: args.AddOnRegArgs,
stopCh: make(chan struct{}),
httpServer: hs,
localCLusterID: args.RegistryArgs.K8S.ClusterID,
if rm := args.SlimeEnv.ReadyManager; rm != nil {
rm.AddReadyChecker("ready", hs.ready)
}

return ret
p.httpServer = hs
return p
}

// Start implements process.Component
func (p *Processing) Start() (err error) {
csrc := make([]event.Source, 0, 5)
var (
kubeFsSrc event.Source
zookeeperSrc event.Source
eurekaSrc event.Source
nacosSrc event.Source

srcPreStartHooks []func()

httpHandle func(http.ResponseWriter, *http.Request)
simpleHttpHandle func(http.ResponseWriter, *http.Request)
)

if p.regArgs.K8SSource.Enabled {
if p.regArgs.K8SSource.EnableConfigFile && p.regArgs.K8SSource.ConfigPath != "" {
schemas := p.getKubeSourceSchemas()
log.Warnf("watching config files in %s", p.regArgs.K8SSource.ConfigPath)
log.Warnf("watching config schemas %v", schemas.Kinds())
kubeFsSrc, err = fs.New(p.regArgs.K8SSource.ConfigPath, schemas, p.regArgs.K8SSource.WatchConfigFiles)
if err != nil {
return
}
}
}

var srcPreStartHooks []func()
clusterCache := false

if srcArgs := p.regArgs.ZookeeperSource.SourceArgs; srcArgs.Enabled {
if zookeeperSrc, httpHandle, simpleHttpHandle, err = zookeeper.New(
p.regArgs.ZookeeperSource, time.Duration(p.regArgs.RegistryStartDelay),
p.httpServer.SourceReadyCallBack, zookeeper.WithDynamicConfigOption(func(onZookeeperArgs func(*bootstrap.ZookeeperSourceArgs)) {
if p.addOnRegArgs != nil {
p.addOnRegArgs(func(args *bootstrap.RegistryArgs) {
onZookeeperArgs(args.ZookeeperSource)
})
}
})); err != nil {
return
}
p.httpServer.HandleFunc(zookeeper.ZkPath, httpHandle)
p.httpServer.HandleFunc(zookeeper.ZkSimplePath, simpleHttpHandle)
if zkSrc, ok := zookeeperSrc.(*zookeeper.Source); ok {
p.httpServer.HandleFunc(zookeeper.DubboCallModelPath, zkSrc.HandleDubboCallModel)
p.httpServer.HandleFunc(zookeeper.SidecarDubboCallModelPath, zkSrc.HandleSidecarDubboCallModel)
}
p.httpServer.SourceRegistry(zookeeper.SourceName)
if srcArgs.WaitTime > 0 {
p.httpServer.SourceReadyLater(zookeeper.SourceName, time.Duration(srcArgs.WaitTime))
}
clusterCache = clusterCache || p.regArgs.ZookeeperSource.LabelPatch
}

if srcArgs := p.regArgs.EurekaSource; srcArgs.Enabled {
if eurekaSrc, httpHandle, err = eureka.New(p.regArgs.EurekaSource, time.Duration(p.regArgs.RegistryStartDelay), p.httpServer.SourceReadyCallBack); err != nil {
return
}
p.httpServer.HandleFunc(eureka.HttpPath, httpHandle)
p.httpServer.SourceRegistry(eureka.SourceName)
if srcArgs.WaitTime > 0 {
p.httpServer.SourceReadyLater(eureka.SourceName, time.Duration(srcArgs.WaitTime))
csrc := make([]event.Source, 0, len(source.RegistrySources()))
for registryID, initlizer := range source.RegistrySources() {
src, handlers, cacheCluster, skip, err := initlizer(p.regArgs, p.httpServer.SourceReadyCallBack, p.addOnRegArgs)
if err != nil {
log.Errorf("init registry source %s failed: %v", registryID, err)
return err
}
clusterCache = clusterCache || p.regArgs.EurekaSource.LabelPatch
}

if srcArgs := p.regArgs.NacosSource; srcArgs.Enabled {
if nacosSrc, httpHandle, err = nacos.New(
p.regArgs.NacosSource, time.Duration(p.regArgs.RegistryStartDelay),
p.httpServer.SourceReadyCallBack, nacos.WithDynamicConfigOption(func(onNacosArgs func(*bootstrap.NacosSourceArgs)) {
if p.addOnRegArgs != nil {
p.addOnRegArgs(func(args *bootstrap.RegistryArgs) {
onNacosArgs(args.NacosSource)
})
}
})); err != nil {
return
if skip {
continue
}
p.httpServer.HandleFunc(nacos.HttpPath, httpHandle)
p.httpServer.SourceRegistry(nacos.SourceName)
if srcArgs.WaitTime > 0 {
p.httpServer.SourceReadyLater(nacos.SourceName, time.Duration(srcArgs.WaitTime))
p.httpServer.SourceRegistry(registryID)
for path, handler := range handlers {
p.httpServer.HandleFunc(path, handler)
}
clusterCache = clusterCache || p.regArgs.EurekaSource.LabelPatch
clusterCache = clusterCache || cacheCluster
csrc = append(csrc, src)
}

if clusterCache {
srcPreStartHooks = append(srcPreStartHooks, p.initMulticluster())
}

p.httpServer.HandleFunc("/args", p.cacheRegArgs)
if p.addOnRegArgs != nil {
p.addOnRegArgs(func(args *bootstrap.RegistryArgs) {
p.regArgs = args
})
}

if kubeFsSrc != nil {
csrc = append(csrc, kubeFsSrc)
}
if zookeeperSrc != nil {
csrc = append(csrc, zookeeperSrc)
}
if eurekaSrc != nil {
csrc = append(csrc, eurekaSrc)
}
if nacosSrc != nil {
csrc = append(csrc, nacosSrc)
}

p.httpServer.start()
grpc.EnableTracing = p.regArgs.EnableGRPCTracing

// TODO start sources
mcpController, err := mcpoverxds.NewController(p.regArgs)
if err != nil {
Expand Down Expand Up @@ -268,61 +176,14 @@ func (p *Processing) getDeployKubeClient() (k kubernetes.Interface, err error) {

func (p *Processing) getKubeClient(config *rest.Config, masterUrl, kubeconfigPath string) (k kubernetes.Interface, err error) {
if config == nil {
config, err = clientcmd.BuildConfigFromFlags(p.regArgs.K8S.ApiServerUrl, p.regArgs.K8S.KubeConfig)
config, err = clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
if err != nil {
return nil, err
}
}
return kubernetes.NewForConfig(config)
}

func (p *Processing) getKubeSourceSchemas() collection.Schemas {
if p.kubeSourceSchemas == nil {
builder := collection.NewSchemasBuilder()

colMap := make(map[string]struct{})
for _, col := range p.regArgs.K8SSource.Collections {
colMap[col] = struct{}{}
}
for _, col := range p.regArgs.Snapshots {
colMap[col] = struct{}{}
}
excludeKindMap := make(map[string]struct{})
for _, k := range p.regArgs.K8SSource.ExcludedResourceKinds {
excludeKindMap[k] = struct{}{}
}
for _, col := range p.regArgs.ExcludedResourceKinds {
excludeKindMap[col] = struct{}{}
}
schemaMap := make(map[resource.Schema]struct{})
for col := range colMap {
var schemas collection.Schemas
switch col {
case bootstrap.CollectionsAll:
schemas = collections.All
case bootstrap.CollectionsIstio:
schemas = collections.PilotGatewayAPI()
case bootstrap.CollectionsLegacyDefault:
schemas = collections.LegacyDefault
case bootstrap.CollectionsLegacyLocal:
schemas = collections.LegacyLocalAnalysis
}
for _, s := range schemas.All() {
if _, ok := excludeKindMap[s.Kind()]; ok {
continue
}
schemaMap[s] = struct{}{}
}
}
for s := range schemaMap {
builder.MustAdd(s)
}
schemas := builder.Build()
p.kubeSourceSchemas = &schemas
}
return *p.kubeSourceSchemas
}

// Stop implements process.Component
func (p *Processing) Stop() {
if p.stopCh != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
package server

import (
"os"
"google.golang.org/grpc"

slimebootstrap "slime.io/slime/framework/bootstrap"
"slime.io/slime/modules/meshregistry/pkg/bootstrap"

frameworkmodel "slime.io/slime/framework/model"
"slime.io/slime/modules/meshregistry/model"
"slime.io/slime/modules/meshregistry/pkg/bootstrap"
)

var log = model.ModuleLog.WithField(frameworkmodel.LogFieldKeyPkg, "server")
Expand All @@ -29,11 +28,8 @@ type Args struct {
}

func NewServer(args *Args) (*Server, error) {
os.Setenv("istio-revision", args.RegistryArgs.Revision)
os.Setenv("rev-crds", args.RegistryArgs.RevCrds)

grpc.EnableTracing = args.RegistryArgs.EnableGRPCTracing
proc := NewProcessing(args)

return &Server{
p: proc,
}, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package server

import (
_ "slime.io/slime/modules/meshregistry/pkg/source/eureka"
_ "slime.io/slime/modules/meshregistry/pkg/source/k8s/fs"
_ "slime.io/slime/modules/meshregistry/pkg/source/nacos"
_ "slime.io/slime/modules/meshregistry/pkg/source/zookeeper"
)
Loading

0 comments on commit 5b43d6b

Please sign in to comment.