Skip to content

Commit

Permalink
Support async create VM
Browse files Browse the repository at this point in the history
This patch introduces an async variant on the provider's
`CreateOrUpdateVirtualMachine` function. This new function,
`CreateOrUpdateVirtualMachineAsync`, returns `<-chan error, error`,
for when a non-blocking create is used.

Please note, support for async create is only enabled when the
async signal feature is enabled. This is because the new async
create workflow no longer:

- Falls into a post-create reconfigure, instead allowing async
  signal to enqueue a new reconcile when the create has completed.

- Requeues the VM after N time based on the VM's state, ex. when
  the VM is powered on and does not yet have an IP address. This
  also now relies on async signal when the feature is enabled.

Finally, while a non-blocking create does mean the reconciler
threads are no longer consumed by create operations, it does not
mean VM Op will allow unbounded, concurrent creates. Because each
non-blocking create operation consumes a goroutine, the number of
concurrent create operations is still limited. The limit is the same
as the number of threads previously allowed to do create operations.
The difference is, with async create disabled, if 16 threads are
doing creates, that is 16 threads that cannot do anything else. With
async create enabled, if 16 goroutines are doing create, there are
still 16 reconciler threads available to do other things.
  • Loading branch information
akutz committed Oct 30, 2024
1 parent 5f28e3b commit 0cf97b1
Show file tree
Hide file tree
Showing 16 changed files with 917 additions and 371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,28 +316,48 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
return ctrl.Result{}, err
}

if err := r.ReconcileNormal(vmCtx); err != nil {
if err = r.ReconcileNormal(vmCtx); err != nil && !ignoredCreateErr(err) {
vmCtx.Logger.Error(err, "Failed to reconcile VirtualMachine")
return ctrl.Result{}, err
}

return ctrl.Result{RequeueAfter: requeueDelay(vmCtx)}, nil
// Requeue after N amount of time according to the state of the VM.
return ctrl.Result{RequeueAfter: requeueDelay(vmCtx, err)}, nil
}

