Skip to content

Commit

Permalink
[AddressBinding] Enhance error handling
Browse files Browse the repository at this point in the history
Add conditions to CR. Follow SubnetPort conditions.
Handle SubnetPort deleted when AddressBinding exists.

Signed-off-by: Ran Gu <[email protected]>
  • Loading branch information
gran-vmv committed Nov 1, 2024
1 parent 535a902 commit d38142b
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 46 deletions.
29 changes: 29 additions & 0 deletions build/yaml/crd/vpc/crd.nsx.vmware.com_addressbindings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,35 @@ spec:
type: object
status:
properties:
conditions:
description: Conditions describes current state of AddressBinding.
items:
description: Condition defines condition of custom resource.
properties:
lastTransitionTime:
description: |-
Last time the condition transitioned from one status to another.
This should be when the underlying condition changed. If that is not known, then using the time when
the API field changed is acceptable.
format: date-time
type: string
message:
description: Message shows a human-readable message about condition.
type: string
reason:
description: Reason shows a brief reason of condition.
type: string
status:
description: Status of the condition, one of True, False, Unknown.
type: string
type:
description: Type defines condition type.
type: string
required:
- status
- type
type: object
type: array
ipAddress:
type: string
required:
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/vpc/v1alpha1/addressbinding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type AddressBindingSpec struct {
}

type AddressBindingStatus struct {
// Conditions describes current state of AddressBinding.
Conditions []Condition `json:"conditions,omitempty"`

IPAddress string `json:"ipAddress"`
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/apis/vpc/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

177 changes: 143 additions & 34 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"os"
"reflect"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -46,6 +47,12 @@ var (
MetricResTypeSubnetPort = common.MetricResTypeSubnetPort
)

var (
vmOrInterfaceNotFoundError = fmt.Errorf("VM or interface not found")
subnetPortRealizationError = fmt.Errorf("SubnetPort realization error")
multipleInterfaceFoundError = fmt.Errorf("multiple interfaces found")
)

// SubnetPortReconciler reconciles a SubnetPort object
type SubnetPortReconciler struct {
client.Client
Expand Down Expand Up @@ -317,6 +324,8 @@ func (r *SubnetPortReconciler) CollectGarbage(ctx context.Context) {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResTypeSubnetPort)
}
}

r.collectAddressBindingGarbage(ctx)
}

func (r *SubnetPortReconciler) setSubnetPortReadyStatusTrue(ctx context.Context, subnetPort *v1alpha1.SubnetPort, transitionTime metav1.Time) {
Expand Down Expand Up @@ -390,25 +399,32 @@ func getExistingConditionOfType(conditionType v1alpha1.ConditionType, existingCo
}

func updateFail(r *SubnetPortReconciler, c context.Context, o *v1alpha1.SubnetPort, e *error) {
r.setSubnetPortReadyStatusFalse(c, o, metav1.Now(), e)
now := metav1.Now()
r.setSubnetPortReadyStatusFalse(c, o, now, e)
r.setAddressBindingStatusBySubnetPort(c, o, now, subnetPortRealizationError)
r.Recorder.Event(o, v1.EventTypeWarning, common.ReasonFailUpdate, fmt.Sprintf("%v", *e))
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateFailTotal, MetricResTypeSubnetPort)
}

func deleteFail(r *SubnetPortReconciler, c context.Context, o *v1alpha1.SubnetPort, e *error) {
r.setSubnetPortReadyStatusFalse(c, o, metav1.Now(), e)
now := metav1.Now()
r.setSubnetPortReadyStatusFalse(c, o, now, e)
r.setAddressBindingStatusBySubnetPort(c, o, now, subnetPortRealizationError)
r.Recorder.Event(o, v1.EventTypeWarning, common.ReasonFailDelete, fmt.Sprintf("%v", *e))
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypeSubnetPort)
}

func updateSuccess(r *SubnetPortReconciler, c context.Context, o *v1alpha1.SubnetPort) {
r.setSubnetPortReadyStatusTrue(c, o, metav1.Now())
r.setAddressBindingStatus(c, o)
now := metav1.Now()
r.setSubnetPortReadyStatusTrue(c, o, now)
r.setAddressBindingStatusBySubnetPort(c, o, now, nil)
r.Recorder.Event(o, v1.EventTypeNormal, common.ReasonSuccessfulUpdate, "SubnetPort CR has been successfully updated")
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateSuccessTotal, MetricResTypeSubnetPort)
}

