Skip to content

Commit

Permalink
Implement image fetcher feature
Browse files Browse the repository at this point in the history
Sync nsx.vmware.com/node-id annotation of subnetport with nsx-t.
Sync wcp.vmware.com/image-fetcher: "true" and namespace_uid label of subnetport with tags of nsx-t.

Signed-off-by: Xie Zheng <[email protected]>
  • Loading branch information
zhengxiexie committed Aug 5, 2024
1 parent 6e7c54a commit 6686815
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func main() {

node.StartNodeController(mgr, nodeService)
staticroutecontroller.StartStaticRouteController(mgr, staticRouteService)
subnetport.StartSubnetPortController(mgr, subnetPortService, subnetService, vpcService)
subnetport.StartSubnetPortController(mgr, subnetPortService, subnetService, vpcService, nodeService)
pod.StartPodController(mgr, subnetPortService, subnetService, vpcService, nodeService)
StartIPPoolController(mgr, ipPoolService, vpcService)
StartIPAddressAllocationController(mgr, ipAddressAllocationService, vpcService)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if err != nil {
return common.ResultRequeue, err
}
nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(pod, nsxSubnet, contextID, &pod.ObjectMeta.Labels)
nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(pod, nsxSubnet, contextID, &pod.ObjectMeta.Labels, false)
if err != nil {
log.Error(err, "failed to create or update NSX subnet port, would retry exponentially", "pod.Name", req.NamespacedName, "pod.UID", pod.UID)
updateFail(r, &ctx, pod, &err)
Expand Down
26 changes: 24 additions & 2 deletions pkg/controllers/subnetport/subnetport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type SubnetPortReconciler struct {
SubnetService servicecommon.SubnetServiceProvider
VPCService servicecommon.VPCServiceProvider
Recorder record.EventRecorder
NodeServiceReader servicecommon.NodeServiceReader
}

// +kubebuilder:rbac:groups=nsx.vmware.com,resources=subnetports,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -107,7 +108,27 @@ func (r *SubnetPortReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err != nil {
return common.ResultRequeue, err
}
nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(subnetPort, nsxSubnet, "", labels)

isVmSubnetPort := true
if subnetPort.Labels[servicecommon.LabelImageFetcher] == "true" {
isVmSubnetPort = false
if labels == nil {
labels = &map[string]string{}
}
(*labels)[servicecommon.LabelImageFetcher] = "true"
}
// specified by user, or default to the node name of the VM the pod runs on
hostname := subnetPort.Annotations[servicecommon.AnnotationHostName]
contextID := ""
if hostname != "" {
nodes := r.NodeServiceReader.GetNodeByName(hostname)
if len(nodes) == 0 {
return common.ResultRequeue, fmt.Errorf("node %s not found", hostname)
}
contextID = *nodes[0].Id
}

nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(subnetPort, nsxSubnet, contextID, labels, isVmSubnetPort)
if err != nil {
log.Error(err, "failed to create or update NSX subnet port, would retry exponentially", "subnetport", req.NamespacedName)
updateFail(r, &ctx, subnetPort, &err)
Expand Down Expand Up @@ -213,14 +234,15 @@ func (r *SubnetPortReconciler) vmMapFunc(_ context.Context, vm client.Object) []
return requests
}

func StartSubnetPortController(mgr ctrl.Manager, subnetPortService *subnetport.SubnetPortService, subnetService *subnet.SubnetService, vpcService *vpc.VPCService) {
func StartSubnetPortController(mgr ctrl.Manager, subnetPortService *subnetport.SubnetPortService, subnetService *subnet.SubnetService, vpcService *vpc.VPCService, nodeService servicecommon.NodeServiceReader) {
subnetPortReconciler := SubnetPortReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
SubnetService: subnetService,
SubnetPortService: subnetPortService,
VPCService: vpcService,
Recorder: mgr.GetEventRecorderFor("subnetport-controller"),
NodeServiceReader: nodeService,
}
if err := subnetPortReconciler.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "SubnetPort")
Expand Down
2 changes: 2 additions & 0 deletions pkg/nsx/services/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
TagScopeVMNamespaceUID string = "nsx-op/vm_namespace_uid"
TagScopeVMNamespace string = "nsx-op/vm_namespace"
LabelDefaultSubnetSet string = "nsxoperator.vmware.com/default-subnetset-for"
LabelImageFetcher string = "wcp.vmware.com/image-fetcher"
AnnotationHostName string = "kubernetes.io/hostname"
LabelDefaultVMSubnetSet string = "VirtualMachine"
LabelDefaultPodSubnetSet string = "Pod"
LabelLbIngressIpMode string = "nsx.vmware.com/ingress-ip-mode"
Expand Down
19 changes: 15 additions & 4 deletions pkg/nsx/services/subnetport/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
String = common.String
)

