Skip to content

Commit

Permalink
Add filter on namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Dec 4, 2023
1 parent 80da387 commit cd8201e
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 13 deletions.
68 changes: 60 additions & 8 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,65 @@ import (
"github.com/icinga/icinga-kubernetes/pkg/sync"
"github.com/okzk/sdnotify"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
kclientcmd "k8s.io/client-go/tools/clientcmd"
"reflect"
"strconv"
)

func main() {
kconfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(
kclientcmd.NewDefaultClientConfigLoadingRules(), &kclientcmd.ConfigOverrides{}).ClientConfig()
flags := internal.Flags{}

fv := reflect.ValueOf(&flags).Elem()
ft := reflect.TypeOf(flags)

for i := 0; i < ft.NumField(); i++ {
fieldType := ft.Field(i)
fieldValue := fv.Field(i)

long := fieldType.Tag.Get("long")
short := fieldType.Tag.Get("short")
value := fieldType.Tag.Get("default")
usage := fieldType.Tag.Get("description")

switch fieldValue.Kind() {
case reflect.String:
ref := fv.Field(i).Addr().Interface().(*string)
pflag.StringVarP(ref, long, short, value, usage)
case reflect.Int:
ref := fv.Field(i).Addr().Interface().(*int)

value, err := strconv.Atoi(value)
if err != nil {
logging.Fatal(errors.Wrap(err, "can't convert flag default value to integer"))
}

pflag.IntVarP(ref, long, short, value, usage)
case reflect.Bool:
ref := fv.Field(i).Addr().Interface().(*bool)

value, err := strconv.ParseBool(value)
if err != nil {
logging.Fatal(errors.Wrap(err, "can't convert flag default value to bool"))
}

pflag.BoolVarP(ref, long, short, value, usage)
}
}

kconfigOverrides := &kclientcmd.ConfigOverrides{}
kclientcmd.BindOverrideFlags(kconfigOverrides, pflag.CommandLine, kclientcmd.RecommendedConfigOverrideFlags(""))

kclientconfig := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(
kclientcmd.NewDefaultClientConfigLoadingRules(), kconfigOverrides)

pflag.Parse()

kconfig, err := kclientconfig.ClientConfig()
if err != nil {
logging.Fatal(errors.Wrap(err, "can't configure Kubernetes client"))
}
Expand All @@ -29,9 +79,11 @@ func main() {
logging.Fatal(errors.Wrap(err, "can't create Kubernetes client"))
}

flags, err := config.ParseFlags[internal.Flags]()
namespace, overridden, err := kclientconfig.Namespace()
if err != nil {
logging.Fatal(errors.Wrap(err, "can't parse flags"))
logging.Fatal(errors.Wrap(err, "can't get namespace from CLI"))
} else if !overridden {
namespace = kmetav1.NamespaceAll
}

cfg, err := config.FromYAMLFile[internal.Config](flags.Config)
Expand Down Expand Up @@ -66,26 +118,26 @@ func main() {
}
}

informers := kinformers.NewSharedInformerFactory(k, 0)
informers := kinformers.NewSharedInformerFactoryWithOptions(k, 0, kinformers.WithNamespace(namespace))

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return sync.NewSync(
db, schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"),
).Run(ctx)
).Run(ctx, namespace)
})

g.Go(func() error {
return sync.NewSync(
db, schema.NewNamespace, informers.Core().V1().Namespaces().Informer(), logs.GetChildLogger("Namespaces"),
).Run(ctx)
).Run(ctx, namespace)
})

g.Go(func() error {
return sync.NewSync(
db, schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"),
).Run(ctx)
).Run(ctx, namespace)
})

if err := g.Wait(); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/schema/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,9 @@ func NewPod() contracts.Resource {
func (p *Pod) Obtain(kobject kmetav1.Object) {
p.kmetaWithNamespace.Obtain(kobject)
}

func (p *Pod) Scope() any {
return &struct {
Namespace string
}{}
}
10 changes: 5 additions & 5 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type Sync interface {
Run(context.Context) error
Run(context.Context, string) error
}

type sync struct {
Expand All @@ -39,12 +39,12 @@ func NewSync(
}
}

func (s *sync) Run(ctx context.Context) error {
func (s *sync) Run(ctx context.Context, namespace string) error {
s.logger.Info("Starting sync")

s.logger.Debug("Warming up")

err := s.Warmup(ctx)
err := s.Warmup(ctx, namespace)
if err != nil {
return errors.Wrap(err, "warmup failed")
}
Expand Down Expand Up @@ -139,13 +139,13 @@ func (s *sync) Run(ctx context.Context) error {
return g.Wait()
}

func (s *sync) Warmup(ctx context.Context) error {
func (s *sync) Warmup(ctx context.Context, namespace string) error {
g, ctx := errgroup.WithContext(ctx)

resource := s.factory()
entities, err := s.db.YieldAll(ctx, func() database.Entity {
return s.factory()
}, s.db.BuildSelectStmt(resource, resource.Fingerprint()), struct{}{})
}, s.db.BuildSelectStmt(resource, resource.Fingerprint()), struct{ Namespace string }{Namespace: namespace})
com.ErrgroupReceive(ctx, g, err)

g.Go(func() error {
Expand Down

0 comments on commit cd8201e

Please sign in to comment.