func deleteSuccess(r *SubnetPortReconciler, _ context.Context, o *v1alpha1.SubnetPort) {
func deleteSuccess(r *SubnetPortReconciler, c context.Context, o *v1alpha1.SubnetPort) {
now := metav1.Now()
r.setAddressBindingStatusBySubnetPort(c, o, now, vmOrInterfaceNotFoundError)
r.Recorder.Event(o, v1.EventTypeNormal, common.ReasonSuccessfulDelete, "SubnetPort CR has been successfully deleted")
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResTypeSubnetPort)
}
Expand Down Expand Up @@ -540,37 +556,35 @@ func (r *SubnetPortReconciler) addressBindingMapFunc(ctx context.Context, obj cl
log.Info("Invalid object", "type", reflect.TypeOf(obj))
return nil
}
// skip reconcile if AddressBinding exists and is realized
if ab.Status.IPAddress != "" {
namespacedName := types.NamespacedName{
Name: ab.Name,
Namespace: ab.Namespace,
}
existingAddressBinding := &v1alpha1.AddressBinding{}
if err := r.Client.Get(context.TODO(), namespacedName, existingAddressBinding); err == nil {
return nil
}
}
spList := &v1alpha1.SubnetPortList{}
spIndexValue := fmt.Sprintf("%s/%s", ab.Namespace, ab.Spec.VMName)
err := r.Client.List(context.TODO(), spList, client.MatchingFields{util.SubnetPortNamespaceVMIndexKey: spIndexValue})
if err != nil || len(spList.Items) == 0 {
if err != nil {
log.Error(err, "Failed to list SubnetPort from cache", "indexValue", spIndexValue)
return nil
}
// sort by CreationTimestamp
slices.SortFunc(spList.Items, func(a, b v1alpha1.SubnetPort) int {
return a.CreationTimestamp.UTC().Compare(b.CreationTimestamp.UTC())
})
if len(spList.Items) == 0 {
r.setAddressBindingStatus(ctx, ab, metav1.Now(), vmOrInterfaceNotFoundError, "")
return nil
}
if ab.Spec.InterfaceName == "" {
if len(spList.Items) == 1 {
log.V(1).Info("Enqueue SubnetPort for default AddressBinding", "namespace", ab.Namespace, "name", ab.Name, "SubnetPortName", spList.Items[0].Name, "VM", ab.Spec.VMName)
return []reconcile.Request{{
NamespacedName: types.NamespacedName{
Name: spList.Items[0].Name,
Namespace: spList.Items[0].Namespace,
},
}}
} else {
log.Info("Found multiple SubnetPorts for a VM, ignore default AddressBinding for SubnetPort", "namespace", ab.Namespace, "name", ab.Name, "subnetPortCount", len(spList.Items), "VM", ab.Spec.VMName)
return nil
}
// Reconcile the oldest SubnetPort to check if the ExternalAddress should be removed.
log.Info("Found multiple SubnetPorts for a VM, enqueue oldest SubnetPort", "namespace", ab.Namespace, "name", ab.Name, "subnetPortCount", len(spList.Items), "VM", ab.Spec.VMName)
r.setAddressBindingStatus(ctx, ab, metav1.Now(), multipleInterfaceFoundError, "")
}
return []reconcile.Request{{
NamespacedName: types.NamespacedName{
Name: spList.Items[0].Name,
Namespace: spList.Items[0].Namespace,
},
}}
}
for i, sp := range spList.Items {
vm, port, err := common.GetVirtualMachineNameForSubnetPort(&spList.Items[i])
Expand All @@ -590,28 +604,123 @@ func (r *SubnetPortReconciler) addressBindingMapFunc(ctx context.Context, obj cl
}
}
log.Info("No SubnetPort found for AddressBinding", "namespace", ab.Namespace, "name", ab.Name, "VM", ab.Spec.VMName)
r.setAddressBindingStatus(ctx, ab, metav1.Now(), vmOrInterfaceNotFoundError, "")
return nil
}

