From 41042012fe9dff6b2284f25eba935960638bf132 Mon Sep 17 00:00:00 2001 From: Bryce Soghigian Date: Fri, 10 Jan 2025 01:15:53 -0800 Subject: [PATCH] feat: split nic and vm into their own gc controllers, added shared state between them to prevent conflicts in nic deletion calls --- pkg/controllers/controllers.go | 11 +- .../nodeclaim/garbagecollection/controller.go | 198 ------------------ .../instance_garbagecollection.go | 137 ++++++++++++ .../nic_garbagecollection.go | 113 ++++++++++ 4 files changed, 260 insertions(+), 199 deletions(-) delete mode 100644 pkg/controllers/nodeclaim/garbagecollection/controller.go create mode 100644 pkg/controllers/nodeclaim/garbagecollection/instance_garbagecollection.go create mode 100644 pkg/controllers/nodeclaim/garbagecollection/nic_garbagecollection.go diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 99b30c2b7..cb0c006d0 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -18,9 +18,11 @@ package controllers import ( "context" + "time" "github.com/awslabs/operatorpkg/controller" "github.com/awslabs/operatorpkg/status" + "github.com/patrickmn/go-cache" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/events" @@ -40,11 +42,18 @@ import ( func NewControllers(ctx context.Context, mgr manager.Manager, kubeClient client.Client, recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, instanceProvider instance.Provider) []controller.Controller { + + unremovableNics := cache.New(nodeclaimgarbagecollection.NicReservationDuration, time.Second * 30) + controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), nodeclassstatus.NewController(kubeClient), nodeclasstermination.NewController(kubeClient, recorder), - nodeclaimgarbagecollection.NewController(kubeClient, cloudProvider, instanceProvider), + + // resources the instance provider creates are garbage collected by these controllers + 
nodeclaimgarbagecollection.NewVirtualMachineController(kubeClient, cloudProvider, unremovableNics), + nodeclaimgarbagecollection.NewNetworkInterfaceController(kubeClient, instanceProvider, unremovableNics), + // TODO: nodeclaim tagging inplaceupdate.NewController(kubeClient, instanceProvider), status.NewController[*v1alpha2.AKSNodeClass](kubeClient, mgr.GetEventRecorderFor("karpenter")), diff --git a/pkg/controllers/nodeclaim/garbagecollection/controller.go b/pkg/controllers/nodeclaim/garbagecollection/controller.go deleted file mode 100644 index 6465984e7..000000000 --- a/pkg/controllers/nodeclaim/garbagecollection/controller.go +++ /dev/null @@ -1,198 +0,0 @@ -/* -Portions Copyright (c) Microsoft Corporation. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package garbagecollection - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/Azure/karpenter-provider-azure/pkg/providers/instance" - "github.com/awslabs/operatorpkg/singleton" - - // "github.com/Azure/karpenter-provider-azure/pkg/cloudprovider" - "github.com/samber/lo" - "go.uber.org/multierr" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/util/workqueue" - "knative.dev/pkg/logging" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/karpenter/pkg/operator/injection" - - karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" - - corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider" -) - -type Controller struct { - kubeClient client.Client - cloudProvider corecloudprovider.CloudProvider - instanceProvider instance.Provider - successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller -} - -func NewController(kubeClient client.Client, cloudProvider corecloudprovider.CloudProvider, instanceProvider instance.Provider) *Controller { - return &Controller{ - kubeClient: kubeClient, - cloudProvider: cloudProvider, - instanceProvider: instanceProvider, - successfulCount: 0, - } -} - -func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { - ctx = injection.WithControllerName(ctx, "instance.garbagecollection") - var aggregatedError error - - // Perform VM garbage collection - if err := c.gcVMs(ctx); err != nil { - aggregatedError = multierr.Append(aggregatedError, fmt.Errorf("VM garbage collection failed: %w", err)) - } - - // Perform NIC garbage collection - if err := c.gcNics(ctx); err != nil { - aggregatedError = multierr.Append(aggregatedError, fmt.Errorf("NIC garbage collection failed: %w", err)) - } - - c.successfulCount++ - - return reconcile.Result{ - RequeueAfter: 
lo.Ternary(c.successfulCount <= 20, 10*time.Second, 2*time.Minute), - }, aggregatedError -} - -// gcVMs handles the garbage collection of virtual machines. -func (c *Controller) gcVMs(ctx context.Context) error { - // List VMs from the CloudProvider - retrieved, err := c.cloudProvider.List(ctx) - if err != nil { - return fmt.Errorf("listing cloudprovider VMs: %w", err) - } - - // Filter out VMs that are marked for deletion - managedRetrieved := lo.Filter(retrieved, func(nc *karpv1.NodeClaim, _ int) bool { - return nc.DeletionTimestamp.IsZero() - }) - - // List NodeClaims and Nodes from the cluster - nodeClaimList := &karpv1.NodeClaimList{} - if err := c.kubeClient.List(ctx, nodeClaimList); err != nil { - return fmt.Errorf("listing NodeClaims: %w", err) - } - - nodeList := &v1.NodeList{} - if err := c.kubeClient.List(ctx, nodeList); err != nil { - return fmt.Errorf("listing Nodes: %w", err) - } - - resolvedProviderIDs := sets.New[string](lo.FilterMap(nodeClaimList.Items, func(n karpv1.NodeClaim, _ int) (string, bool) { - return n.Status.ProviderID, n.Status.ProviderID != "" - })...) - - errs := make([]error, len(managedRetrieved)) - - workqueue.ParallelizeUntil(ctx, 100, len(managedRetrieved), func(i int) { - vm := managedRetrieved[i] - if !resolvedProviderIDs.Has(vm.Status.ProviderID) && - time.Since(vm.CreationTimestamp.Time) > 5*time.Minute { - errs[i] = c.garbageCollect(ctx, vm, nodeList) - } - }) - - if combinedErr := multierr.Combine(errs...); combinedErr != nil { - return combinedErr - } - - return nil -} - -// gcNics handles the garbage collection of network interfaces. 
-func (c *Controller) gcNics(ctx context.Context) error { - // Refresh the list of NodeClaims after VM GC - nodeClaimList := &karpv1.NodeClaimList{} - if err := c.kubeClient.List(ctx, nodeClaimList); err != nil { - return fmt.Errorf("listing NodeClaims for NIC GC: %w", err) - } - - // Normalize NodeClaim names to match NIC naming conventions - nodeClaimNames := sets.New[string]() - for _, nodeClaim := range nodeClaimList.Items { - // Adjust the prefix as per the aks naming convention - nodeClaimNames.Insert(fmt.Sprintf("aks-%s", nodeClaim.Name)) - } - - // List all NICs from the instance provider, this List call will give us network interfaces that belong to karpenter - nics, err := c.instanceProvider.ListNics(ctx) - if err != nil { - return fmt.Errorf("listing NICs: %w", err) - } - - // Initialize a slice to collect errors from goroutines - var gcErrors []error - var mu sync.Mutex - - // Parallelize the garbage collection process for NICs - workqueue.ParallelizeUntil(ctx, 100, len(nics), func(i int) { - nicName := lo.FromPtr(nics[i].Name) - if !nodeClaimNames.Has(nicName) { - if err := c.instanceProvider.Delete(ctx, nicName); err != nil { - mu.Lock() - gcErrors = append(gcErrors, fmt.Errorf("deleting NIC %s: %w", nicName, err)) - mu.Unlock() - } - logging.FromContext(ctx).With("nic", nicName).Infof("garbage collected NIC") - } - }) - - // Combine all errors into one - if len(gcErrors) > 0 { - return multierr.Combine(gcErrors...) 
- } - - return nil -} - -func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeList *v1.NodeList) error { - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", nodeClaim.Status.ProviderID)) - if err := c.cloudProvider.Delete(ctx, nodeClaim); err != nil { - return corecloudprovider.IgnoreNodeClaimNotFoundError(err) - } - logging.FromContext(ctx).Debugf("garbage collected cloudprovider instance") - - // Go ahead and cleanup the node if we know that it exists to make scheduling go quicker - if node, ok := lo.Find(nodeList.Items, func(n v1.Node) bool { - return n.Spec.ProviderID == nodeClaim.Status.ProviderID - }); ok { - if err := c.kubeClient.Delete(ctx, &node); err != nil { - return client.IgnoreNotFound(err) - } - logging.FromContext(ctx).With("node", node.Name).Debugf("garbage collected node") - } - return nil -} - -func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controllerruntime.NewControllerManagedBy(m). - Named("instance.garbagecollection"). - WatchesRawSource(singleton.Source()). - Complete(singleton.AsReconciler(c)) -} diff --git a/pkg/controllers/nodeclaim/garbagecollection/instance_garbagecollection.go b/pkg/controllers/nodeclaim/garbagecollection/instance_garbagecollection.go new file mode 100644 index 000000000..ab0cf0ef7 --- /dev/null +++ b/pkg/controllers/nodeclaim/garbagecollection/instance_garbagecollection.go @@ -0,0 +1,137 @@ +/* +Portions Copyright (c) Microsoft Corporation. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package garbagecollection + +import ( + "context" + "fmt" + "time" + + "github.com/Azure/karpenter-provider-azure/pkg/providers/instance" + "github.com/awslabs/operatorpkg/singleton" + "github.com/patrickmn/go-cache" + + // "github.com/Azure/karpenter-provider-azure/pkg/cloudprovider" + "github.com/samber/lo" + "go.uber.org/multierr" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + "knative.dev/pkg/logging" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/karpenter/pkg/operator/injection" + + karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" + + corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider" +) + + +const ( + NicBelongsToVM = "NicBelongsToVM" +) + +type VirtualMachineController struct { + kubeClient client.Client + cloudProvider corecloudprovider.CloudProvider + unremovableNics *cache.Cache + successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller +} + +func NewVirtualMachineController(kubeClient client.Client, cloudProvider corecloudprovider.CloudProvider, unremovableNics *cache.Cache) *VirtualMachineController { + return &VirtualMachineController{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + unremovableNics: unremovableNics, + successfulCount: 0, + } +} + +func (c *VirtualMachineController) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "instance.garbagecollection") + + // We LIST VMs on the CloudProvider BEFORE we grab NodeClaims/Nodes on the cluster so that we make sure that, if + // LISTing instances takes a long time, our information is more updated by the time we get to nodeclaim and Node LIST + // 
This works since our CloudProvider instances are deleted based on whether the NodeClaim exists or not, not vice-versa + retrieved, err := c.cloudProvider.List(ctx) + if err != nil { + return reconcile.Result{}, fmt.Errorf("listing cloudprovider VMs, %w", err) + } + + // Mark all vms on the cloudprovider as unremovableNics for the nicGc controller + for _, nodeClaim := range retrieved { + // Nics Belonging to a vm cannot be removed while attached to the vm. + // lets set a nic as unremovable for 15 minutes if it belongs to a vm + c.unremovableNics.Set(instance.GenerateResourceName(nodeClaim.Name), "", time.Minute * 15) + } + + managedRetrieved := lo.Filter(retrieved, func(nc *karpv1.NodeClaim, _ int) bool { + return nc.DeletionTimestamp.IsZero() + }) + nodeClaimList := &karpv1.NodeClaimList{} + if err = c.kubeClient.List(ctx, nodeClaimList); err != nil { + return reconcile.Result{}, err + } + nodeList := &v1.NodeList{} + if err := c.kubeClient.List(ctx, nodeList); err != nil { + return reconcile.Result{}, err + } + resolvedProviderIDs := sets.New[string](lo.FilterMap(nodeClaimList.Items, func(n karpv1.NodeClaim, _ int) (string, bool) { + return n.Status.ProviderID, n.Status.ProviderID != "" + })...) 
+ errs := make([]error, len(managedRetrieved)) + workqueue.ParallelizeUntil(ctx, 100, len(managedRetrieved), func(i int) { + if !resolvedProviderIDs.Has(managedRetrieved[i].Status.ProviderID) && + time.Since(managedRetrieved[i].CreationTimestamp.Time) > time.Minute*5 { + errs[i] = c.garbageCollect(ctx, managedRetrieved[i], nodeList) + } + }) + if err = multierr.Combine(errs...); err != nil { + return reconcile.Result{}, err + } + c.successfulCount++ + return reconcile.Result{RequeueAfter: lo.Ternary(c.successfulCount <= 20, time.Second*10, time.Minute*2)}, nil +} + +func (c *VirtualMachineController) garbageCollect(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeList *v1.NodeList) error { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", nodeClaim.Status.ProviderID)) + if err := c.cloudProvider.Delete(ctx, nodeClaim); err != nil { + return corecloudprovider.IgnoreNodeClaimNotFoundError(err) + } + logging.FromContext(ctx).Debugf("garbage collected cloudprovider instance") + + // Go ahead and cleanup the node if we know that it exists to make scheduling go quicker + if node, ok := lo.Find(nodeList.Items, func(n v1.Node) bool { + return n.Spec.ProviderID == nodeClaim.Status.ProviderID + }); ok { + if err := c.kubeClient.Delete(ctx, &node); err != nil { + return client.IgnoreNotFound(err) + } + logging.FromContext(ctx).With("node", node.Name).Debugf("garbage collected node") + } + return nil +} + +func (c *VirtualMachineController) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("instance.garbagecollection"). + WatchesRawSource(singleton.Source()). 
+ Complete(singleton.AsReconciler(c)) +} diff --git a/pkg/controllers/nodeclaim/garbagecollection/nic_garbagecollection.go b/pkg/controllers/nodeclaim/garbagecollection/nic_garbagecollection.go new file mode 100644 index 000000000..84520ca61 --- /dev/null +++ b/pkg/controllers/nodeclaim/garbagecollection/nic_garbagecollection.go @@ -0,0 +1,113 @@ +package garbagecollection + +import ( + "time" + "context" + "fmt" + + + "github.com/samber/lo" + "github.com/patrickmn/go-cache" + "knative.dev/pkg/logging" + + karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "github.com/awslabs/operatorpkg/singleton" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/karpenter/pkg/operator/injection" + + + "github.com/Azure/karpenter-provider-azure/pkg/providers/instance" + sdkerrors "github.com/Azure/azure-sdk-for-go-extensions/pkg/errors" +) + + +const ( + NICGCControllerName = "networkinterface.garbagecollection" + NicReservationDuration = time.Second * 180 +) + + + +type NetworkInterfaceController struct { + kubeClient client.Client + instanceProvider instance.Provider + // A networkInterface is considered unremovable if it meets the following 3 criteria + // 1: Reserved by NRP: When creating a nic and attempting to assign it to a vm, the nic will be reserved for that vm arm_resource_id for 180 seconds + // 2: Belongs to a Nodeclaim: If a nodeclaim is + // 3: Belongs to VM: If the VM Garbage Collection controller is removing a vm, we should not attempt removing it in this controller + unremovableNics *cache.Cache + +} + +func NewNetworkInterfaceController(kubeClient client.Client, instanceProvider instance.Provider, unremovableNics *cache.Cache) *NetworkInterfaceController { + return &NetworkInterfaceController{ + kubeClient: kubeClient, + instanceProvider: 
 instanceProvider, + unremovableNics: unremovableNics, + } +} + +func (c *NetworkInterfaceController) Reconcile(ctx context.Context) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, NICGCControllerName) + + nodeClaimList := &karpv1.NodeClaimList{} + if err := c.kubeClient.List(ctx, nodeClaimList); err != nil { + return reconcile.Result{}, fmt.Errorf("listing NodeClaims for NIC GC: %w", err) + } + + + // List all NICs from the instance provider, this List call will give us network interfaces that belong to karpenter + nics, err := c.instanceProvider.ListNics(ctx) + if err != nil { + return reconcile.Result{}, fmt.Errorf("listing NICs: %w", err) + } + + // resourceNames is the resource representation for each nodeclaim + resourceNames := sets.New[string]() + for _, nodeClaim := range nodeClaimList.Items { + // Adjust the prefix as per the aks naming convention + resourceNames.Insert(fmt.Sprintf("aks-%s", nodeClaim.Name)) + } + + errs := make([]error, len(nics)) + workqueue.ParallelizeUntil(ctx, 100, len(nics), func(i int){ + nicName := lo.FromPtr(nics[i].Name) + _, unremovable := c.unremovableNics.Get(nicName) + noNodeclaimExistsForNIC := !resourceNames.Has(nicName) + // The networkInterface is unremovable if it is: + // A: Reserved by NRP + // B: Belongs to a Nodeclaim + // C: Belongs to VM + if noNodeclaimExistsForNIC && !unremovable { + err := c.instanceProvider.DeleteNic(ctx, nicName) + if sdkerrors.IsNicReservedForAnotherVM(err) { + c.unremovableNics.Set(nicName, sdkerrors.NicReservedForAnotherVM, NicReservationDuration) + return + } + if err != nil { + errs[i] = err + return + } + + logging.FromContext(ctx).With("nic", nicName).Infof("garbage collected NIC") + } + }) + + // requeue every 5 minutes, adjust for throttling? 
+ return reconcile.Result{ + Requeue: true, + RequeueAfter: time.Minute * 5, + }, nil +} + +func (c *NetworkInterfaceController) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named(NICGCControllerName). + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) +}