// Determine if we should request a non-zero requeue delay in order to trigger a non-rate limited reconcile
// at some point in the future. Use this delay-based reconcile to trigger a specific reconcile to discovery the VM IP
// address rather than relying on the resync period to do.
// Determine if we should request a non-zero requeue delay in order to trigger a
// non-rate limited reconcile at some point in the future.
//
// TODO: It would be much preferable to determine that a non-error resync is required at the source of the determination that
// TODO: the VM IP isn't available rather than up here in the reconcile loop. However, in the interest of time, we are making
// TODO: this determination here and will have to refactor at some later date.
func requeueDelay(ctx *pkgctx.VirtualMachineContext) time.Duration {
// If the VM is in Creating phase, the reconciler has run out of threads to Create VMs on the provider. Do not queue
// immediately to avoid exponential backoff.
if !conditions.IsTrue(ctx.VM, vmopv1.VirtualMachineConditionCreated) {
// When async signal is disabled, this is used to trigger a specific reconcile
// to discovery the VM IP address rather than relying on the resync period to
// do.
//
// TODO
// It would be much preferable to determine that a non-error resync is required
// at the source of the determination that the VM IP is not available rather
// than up here in the reconcile loop. However, in the interest of time, we are
// making this determination here and will have to refactor at some later date.
func requeueDelay(
ctx *pkgctx.VirtualMachineContext,
err error) time.Duration {

// If there were too many concurrent create operations or if the VM is in
// Creating phase, the reconciler has run out of threads or goroutines to
// Create VMs on the provider. Do not queue immediately to avoid exponential
// backoff.
if ignoredCreateErr(err) ||
!conditions.IsTrue(ctx.VM, vmopv1.VirtualMachineConditionCreated) {

return pkgcfg.FromContext(ctx).CreateVMRequeueDelay
}

// Do not requeue for the IP address if async signal is enabled.
if pkgcfg.FromContext(ctx).Features.WorkloadDomainIsolation &&
!pkgcfg.FromContext(ctx).AsyncSignalDisabled {

return 0
}

if ctx.VM.Status.PowerState == vmopv1.VirtualMachinePowerStateOn {
networkSpec := ctx.VM.Spec.Network
if networkSpec != nil && !networkSpec.Disabled {
Expand Down Expand Up @@ -439,14 +459,54 @@ func (r *Reconciler) ReconcileNormal(ctx *pkgctx.VirtualMachineContext) (reterr
// Upgrade schema fields where needed
upgradeSchema(ctx)

err := r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)
var (
err error
chanErr <-chan error
)

if pkgcfg.FromContext(ctx).Features.WorkloadDomainIsolation &&
!pkgcfg.FromContext(ctx).AsyncSignalDisabled &&
!pkgcfg.FromContext(ctx).AsyncCreateDisabled {
//
// Non-blocking create
//
chanErr, err = r.VMProvider.CreateOrUpdateVirtualMachineAsync(ctx, ctx.VM)
} else {
//
// Blocking create
//
err = r.VMProvider.CreateOrUpdateVirtualMachine(ctx, ctx.VM)
}

switch {
case ctxop.IsCreate(ctx):
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
case ctxop.IsCreate(ctx) && !ignoredCreateErr(err):

if chanErr == nil {
//
// Blocking create
//
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
} else {
//
// Non-blocking create
//
if err != nil {
// Failed before goroutine.
r.Recorder.EmitEvent(ctx.VM, "Create", err, false)
} else {
// Emit event once goroutine is complete.
go func(obj client.Object) {
err := <-chanErr
r.Recorder.EmitEvent(obj, "Create", err, false)
}(ctx.VM.DeepCopy())
}
}
case ctxop.IsUpdate(ctx):

r.Recorder.EmitEvent(ctx.VM, "Update", err, false)
case err != nil:

case err != nil && !ignoredCreateErr(err):

// Catch all event for neither create nor update op.
r.Recorder.EmitEvent(ctx.VM, "ReconcileNormal", err, true)
}
Expand All @@ -468,3 +528,8 @@ func getIsDefaultVMClassController(ctx context.Context) bool {
}
return false
}

func ignoredCreateErr(err error) bool {
return err == providers.ErrDuplicateCreate ||
err == providers.ErrTooManyCreates
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
pkgcfg "github.com/vmware-tanzu/vm-operator/pkg/config"
"github.com/vmware-tanzu/vm-operator/pkg/constants/testlabels"
pkgctx "github.com/vmware-tanzu/vm-operator/pkg/context"
"github.com/vmware-tanzu/vm-operator/pkg/providers"
providerfake "github.com/vmware-tanzu/vm-operator/pkg/providers/fake"
"github.com/vmware-tanzu/vm-operator/pkg/util/kube/cource"
"github.com/vmware-tanzu/vm-operator/pkg/util/ptr"
Expand Down Expand Up @@ -105,11 +106,13 @@ func intgTestsReconcile() {
dummyInstanceUUID := uuid.NewString()

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
return nil
},
)
})

AfterEach(func() {
Expand All @@ -128,21 +131,63 @@ func intgTestsReconcile() {
})

It("Reconciles after VirtualMachine creation", func() {
var createAttempts int32

By("Exceed number of allowed concurrent create operations", func() {
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&createAttempts, 1)
return providers.ErrTooManyCreates
},
)
})

vm.Spec.Network = &vmopv1.VirtualMachineNetworkSpec{}
Expect(ctx.Client.Create(ctx, vm)).To(Succeed())

By("VirtualMachine should have finalizer added", func() {
waitForVirtualMachineFinalizer(ctx, vmKey)
})

Eventually(func(g Gomega) {
g.Expect(atomic.LoadInt32(&createAttempts)).To(BeNumerically(">=", int32(3)))
g.Expect(conditions.IsTrue(vm, vmopv1.VirtualMachineConditionCreated)).To(BeFalse())
}, "5s").Should(
Succeed(),
"waiting for reconcile to be requeued at least three times")

atomic.StoreInt32(&createAttempts, 0)

By("Causing duplicate creates", func() {
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&createAttempts, 1)
return providers.ErrDuplicateCreate
},
)
})

Eventually(func(g Gomega) {
g.Expect(atomic.LoadInt32(&createAttempts)).To(BeNumerically(">=", int32(3)))
g.Expect(conditions.IsTrue(vm, vmopv1.VirtualMachineConditionCreated)).To(BeFalse())
}, "5s").Should(
Succeed(),
"waiting for reconcile to be requeued at least three times")

