From 5d3ee1c51f5b8d13cfc494c25f809b0d970d88c4 Mon Sep 17 00:00:00 2001 From: Carlos Eduardo Arango Gutierrez Date: Wed, 3 Jul 2024 16:17:32 +0200 Subject: [PATCH 1/2] Use worker DS OwnerReference for NF's Signed-off-by: Carlos Eduardo Arango Gutierrez --- deployment/base/rbac/worker-role.yaml | 6 ++ .../templates/role.yaml | 7 +- pkg/nfd-worker/nfd-worker.go | 71 +++++++++++++------ 3 files changed, 62 insertions(+), 22 deletions(-) diff --git a/deployment/base/rbac/worker-role.yaml b/deployment/base/rbac/worker-role.yaml index 72f261e9e7..2b299c7973 100644 --- a/deployment/base/rbac/worker-role.yaml +++ b/deployment/base/rbac/worker-role.yaml @@ -11,3 +11,9 @@ rules: - create - get - update +- apiGroups: + - "" + resources: + - pods + verbs: + - get diff --git a/deployment/helm/node-feature-discovery/templates/role.yaml b/deployment/helm/node-feature-discovery/templates/role.yaml index 3a872e5723..52c69eb197 100644 --- a/deployment/helm/node-feature-discovery/templates/role.yaml +++ b/deployment/helm/node-feature-discovery/templates/role.yaml @@ -15,5 +15,10 @@ rules: - create - get - update +- apiGroups: + - "" + resources: + - pods + verbs: + - get {{- end }} - diff --git a/pkg/nfd-worker/nfd-worker.go b/pkg/nfd-worker/nfd-worker.go index 1c728771db..624d614550 100644 --- a/pkg/nfd-worker/nfd-worker.go +++ b/pkg/nfd-worker/nfd-worker.go @@ -39,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" klogutils "sigs.k8s.io/node-feature-discovery/pkg/utils/klog" "sigs.k8s.io/yaml" @@ -134,6 +135,7 @@ type nfdWorker struct { stop chan struct{} // channel for signaling stop featureSources []source.FeatureSource labelSources []source.LabelSource + ownerReference []metav1.OwnerReference } // This ticker can represent infinite and normal intervals. 
@@ -271,6 +273,36 @@ func (w *nfdWorker) Run() error { labelTrigger.Reset(w.config.Core.SleepInterval.Duration) defer labelTrigger.Stop() + // Get pod owner reference + podName := os.Getenv("POD_NAME") + client, err := w.getKubeClient() + if err != nil { + return fmt.Errorf("failed to get kube client: %w", err) + } + + selfPod, err := client.CoreV1().Pods(w.kubernetesNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get pod %q: %w", podName, err) + } + + // Create owner ref + ownerReference := selfPod.OwnerReferences + + // Add pod owner reference if it exists + podUID := os.Getenv("POD_UID") + if podName != "" && podUID != "" { + isTrue := true + ownerReference = append(ownerReference, metav1.OwnerReference{ + APIVersion: "v1", + Kind: "Pod", + Name: podName, + UID: types.UID(podUID), + Controller: &isTrue, + }) + } + + w.ownerReference = ownerReference + // Register to metrics server if w.args.MetricsPort > 0 { m := utils.CreateMetricsServer(w.args.MetricsPort, @@ -713,25 +745,6 @@ func (m *nfdWorker) updateNodeFeatureObject(labels Labels) error { features := source.GetAllFeatures() - // Create owner ref - ownerRefs := []metav1.OwnerReference{} - podName := os.Getenv("POD_NAME") - podUID := os.Getenv("POD_UID") - if podName != "" && podUID != "" { - isTrue := true - ownerRefs = []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Pod", - Name: podName, - UID: types.UID(podUID), - Controller: &isTrue, - }, - } - } else { - klog.InfoS("Cannot set NodeFeature owner reference, POD_NAME and/or POD_UID not specified") - } - // TODO: we could implement some simple caching of the object, only get it // every 10 minutes or so because nobody else should really be modifying it if nfr, err := cli.NfdV1alpha1().NodeFeatures(namespace).Get(context.TODO(), nodename, metav1.GetOptions{}); errors.IsNotFound(err) { @@ -740,7 +753,7 @@ func (m *nfdWorker) updateNodeFeatureObject(labels Labels) error { Name: nodename, 
Annotations: map[string]string{nfdv1alpha1.WorkerVersionAnnotation: version.Get()}, Labels: map[string]string{nfdv1alpha1.NodeFeatureObjNodeNameLabel: nodename}, - OwnerReferences: ownerRefs, + OwnerReferences: m.ownerReference, }, Spec: nfdv1alpha1.NodeFeatureSpec{ Features: *features, @@ -761,7 +774,7 @@ func (m *nfdWorker) updateNodeFeatureObject(labels Labels) error { nfrUpdated := nfr.DeepCopy() nfrUpdated.Annotations = map[string]string{nfdv1alpha1.WorkerVersionAnnotation: version.Get()} nfrUpdated.Labels = map[string]string{nfdv1alpha1.NodeFeatureObjNodeNameLabel: nodename} - nfrUpdated.OwnerReferences = ownerRefs + nfrUpdated.OwnerReferences = m.ownerReference nfrUpdated.Spec = nfdv1alpha1.NodeFeatureSpec{ Features: *features, Labels: labels, @@ -801,6 +814,22 @@ func (m *nfdWorker) getNfdClient() (*nfdclient.Clientset, error) { return c, nil } +func (m *nfdWorker) getKubeClient() (*kubernetes.Clientset, error) { + // creates the in-cluster config + kubeconfig, err := utils.GetKubeconfig(m.args.Kubeconfig) + if err != nil { + return nil, err + } + + // creates the clientset + clientset, err := kubernetes.NewForConfig(kubeconfig) + if err != nil { + return nil, err + } + + return clientset, nil +} + // UnmarshalJSON implements the Unmarshaler interface from "encoding/json" func (c *sourcesConfig) UnmarshalJSON(data []byte) error { // First do a raw parse to get the per-source data From e33e68ad5b07e26a5c5a951c1ba344fab5ac15b9 Mon Sep 17 00:00:00 2001 From: Carlos Eduardo Arango Gutierrez Date: Fri, 5 Jul 2024 15:22:58 +0200 Subject: [PATCH 2/2] Add optionable arguments to NewWorker Signed-off-by: Carlos Eduardo Arango Gutierrez --- cmd/nfd-worker/main.go | 2 +- pkg/nfd-worker/nfd-worker-internal_test.go | 16 +-- pkg/nfd-worker/nfd-worker.go | 120 +++++++++++++-------- pkg/nfd-worker/nfd-worker_test.go | 15 ++- test/e2e/utils/rbac.go | 5 + 5 files changed, 99 insertions(+), 59 deletions(-) diff --git a/cmd/nfd-worker/main.go b/cmd/nfd-worker/main.go index 
7d41aa562d..20a2593814 100644 --- a/cmd/nfd-worker/main.go +++ b/cmd/nfd-worker/main.go @@ -83,7 +83,7 @@ func main() { // Get new NfdWorker instance args.GrpcHealthPort = GrpcHealthPort - instance, err := worker.NewNfdWorker(args) + instance, err := worker.NewNfdWorker(worker.WithArgs(args)) if err != nil { klog.ErrorS(err, "failed to initialize NfdWorker instance") os.Exit(1) diff --git a/pkg/nfd-worker/nfd-worker-internal_test.go b/pkg/nfd-worker/nfd-worker-internal_test.go index 101177684e..067653d648 100644 --- a/pkg/nfd-worker/nfd-worker-internal_test.go +++ b/pkg/nfd-worker/nfd-worker-internal_test.go @@ -27,6 +27,7 @@ import ( . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/mock" "github.com/vektra/errors" + fakeclient "k8s.io/client-go/kubernetes/fake" nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1" "sigs.k8s.io/node-feature-discovery/pkg/labeler" @@ -97,7 +98,8 @@ func makeFakeFeatures(names []string) (source.FeatureLabels, Labels) { func TestConfigParse(t *testing.T) { Convey("When parsing configuration", t, func() { - w, err := NewNfdWorker(&Args{}) + w, err := NewNfdWorker(WithArgs(&Args{}), + WithKubernetesClient(fakeclient.NewSimpleClientset())) So(err, ShouldBeNil) worker := w.(*nfdWorker) overrides := `{"core": {"labelSources": ["fake"],"noPublish": true},"sources": {"cpu": {"cpuid": {"attributeBlacklist": ["foo","bar"]}}}}` @@ -222,13 +224,13 @@ core: `) noPublish := true - w, err := NewNfdWorker(&Args{ + w, err := NewNfdWorker(WithArgs(&Args{ ConfigFile: configFile, Overrides: ConfigOverrideArgs{ FeatureSources: &utils.StringSliceVal{"fake"}, LabelSources: &utils.StringSliceVal{"fake"}, NoPublish: &noPublish}, - }) + }), WithKubernetesClient(fakeclient.NewSimpleClientset())) So(err, ShouldBeNil) worker := w.(*nfdWorker) @@ -307,7 +309,8 @@ func TestNewNfdWorker(t *testing.T) { Convey("without any args specified", func() { args := &Args{} - w, err := NewNfdWorker(args) + w, err := 
NewNfdWorker(WithArgs(args), + WithKubernetesClient(fakeclient.NewSimpleClientset())) Convey("no error should be returned", func() { So(err, ShouldBeNil) }) @@ -324,7 +327,8 @@ func TestNewNfdWorker(t *testing.T) { args := &Args{Overrides: ConfigOverrideArgs{ LabelSources: &utils.StringSliceVal{"fake"}, FeatureSources: &utils.StringSliceVal{"cpu"}}} - w, err := NewNfdWorker(args) + w, err := NewNfdWorker(WithArgs(args), + WithKubernetesClient(fakeclient.NewSimpleClientset())) Convey("no error should be returned", func() { So(err, ShouldBeNil) }) @@ -373,7 +377,7 @@ func TestCreateFeatureLabels(t *testing.T) { func TestAdvertiseFeatureLabels(t *testing.T) { Convey("When advertising labels", t, func() { - w, err := NewNfdWorker(&Args{}) + w, err := NewNfdWorker(WithArgs(&Args{}), WithKubernetesClient(fakeclient.NewSimpleClientset())) So(err, ShouldBeNil) worker := w.(*nfdWorker) diff --git a/pkg/nfd-worker/nfd-worker.go b/pkg/nfd-worker/nfd-worker.go index 624d614550..b9bb87f60a 100644 --- a/pkg/nfd-worker/nfd-worker.go +++ b/pkg/nfd-worker/nfd-worker.go @@ -39,7 +39,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" - "k8s.io/client-go/kubernetes" + k8sclient "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" klogutils "sigs.k8s.io/node-feature-discovery/pkg/utils/klog" "sigs.k8s.io/yaml" @@ -131,6 +131,7 @@ type nfdWorker struct { kubernetesNamespace string grpcClient pb.LabelerClient healthServer *grpc.Server + k8sClient k8sclient.Interface nfdClient *nfdclient.Clientset stop chan struct{} // channel for signaling stop featureSources []source.FeatureSource @@ -143,30 +144,70 @@ type infiniteTicker struct { *time.Ticker } +// NfdWorkerOption sets properties of the NfdWorker instance. +type NfdWorkerOption interface { + apply(*nfdWorker) +} + +// WithArgs is used for passing settings from command line arguments. 
+func WithArgs(args *Args) NfdWorkerOption { + return &nfdMWorkerOpt{f: func(n *nfdWorker) { n.args = *args }} +} + +// WithKubernetesClient forces the use of the given kubernetes client, without +// initializing one from kubeconfig. +func WithKubernetesClient(cli k8sclient.Interface) NfdWorkerOption { + return &nfdMWorkerOpt{f: func(n *nfdWorker) { n.k8sClient = cli }} +} + +type nfdMWorkerOpt struct { + f func(*nfdWorker) +} + +func (f *nfdMWorkerOpt) apply(n *nfdWorker) { + f.f(n) +} + // NewNfdWorker creates new NfdWorker instance. -func NewNfdWorker(args *Args) (NfdWorker, error) { +func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) { nfd := &nfdWorker{ - args: *args, config: &NFDConfig{}, kubernetesNamespace: utils.GetKubernetesNamespace(), stop: make(chan struct{}), } + for _, o := range opts { + o.apply(nfd) + } + // Check TLS related args - if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" { - if args.CertFile == "" { + if nfd.args.CertFile != "" || nfd.args.KeyFile != "" || nfd.args.CaFile != "" { + if nfd.args.CertFile == "" { return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file") } - if args.KeyFile == "" { + if nfd.args.KeyFile == "" { return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file") } - if args.CaFile == "" { + if nfd.args.CaFile == "" { return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file") } } - if args.ConfigFile != "" { - nfd.configFilePath = filepath.Clean(args.ConfigFile) + if nfd.args.ConfigFile != "" { + nfd.configFilePath = filepath.Clean(nfd.args.ConfigFile) + } + + // k8sClient might've been set via opts by tests + if nfd.k8sClient == nil { + kubeconfig, err := utils.GetKubeconfig(nfd.args.Kubeconfig) + if err != nil { + return nfd, err + } + cli, err := k8sclient.NewForConfig(kubeconfig) + if err != nil { + return nfd, err + } + nfd.k8sClient = cli } return nfd, nil @@ -273,32 +314,33 @@ func (w 
*nfdWorker) Run() error { labelTrigger.Reset(w.config.Core.SleepInterval.Duration) defer labelTrigger.Stop() + // Create owner ref + ownerReference := []metav1.OwnerReference{} // Get pod owner reference podName := os.Getenv("POD_NAME") - client, err := w.getKubeClient() - if err != nil { - return fmt.Errorf("failed to get kube client: %w", err) - } - - selfPod, err := client.CoreV1().Pods(w.kubernetesNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get pod %q: %w", podName, err) - } - - // Create owner ref - ownerReference := selfPod.OwnerReferences // Add pod owner reference if it exists - podUID := os.Getenv("POD_UID") - if podName != "" && podUID != "" { - isTrue := true - ownerReference = append(ownerReference, metav1.OwnerReference{ - APIVersion: "v1", - Kind: "Pod", - Name: podName, - UID: types.UID(podUID), - Controller: &isTrue, - }) + if podName != "" { + if selfPod, err := w.k8sClient.CoreV1().Pods(w.kubernetesNamespace).Get(context.TODO(), podName, metav1.GetOptions{}); err != nil { + klog.ErrorS(err, "failed to get self pod, cannot inherit ownerReference for NodeFeature") + return err + } else { + ownerReference = append(ownerReference, selfPod.OwnerReferences...) 
+ } + + podUID := os.Getenv("POD_UID") + if podUID != "" { + ownerReference = append(ownerReference, metav1.OwnerReference{ + APIVersion: "v1", + Kind: "Pod", + Name: podName, + UID: types.UID(podUID), + }) + } else { + klog.InfoS("Cannot append POD ownerReference to NodeFeature, POD_UID not specified") + } + } else { + klog.InfoS("Cannot set NodeFeature owner references, POD_NAME not specified") } w.ownerReference = ownerReference @@ -814,22 +856,6 @@ func (m *nfdWorker) getNfdClient() (*nfdclient.Clientset, error) { return c, nil } -func (m *nfdWorker) getKubeClient() (*kubernetes.Clientset, error) { - // creates the in-cluster config - kubeconfig, err := utils.GetKubeconfig(m.args.Kubeconfig) - if err != nil { - return nil, err - } - - // creates the clientset - clientset, err := kubernetes.NewForConfig(kubeconfig) - if err != nil { - return nil, err - } - - return clientset, nil -} - // UnmarshalJSON implements the Unmarshaler interface from "encoding/json" func (c *sourcesConfig) UnmarshalJSON(data []byte) error { // First do a raw parse to get the per-source data diff --git a/pkg/nfd-worker/nfd-worker_test.go b/pkg/nfd-worker/nfd-worker_test.go index c5582d064a..42fe8b71bc 100644 --- a/pkg/nfd-worker/nfd-worker_test.go +++ b/pkg/nfd-worker/nfd-worker_test.go @@ -90,9 +90,12 @@ func teardownTest(ctx testContext) { func TestNewNfdWorker(t *testing.T) { Convey("When initializing new NfdWorker instance", t, func() { Convey("When one of -cert-file, -key-file or -ca-file is missing", func() { - _, err := worker.NewNfdWorker(&worker.Args{CertFile: "crt", KeyFile: "key"}) - _, err2 := worker.NewNfdWorker(&worker.Args{KeyFile: "key", CaFile: "ca"}) - _, err3 := worker.NewNfdWorker(&worker.Args{CertFile: "crt", CaFile: "ca"}) + _, err := worker.NewNfdWorker(worker.WithArgs(&worker.Args{CertFile: "crt", KeyFile: "key"}), + worker.WithKubernetesClient(fakeclient.NewSimpleClientset())) + _, err2 := worker.NewNfdWorker(worker.WithArgs(&worker.Args{KeyFile: "key", CaFile: 
"ca"}), + worker.WithKubernetesClient(fakeclient.NewSimpleClientset())) + _, err3 := worker.NewNfdWorker(worker.WithArgs(&worker.Args{CertFile: "crt", CaFile: "ca"}), + worker.WithKubernetesClient(fakeclient.NewSimpleClientset())) Convey("An error should be returned", func() { So(err, ShouldNotBeNil) So(err2, ShouldNotBeNil) @@ -112,7 +115,8 @@ func TestRun(t *testing.T) { Oneshot: true, Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}}, } - fooasdf, _ := worker.NewNfdWorker(args) + fooasdf, _ := worker.NewNfdWorker(worker.WithArgs(args), + worker.WithKubernetesClient(fakeclient.NewSimpleClientset())) err := fooasdf.Run() Convey("No error should be returned", func() { So(err, ShouldBeNil) @@ -141,7 +145,8 @@ func TestRunTls(t *testing.T) { Oneshot: true, Overrides: worker.ConfigOverrideArgs{LabelSources: &utils.StringSliceVal{"fake"}}, } - w, _ := worker.NewNfdWorker(&workerArgs) + w, _ := worker.NewNfdWorker(worker.WithArgs(&workerArgs), + worker.WithKubernetesClient(fakeclient.NewSimpleClientset())) err := w.Run() Convey("No error should be returned", func() { So(err, ShouldBeNil) diff --git a/test/e2e/utils/rbac.go b/test/e2e/utils/rbac.go index 902609eec3..67470e331d 100644 --- a/test/e2e/utils/rbac.go +++ b/test/e2e/utils/rbac.go @@ -224,6 +224,11 @@ func createRoleWorker(ctx context.Context, cs clientset.Interface, ns string) (* Resources: []string{"nodefeatures"}, Verbs: []string{"create", "get", "update"}, }, + { + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get"}, + }, }, } return cs.RbacV1().Roles(ns).Update(ctx, cr, metav1.UpdateOptions{})