func (service *SubnetPortService) buildSubnetPort(obj interface{}, nsxSubnet *model.VpcSubnet, contextID string, labelTags *map[string]string) (*model.VpcSubnetPort, error) {
func (service *SubnetPortService) buildSubnetPort(obj interface{}, nsxSubnet *model.VpcSubnet, contextID string, labelTags *map[string]string, isVmSubnetPort bool) (*model.VpcSubnetPort, error) {
var objName, objNamespace, uid, appId, allocateAddresses string
switch o := obj.(type) {
case *v1alpha1.SubnetPort:
Expand Down Expand Up @@ -58,9 +58,20 @@ func (service *SubnetPortService) buildSubnetPort(obj interface{}, nsxSubnet *mo
}
namespace_uid := namespace.UID
tags := util.BuildBasicTags(getCluster(service), obj, namespace_uid)
var tagsFiltered []model.Tag
for _, tag := range tags {
if isVmSubnetPort && *tag.Scope == common.TagScopeNamespaceUID {
continue
}
if !isVmSubnetPort && *tag.Scope == common.TagScopeVMNamespaceUID {
continue
}
tagsFiltered = append(tagsFiltered, tag)
}

if labelTags != nil {
for k, v := range *labelTags {
tags = append(tags, model.Tag{Scope: String(k), Tag: String(v)})
tagsFiltered = append(tagsFiltered, model.Tag{Scope: String(k), Tag: String(v)})
}
}
nsxSubnetPort := &model.VpcSubnetPort{
Expand All @@ -72,14 +83,14 @@ func (service *SubnetPortService) buildSubnetPort(obj interface{}, nsxSubnet *mo
TrafficTag: common.Int64(0),
Type_: String("STATIC"),
},
Tags: tags,
Tags: tagsFiltered,
Path: &nsxSubnetPortPath,
ParentPath: nsxSubnet.Path,
}
if appId != "" {
nsxSubnetPort.Attachment.AppId = &appId
nsxSubnetPort.Attachment.ContextId = &contextID
}
nsxSubnetPort.Attachment.ContextId = &contextID
return nsxSubnetPort, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/nsx/services/subnetport/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestBuildSubnetPort(t *testing.T) {
Type_: common.String("STATIC"),
Id: common.String("32636365-6333-4239-ad37-3534362d3466"),
TrafficTag: common.Int64(0),
ContextId: common.String("fake_context_id"),
},
},
expectedError: nil,
Expand All @@ -112,7 +113,7 @@ func TestBuildSubnetPort(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
observedPort, err := service.buildSubnetPort(tt.obj, tt.nsxSubnet, tt.contextID, tt.labelTags)
observedPort, err := service.buildSubnetPort(tt.obj, tt.nsxSubnet, tt.contextID, tt.labelTags, false)
assert.Equal(t, tt.expectedPort, observedPort)
assert.Equal(t, common.CompareResource(SubnetPortToComparable(tt.expectedPort), SubnetPortToComparable(observedPort)), false)
assert.Equal(t, tt.expectedError, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/nsx/services/subnetport/subnetport.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func InitializeSubnetPort(service servicecommon.Service) (*SubnetPortService, er
return subnetPortService, nil
}

func (service *SubnetPortService) CreateOrUpdateSubnetPort(obj interface{}, nsxSubnet *model.VpcSubnet, contextID string, tags *map[string]string) (*model.SegmentPortState, error) {
func (service *SubnetPortService) CreateOrUpdateSubnetPort(obj interface{}, nsxSubnet *model.VpcSubnet, contextID string, tags *map[string]string, isVmSubnetPort bool) (*model.SegmentPortState, error) {
var uid string
switch o := obj.(type) {
case *v1alpha1.SubnetPort:
Expand All @@ -84,7 +84,7 @@ func (service *SubnetPortService) CreateOrUpdateSubnetPort(obj interface{}, nsxS
uid = string(o.UID)
}
log.Info("creating or updating subnetport", "nsxSubnetPort.Id", uid, "nsxSubnetPath", *nsxSubnet.Path)
nsxSubnetPort, err := service.buildSubnetPort(obj, nsxSubnet, contextID, tags)
nsxSubnetPort, err := service.buildSubnetPort(obj, nsxSubnet, contextID, tags, isVmSubnetPort)
if err != nil {
log.Error(err, "failed to build NSX subnet port", "nsxSubnetPort.Id", uid, "*nsxSubnet.Path", *nsxSubnet.Path, "contextID", contextID)
return nil, err
Expand Down
13 changes: 5 additions & 8 deletions pkg/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ func BuildBasicTags(cluster string, obj interface{}, namespaceID types.UID) []mo
Tag: String(strings.Join(common.TagValueVersion, ".")),
},
}
isVmSubnetPort := false
switch i := obj.(type) {
case *v1alpha1.StaticRoute:
tags = append(tags, model.Tag{Scope: String(common.TagScopeNamespace), Tag: String(i.ObjectMeta.Namespace)})
Expand All @@ -465,9 +464,11 @@ func BuildBasicTags(cluster string, obj interface{}, namespaceID types.UID) []mo
tags = append(tags, model.Tag{Scope: String(common.TagScopeSubnetSetCRUID), Tag: String(string(i.UID))})
case *v1alpha1.SubnetPort:
tags = append(tags, model.Tag{Scope: String(common.TagScopeVMNamespace), Tag: String(i.ObjectMeta.Namespace)})
isVmSubnetPort = true
tags = append(tags, model.Tag{Scope: String(common.TagScopeSubnetPortCRName), Tag: String(i.ObjectMeta.Name)})
tags = append(tags, model.Tag{Scope: String(common.TagScopeSubnetPortCRUID), Tag: String(string(i.UID))})
if len(namespaceID) > 0 {
tags = append(tags, model.Tag{Scope: String(common.TagScopeVMNamespaceUID), Tag: String(string(namespaceID))})
}
case *v1.Pod:
tags = append(tags, model.Tag{Scope: String(common.TagScopeNamespace), Tag: String(i.ObjectMeta.Namespace)})
tags = append(tags, model.Tag{Scope: String(common.TagScopePodName), Tag: String(i.ObjectMeta.Name)})
Expand All @@ -487,13 +488,9 @@ func BuildBasicTags(cluster string, obj interface{}, namespaceID types.UID) []mo
}

if len(namespaceID) > 0 {
if isVmSubnetPort == true {
// In the NSX subnet port created for VM, the namespace uid tag is TagScopeVMNamespaceUID instead of TagScopeNamespaceUID.
tags = append(tags, model.Tag{Scope: String(common.TagScopeVMNamespaceUID), Tag: String(string(namespaceID))})
} else {
tags = append(tags, model.Tag{Scope: String(common.TagScopeNamespaceUID), Tag: String(string(namespaceID))})
}
tags = append(tags, model.Tag{Scope: String(common.TagScopeNamespaceUID), Tag: String(string(namespaceID))})
}

return tags
}

Expand Down

0 comments on commit 6686815

Please sign in to comment.