By("Set InstanceUUID in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Just using InstanceUUID here for a field to update.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Just using InstanceUUID here for a field to update.
vm.Status.InstanceUUID = dummyInstanceUUID
return nil
},
)
})

By("VirtualMachine should have InstanceUUID set", func() {
Expand All @@ -155,13 +200,15 @@ func intgTestsReconcile() {
})

By("Set Created condition in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.PowerState = vmopv1.VirtualMachinePowerStateOn
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.PowerState = vmopv1.VirtualMachinePowerStateOn
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
},
)
})

By("VirtualMachine should have Created condition set", func() {
Expand All @@ -174,14 +221,16 @@ func intgTestsReconcile() {
})

By("Set IP address in CreateOrUpdateVM", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.Network = &vmopv1.VirtualMachineNetworkStatus{
PrimaryIP4: dummyIPAddress,
}
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.Network = &vmopv1.VirtualMachineNetworkStatus{
PrimaryIP4: dummyIPAddress,
}
return nil
},
)
})

By("VirtualMachine should have IP address set", func() {
Expand Down Expand Up @@ -213,13 +262,15 @@ func intgTestsReconcile() {
biosUUID := uuid.NewString()

BeforeEach(func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
vm.Status.BiosUUID = biosUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
vm.Status.BiosUUID = biosUUID
return nil
},
)
})

// NOTE: mutating webhook sets the default spec.instanceUUID, but is not run in this test -
Expand Down Expand Up @@ -261,13 +312,15 @@ func intgTestsReconcile() {
})

It("Reconciles after VirtualMachineClass change", func() {
intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Set this so requeueDelay() returns 0.
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
// Set this so requeueDelay() returns 0.
conditions.MarkTrue(vm, vmopv1.VirtualMachineConditionCreated)
return nil
},
)

Expect(ctx.Client.Create(ctx, vm)).To(Succeed())
// Wait for initial reconcile.
Expand All @@ -283,12 +336,14 @@ func intgTestsReconcile() {

instanceUUID := uuid.NewString()

intgFakeVMProvider.Lock()
intgFakeVMProvider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
return nil
}
intgFakeVMProvider.Unlock()
providerfake.SetCreateOrUpdateFunction(
ctx,
intgFakeVMProvider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
vm.Status.InstanceUUID = instanceUUID
return nil
},
)

vmClass := builder.DummyVirtualMachineClass(vm.Spec.ClassName)
vmClass.Namespace = vm.Namespace
Expand Down Expand Up @@ -390,6 +445,8 @@ var _ = Describe(
ctx = pkgcfg.UpdateContext(
ctx,
func(config *pkgcfg.Config) {
config.AsyncSignalDisabled = false
config.AsyncCreateDisabled = false
config.Features.WorkloadDomainIsolation = true
},
)
Expand All @@ -400,10 +457,14 @@ var _ = Describe(
provider.VSphereClientFn = func(ctx context.Context) (*vsclient.Client, error) {
return vsclient.NewClient(ctx, vcSimCtx.VCClientConfig)
}
provider.CreateOrUpdateVirtualMachineFn = func(ctx context.Context, obj *vmopv1.VirtualMachine) error {
atomic.AddInt32(&numCreateOrUpdateCalls, 1)
return nil
}
providerfake.SetCreateOrUpdateFunction(
ctx,
provider,
func(ctx context.Context, vm *vmopv1.VirtualMachine) error {
atomic.AddInt32(&numCreateOrUpdateCalls, 1)
return nil
},
)

vcSimCtx = builder.NewIntegrationTestContextForVCSim(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var suite = builder.NewTestSuiteForControllerWithContext(
config.SyncPeriod = 60 * time.Minute
config.CreateVMRequeueDelay = 1 * time.Second
config.PoweredOnVMHasIPRequeueDelay = 1 * time.Second
config.AsyncSignalDisabled = true
config.AsyncCreateDisabled = true
})),
virtualmachine.AddToManager,
func(ctx *pkgctx.ControllerManagerContext, _ ctrlmgr.Manager) error {
Expand Down
Loading

0 comments on commit 0cf97b1

Please sign in to comment.