Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Subnet lock for SubnetPort creation #974

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
defer unlockSubnetSet(subnetSet.GetUID(), subnetSetLock)
subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID()))
for _, nsxSubnet := range subnetList {
portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overrided if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
// NSX reserves 4 ip addresses in each subnet for network address, gateway address,
// dhcp server address and broadcast address.
if portNums < totalIP-4 {
if subnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) {
return *nsxSubnet.Path, nil
}
}
Expand All @@ -56,7 +55,12 @@ func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servi
log.Error(err, "Failed to allocate Subnet")
return "", err
}
return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
path, err := subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
if err != nil {
return path, err
}
subnetPortService.AddPortToSubnet(path, 0)
return path, nil
}

func getSharedNamespaceForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, error) {
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,21 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if err != nil {
// The error at the very beginning of the operator startup is expected because at that time the node may be not cached yet. We can expect the retry to become normal.
log.Error(err, "failed to get node ID for pod", "pod.Name", req.NamespacedName, "pod.UID", pod.UID, "node", pod.Spec.NodeName)
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
return common.ResultRequeue, err
}
contextID := *node.UniqueId
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
return common.ResultRequeue, err
}
_, err = r.SubnetPortService.CreateOrUpdateSubnetPort(pod, nsxSubnet, contextID, &pod.ObjectMeta.Labels)
if err != nil {
// Remove SubnetPort from Subnet if the SubnetPort is not saved to store
if subnetport.IsNotCreatedError(err) {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
}
r.StatusUpdater.UpdateFail(ctx, pod, err, "", nil)
return common.ResultRequeue, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestPodReconciler_Reconcile(t *testing.T) {
},
},
},
SubnetPortStore: &subnetport.SubnetPortStore{},
},
SubnetService: &subnet.SubnetService{
SubnetStore: &subnet.SubnetStore{},
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/subnet/subnet_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (r *SubnetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet) error {
log.Error(err, "Failed to delete Subnet", "ID", *nsxSubnet.Id)
return err
}
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
log.Info("Successfully deleted Subnet", "ID", *nsxSubnet.Id)
}
log.Info("Successfully cleaned Subnets", "subnetCount", len(nsxSubnets))
Expand Down
16 changes: 14 additions & 2 deletions pkg/controllers/subnet/subnet_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
tags2 := []model.Tag{{Scope: common.String(common.TagScopeSubnetCRUID), Tag: common.String("fake-id2")}}
var nsxSubnets []*model.VpcSubnet
id1 := "fake-id1"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id1, Tags: tags1, Path: common.String("fake-path")})
id2 := "fake-id2"
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2})
nsxSubnets = append(nsxSubnets, &model.VpcSubnet{Id: &id2, Tags: tags2, Path: common.String("fake-path")})
return nsxSubnets
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) {
Expand All @@ -64,6 +64,9 @@ func TestSubnetReconciler_GarbageCollector(t *testing.T) {
patch.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patch.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patch
},
},
Expand Down Expand Up @@ -272,6 +275,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down Expand Up @@ -343,6 +349,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultRequeue,
Expand Down Expand Up @@ -418,6 +427,9 @@ func TestSubnetReconciler_Reconcile(t *testing.T) {
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "DeletePortCount", func(_ *subnetport.SubnetPortService, _ string) {
return
})
return patches
},
expectRes: ResultNormal,
Expand Down
22 changes: 17 additions & 5 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,23 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
labels, err := r.getLabelsFromVirtualMachine(ctx, subnetPort)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "Failed to get labels from VirtualMachine", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
// There is a race condition that the subnetset controller may delete the
// subnet during CollectGarbage. So check the subnet under lock.
lock := r.SubnetService.RLockSubnet(&nsxSubnetPath)
defer r.SubnetService.RUnlockSubnet(&nsxSubnetPath, lock)