func (r *SubnetPortReconciler) setAddressBindingStatus(ctx context.Context, subnetPort *v1alpha1.SubnetPort) {
func (r *SubnetPortReconciler) collectAddressBindingGarbage(ctx context.Context) {
abList := &v1alpha1.AddressBindingList{}
err := r.Client.List(context.TODO(), abList)
if err != nil {
log.Error(err, "Failed to list AddressBinding from cache")
return
}
for _, ab := range abList.Items {
spList := &v1alpha1.SubnetPortList{}
spIndexValue := fmt.Sprintf("%s/%s", ab.Namespace, ab.Spec.VMName)
err := r.Client.List(context.TODO(), spList, client.MatchingFields{util.SubnetPortNamespaceVMIndexKey: spIndexValue})
if err != nil {
log.Error(err, "Failed to list SubnetPort from cache", "indexValue", spIndexValue)
continue
}
if ab.Spec.InterfaceName == "" && len(spList.Items) > 1 {
r.setAddressBindingStatus(ctx, &ab, metav1.Now(), multipleInterfaceFoundError, "")

Check failure on line 627 in pkg/controllers/subnetport/subnetport_controller.go

View workflow job for this annotation

GitHub Actions / build

G601: Implicit memory aliasing in for loop. (gosec)
continue
}
found := false
for i, sp := range spList.Items {
vm, port, err := common.GetVirtualMachineNameForSubnetPort(&spList.Items[i])
if err != nil || vm == "" {
log.Error(err, "Failed to get VM name from SubnetPort", "namespace", sp.Namespace, "name", sp.Name, "annotations", sp.Annotations)
continue
}
if ab.Spec.InterfaceName == "" || ab.Spec.InterfaceName == port {
found = true
break
}
}
if !found {
r.setAddressBindingStatus(ctx, &ab, metav1.Now(), vmOrInterfaceNotFoundError, "")

Check failure on line 643 in pkg/controllers/subnetport/subnetport_controller.go

View workflow job for this annotation

GitHub Actions / build

G601: Implicit memory aliasing in for loop. (gosec)
}
}
}

func (r *SubnetPortReconciler) setAddressBindingStatusBySubnetPort(ctx context.Context, subnetPort *v1alpha1.SubnetPort, transitionTime metav1.Time, e error) {
ipAddress := ""
subnetPortID := r.SubnetPortService.BuildSubnetPortId(&subnetPort.ObjectMeta)
nsxSubnetPort := r.SubnetPortService.SubnetPortStore.GetByKey(subnetPortID)
if nsxSubnetPort == nil {
log.Info("Missing SubnetPort", "id", subnetPort.UID)
return
}
if nsxSubnetPort.ExternalAddressBinding == nil || nsxSubnetPort.ExternalAddressBinding.ExternalIpAddress == nil {
return
if e == nil {
e = fmt.Errorf("missing SubnetPort")
}
} else if nsxSubnetPort.ExternalAddressBinding != nil && nsxSubnetPort.ExternalAddressBinding.ExternalIpAddress != nil {
ipAddress = *nsxSubnetPort.ExternalAddressBinding.ExternalIpAddress
}
ab := r.SubnetPortService.GetAddressBindingBySubnetPort(subnetPort)
if ab == nil {
log.Info("Missing AddressBinding for SubnetPort", "namespace", subnetPort.Namespace, "name", subnetPort.Name)
log.Info("No AddressBinding for SubnetPort", "namespace", subnetPort.Namespace, "name", subnetPort.Name)
return
}
if ab.Status.IPAddress != *nsxSubnetPort.ExternalAddressBinding.ExternalIpAddress {
r.setAddressBindingStatus(ctx, ab, transitionTime, e, ipAddress)
}

func (r *SubnetPortReconciler) setAddressBindingStatus(ctx context.Context, ab *v1alpha1.AddressBinding, transitionTime metav1.Time, e error, ipAddress string) {
newConditions := newReadyCondition("AddressBinding", transitionTime, e)
isUpdated := false
for i := range newConditions {
conditionUpdated := false
ab.Status.Conditions, conditionUpdated = mergeCondition(ab.Status.Conditions, &newConditions[i])
isUpdated = isUpdated || conditionUpdated
}
if ab.Status.IPAddress != ipAddress {
isUpdated = true
}
if isUpdated {
ab = ab.DeepCopy()
ab.Status.IPAddress = *nsxSubnetPort.ExternalAddressBinding.ExternalIpAddress
ab.Status.IPAddress = ipAddress
r.Client.Status().Update(ctx, ab)
log.V(1).Info("Updated AddressBinding CR status", "namespace", ab.Namespace, "name", ab.Name, "status", ab.Status)
}
}

func newReadyCondition(resourceType string, transitionTime metav1.Time, e error) []v1alpha1.Condition {
if e == nil {
return []v1alpha1.Condition{
{
Type: v1alpha1.Ready,
Status: v1.ConditionTrue,
Message: fmt.Sprintf("%s has been successfully created/updated", resourceType),
Reason: fmt.Sprintf("%sReady", resourceType),
LastTransitionTime: transitionTime,
},
}
}
return []v1alpha1.Condition{
{
Type: v1alpha1.Ready,
Status: v1.ConditionFalse,
Message: fmt.Sprintf("error occurred while processing the %s CR. Error: %v", resourceType, e),
Reason: fmt.Sprintf("%sNotReady", resourceType),
LastTransitionTime: transitionTime,
},
}
}

func mergeCondition(existingConditions []v1alpha1.Condition, newCondition *v1alpha1.Condition) ([]v1alpha1.Condition, bool) {
matchedCondition := getExistingConditionOfType(newCondition.Type, existingConditions)

if reflect.DeepEqual(matchedCondition, newCondition) {
log.V(2).Info("conditions already match", "New Condition", newCondition, "Existing Condition", matchedCondition)
return existingConditions, false
}

if matchedCondition != nil {
matchedCondition.Reason = newCondition.Reason
matchedCondition.Message = newCondition.Message
matchedCondition.Status = newCondition.Status
} else {
existingConditions = append(existingConditions, *newCondition)
}
return existingConditions, true
}
Loading

0 comments on commit d38142b

Please sign in to comment.