Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the potential race condition of InitializeSubnetService #879

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
bin/
.DS_Store
go.work
go.work.sum
go.work.sum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed?

.tool-versions
27 changes: 23 additions & 4 deletions pkg/nsx/services/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,39 @@ func InitializeSubnetService(service common.Service) (*SubnetService, error) {
},
}

// Use sync.Once to ensure channel is closed only once
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to change like this,

func InitializeSubnetService(service common.Service) (*SubnetService, error) {
	wg := sync.WaitGroup{}
	fatalErrors := make(chan error, 1)
	defer close(fatalErrors)
	subnetService := &SubnetService{
		Service: service,
		SubnetStore: &SubnetStore{
			ResourceStore: common.ResourceStore{
				Indexer: cache.NewIndexer(keyFunc, cache.Indexers{
					common.TagScopeSubnetCRUID:    subnetIndexFunc,
					common.TagScopeSubnetSetCRUID: subnetSetIndexFunc,
					common.TagScopeVMNamespace:    subnetIndexVMNamespaceFunc,
					common.TagScopeNamespace:      subnetIndexNamespaceFunc,
				}),
				BindingType: model.VpcSubnetBindingType(),
			},
		},
	}

	wg.Add(1)
	go subnetService.InitializeResourceStore(&wg, fatalErrors, ResourceTypeSubnet, nil, subnetService.SubnetStore)
	wg.Wait()

	if len(fatalErrors) > 0 {
		err := <-fatalErrors
		return subnetService, err
	}

	return subnetService, nil
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/vmware-tanzu/nsx-operator/blob/main/pkg/nsx/services/securitypolicy/firewall.go#L116-L116
The initial reason is to return as early as possible if there are multiple resources.
You should consider closing channel fatalErrors in this way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fatalErrors is closed in "defer" after it is declared in my example.
Even if there are multiple resources, we still need to wait until all sub-tasks are done rather than leave them out of control, otherwise it may lead to unexpected issues (e.g., a zombie routine).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this version has no zombie routine.

var closeOnceFatalError sync.Once
var closeOnceWgDone sync.Once
safeCloseFatalError := func(ce chan error) {
closeOnceFatalError.Do(func() {
close(ce)
})
}
safeCloseWgDone := func(ce chan bool) {
closeOnceWgDone.Do(func() {
close(ce)
})
}

wg.Add(1)
go subnetService.InitializeResourceStore(&wg, fatalErrors, ResourceTypeSubnet, nil, subnetService.SubnetStore)
go func() {
wg.Wait()
close(wgDone)
safeCloseWgDone(wgDone)
}()
select {
case <-wgDone:
break
safeCloseFatalError(fatalErrors) // Clean up fatalErrors channel
return subnetService, nil
case err := <-fatalErrors:
close(fatalErrors)
// Wait for any pending operations to complete
go func() {
wg.Wait() // Ensure all goroutines complete
safeCloseWgDone(wgDone)
}()
safeCloseFatalError(fatalErrors) // Clean up fatalErrors channel
return subnetService, err
}
return subnetService, nil
}

func (service *SubnetService) CreateOrUpdateSubnet(obj client.Object, vpcInfo common.VPCResourceInfo, tags []model.Tag) (string, error) {
Expand Down
Loading