nsxSubnet, err := r.SubnetService.GetSubnetByPath(nsxSubnetPath)
if err != nil {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, fmt.Sprintf("Failed to get Subnet by path: %s", nsxSubnetPath), setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(subnetPort, nsxSubnet, "", labels)
if err != nil {
// Remove SubnetPort from Subnet if the SubnetPort is not saved to store
if subnetport.IsNotCreatedError(err) {
r.SubnetPortService.RemovePortFromSubnet(nsxSubnetPath)
}
r.StatusUpdater.UpdateFail(ctx, subnetPort, err, "", setSubnetPortReadyStatusFalse, r.SubnetPortService)
return common.ResultRequeue, err
}
Expand Down Expand Up @@ -469,7 +471,17 @@ func (r *SubnetPortReconciler) CheckAndGetSubnetPathForSubnetPort(ctx context.Co
log.Error(err, "failed to get NSX subnet by subnet CR UID", "subnetList", subnetList)
return
}
subnetPath = *subnetList[0].Path
nsxSubnet := subnetList[0]
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to use totalIP as a field of CountWithMark which is a one-time set value when we create the object for a new subnet, then we can consume it any time we need to allocate a new port with the diff.

if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overrided if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
subnetPath = *nsxSubnet.Path
if !r.SubnetPortService.AddPortToSubnet(*nsxSubnet.Path, totalIP-4) {
err = fmt.Errorf("no valid IP in Subnet %s", subnetPath)
return
}
} else if len(subnetPort.Spec.SubnetSet) > 0 {
subnetSet := &v1alpha1.SubnetSet{}
namespacedName := types.NamespacedName{
Expand Down
13 changes: 9 additions & 4 deletions pkg/controllers/subnetport/subnetport_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,13 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) {
k8sClient := mock_client.NewMockClient(mockCtl)
defer mockCtl.Finish()
r := &SubnetPortReconciler{
Client: k8sClient,
SubnetPortService: &subnetport.SubnetPortService{},
SubnetService: &subnet.SubnetService{},
Client: k8sClient,
SubnetPortService: &subnetport.SubnetPortService{
SubnetPortStore: &subnetport.SubnetPortStore{
PortCountMap: make(map[string]*subnetport.CountWithMark),
},
},
SubnetService: &subnet.SubnetService{},
}

tests := []struct {
Expand Down Expand Up @@ -725,7 +729,8 @@ func TestSubnetPortReconciler_CheckAndGetSubnetPathForSubnetPort(t *testing.T) {
patches.ApplyFunc((*subnet.SubnetService).GetSubnetsByIndex,
func(s *subnet.SubnetService, key string, value string) []*model.VpcSubnet {
return []*model.VpcSubnet{{
Path: servicecommon.String("subnet-path-1"),
Path: servicecommon.String("subnet-path-1"),
Ipv4SubnetSize: servicecommon.Int64(16),
}}
})
return patches
Expand Down
40 changes: 19 additions & 21 deletions pkg/controllers/subnetset/subnetset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,32 +375,30 @@ func (r *SubnetSetReconciler) deleteSubnets(nsxSubnets []*model.VpcSubnet, delet
}
var deleteErrs []error
for _, nsxSubnet := range nsxSubnets {
lock := r.SubnetService.LockSubnet(nsxSubnet.Path)
func() {
defer r.SubnetService.UnlockSubnet(nsxSubnet.Path, lock)

portNums := len(r.SubnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
if portNums > 0 {
hasStalePort = true
log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id)
return
}

if deleteBindingMaps {
if err := r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
return
}
}
if !r.SubnetPortService.IsEmptySubnet(*nsxSubnet.Path) {
hasStalePort = true
log.Info("Skipped deleting NSX Subnet due to stale ports", "nsxSubnet", *nsxSubnet.Id)
continue
}

if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
if deleteBindingMaps {
if err = r.BindingService.DeleteSubnetConnectionBindingMapsByParentSubnet(nsxSubnet); err != nil {
deleteErr := fmt.Errorf("failed to delete NSX SubnetConnectionBindingMaps connected to NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
continue
}
}()
}

if err := r.SubnetService.DeleteSubnet(*nsxSubnet); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have a race that when this routine is deleting the subnet, and pod_controller/subnetport_controller is using the subnet to allocate a new port?

deleteErr := fmt.Errorf("failed to delete NSX Subnet/%s: %+v", *nsxSubnet.Id, err)
deleteErrs = append(deleteErrs, deleteErr)
log.Error(deleteErr, "Skipping to next Subnet")
} else {
r.SubnetPortService.DeletePortCount(*nsxSubnet.Path)
}

}
if len(deleteErrs) > 0 {
err = fmt.Errorf("multiple errors occurred while deleting Subnets: %v", deleteErrs)
Expand Down
11 changes: 3 additions & 8 deletions pkg/controllers/subnetset/subnetset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func createFakeSubnetSetReconciler(objs []client.Object) *SubnetSetReconciler {
Client: nil,
NSXClient: &nsx.Client{},
},
SubnetPortStore: nil,
SubnetPortStore: &subnetport.SubnetPortStore{},
}

return &SubnetSetReconciler{
Expand Down Expand Up @@ -396,13 +396,8 @@ func TestReconcile_DeleteSubnetSet(t *testing.T) {
return nil
})

patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "GetPortsOfSubnet", func(_ *subnetport.SubnetPortService, _ string) (ports []*model.VpcSubnetPort) {
id := "fake-subnetport-0"
return []*model.VpcSubnetPort{
{
Id: &id,
},
}
patches.ApplyMethod(reflect.TypeOf(r.SubnetPortService), "IsEmptySubnet", func(_ *subnetport.SubnetPortService, _ string) bool {
return false
})
patches.ApplyMethod(reflect.TypeOf(r.SubnetService), "DeleteSubnet", func(_ *subnet.SubnetService, subnet model.VpcSubnet) error {
return nil
Expand Down
20 changes: 9 additions & 11 deletions pkg/mock/services_mock.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package mock

import (
"sync"

"github.com/stretchr/testify/mock"
"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -81,26 +79,26 @@ func (m *MockSubnetServiceProvider) GenerateSubnetNSTags(obj client.Object) []mo
return []model.Tag{}
}

func (m *MockSubnetServiceProvider) LockSubnet(path *string) *sync.RWMutex {
return nil
type MockSubnetPortServiceProvider struct {
mock.Mock
}

func (m *MockSubnetServiceProvider) UnlockSubnet(path *string, lock *sync.RWMutex) {
func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) {
return
}

func (m *MockSubnetServiceProvider) RLockSubnet(path *string) *sync.RWMutex {
return nil
func (m *MockSubnetPortServiceProvider) AddPortToSubnet(path string, size int) bool {
return true
}

func (m *MockSubnetServiceProvider) RUnlockSubnet(path *string, lock *sync.RWMutex) {
func (m *MockSubnetPortServiceProvider) RemovePortFromSubnet(path string) {
return
}

type MockSubnetPortServiceProvider struct {
mock.Mock
func (m *MockSubnetPortServiceProvider) IsEmptySubnet(path string) bool {
return true
}

func (m *MockSubnetPortServiceProvider) GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort) {
func (m *MockSubnetPortServiceProvider) DeletePortCount(path string) {
return
}
9 changes: 4 additions & 5 deletions pkg/nsx/services/common/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package common

import (
"context"
"sync"

"github.com/vmware/vsphere-automation-sdk-go/services/nsxt/model"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -31,14 +30,14 @@ type SubnetServiceProvider interface {
GetSubnetsByIndex(key, value string) []*model.VpcSubnet
CreateOrUpdateSubnet(obj client.Object, vpcInfo VPCResourceInfo, tags []model.Tag) (string, error)
GenerateSubnetNSTags(obj client.Object) []model.Tag
LockSubnet(path *string) *sync.RWMutex
UnlockSubnet(path *string, lock *sync.RWMutex)
RLockSubnet(path *string) *sync.RWMutex
RUnlockSubnet(path *string, lock *sync.RWMutex)
}

type SubnetPortServiceProvider interface {
GetPortsOfSubnet(nsxSubnetID string) (ports []*model.VpcSubnetPort)
AddPortToSubnet(path string, size int) bool
RemovePortFromSubnet(path string)
IsEmptySubnet(path string) bool
DeletePortCount(path string)
}

type NodeServiceReader interface {
Expand Down
Loading
Loading