Skip to content

Commit

Permalink
Feat add shim uninstall (#92)
Browse files Browse the repository at this point in the history
add shim delete logic
  • Loading branch information
voigt authored Feb 14, 2024
1 parent 4b7c0f5 commit e4dbc82
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ linters-settings:
- "client.IgnoreNotFound("
- "fmt.Errorf("
cyclop:
max-complexity: 13
max-complexity: 16
nestif:
min-complexity: 8
depguard:
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions docs/shim_delete.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Deleting a Shim


- When a shim is deleted, an "uninstall" job is scheduled on every node that matched the shim's selector.


What happens if an uninstall job fails or never completes?

- Deletion of the shim must not be blocked by its uninstall jobs.
- The node should keep its shim label, set to "uninstall" or "failed".
32 changes: 25 additions & 7 deletions internal/controller/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ func (jr *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
switch finishedType {
case "": // ongoing
log.Info().Msgf("Job %s is still Ongoing", job.Name)
// if err := jr.updateNodeLabels(ctx, node, shimName, "pending"); err != nil {
// log.Error().Msgf("Unable to update node label %s: %s", shimName, err)
// }
return ctrl.Result{}, nil
case batchv1.JobFailed:
log.Info().Msgf("Job %s is still failing...", job.Name)
Expand All @@ -106,11 +103,22 @@ func (jr *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}
return ctrl.Result{}, nil
case batchv1.JobComplete:
log.Info().Msgf("Job %s is Completed. Happy WASMing", job.Name)
if err := jr.updateNodeLabels(ctx, node, shimName, "provisioned"); err != nil {
log.Error().Msgf("Unable to update node label %s: %s", shimName, err)
log.Info().Msgf("Job %s is Completed.", job.Name)

installOrUninstall := job.Annotations["kwasm.sh/operation"]

switch installOrUninstall {
case INSTALL:
if err := jr.updateNodeLabels(ctx, node, shimName, "provisioned"); err != nil {
log.Error().Msgf("Unable to update node label %s: %s", shimName, err)
}
case UNINSTALL:
if err := jr.deleteNodeLabel(ctx, node, shimName); err != nil {
log.Error().Msgf("Unable to delete node label %s: %s", shimName, err)
}
}
return ctrl.Result{}, nil

return ctrl.Result{}, err
case batchv1.JobSuspended:
log.Info().Msgf("Job %s is suspended", job.Name)
return ctrl.Result{}, nil
Expand All @@ -129,6 +137,16 @@ func (jr *JobReconciler) updateNodeLabels(ctx context.Context, node *corev1.Node
return nil
}

// deleteNodeLabel drops the label keyed by shimName from the given node and
// persists the modified node through the reconciler's Update call.
// A failing Update is returned wrapped; otherwise the result is nil.
func (jr *JobReconciler) deleteNodeLabel(ctx context.Context, node *corev1.Node, shimName string) error {
	// delete is a no-op when the key is absent, so no existence check is needed.
	delete(node.Labels, shimName)

	updateErr := jr.Update(ctx, node)
	if updateErr == nil {
		return nil
	}

	return fmt.Errorf("failed to delete node labels: %w", updateErr)
}

func (jr *JobReconciler) getNode(ctx context.Context, nodeName string) (*corev1.Node, error) {
node := corev1.Node{}
if err := jr.Client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
Expand Down
145 changes: 78 additions & 67 deletions internal/controller/shim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import (

const (
KwasmOperatorFinalizer = "kwasm.sh/finalizer"
// addKWasmNodeLabelAnnotation = "kwasm.sh/"
// nodeNameLabel = "kwasm.sh/"
INSTALL = "install"
UNINSTALL = "uninstall"
)

// ShimReconciler reconciles a Shim object
Expand Down Expand Up @@ -112,12 +112,12 @@ func (sr *ShimReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// Shim has been requested for deletion, delete the child resources
if !shimResource.DeletionTimestamp.IsZero() {
log.Debug().Msgf("Deleting shim %s", shimResource.Name)
err := sr.handleDeletion(ctx, &shimResource)
err := sr.handleDeleteShim(ctx, &shimResource, nodes)
if err != nil {
return ctrl.Result{}, err
}

err = sr.removeFinalizer(ctx, &shimResource)
err = sr.removeFinalizerFromShim(ctx, &shimResource)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand All @@ -128,23 +128,23 @@ func (sr *ShimReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}
if !rcExists {
log.Info().Msgf("RuntimeClass '%s' not found", shimResource.Spec.RuntimeClass.Name)
_, err = sr.handleDeployRuntmeClass(ctx, &shimResource)
_, err = sr.handleDeployRuntimeClass(ctx, &shimResource)
if err != nil {
return ctrl.Result{}, err
}
}

// 4. Deploy job to each node in list
if len(nodes.Items) != 0 {
_, err = sr.handleDeployJob(ctx, &shimResource, nodes)
if len(nodes.Items) > 0 {
_, err = sr.handleInstallShim(ctx, &shimResource, nodes)
if err != nil {
return ctrl.Result{}, err
}
} else {
log.Info().Msg("No nodes found")
}

err = sr.ensureFinalizer(ctx, &shimResource)
err = sr.ensureFinalizerForShim(ctx, &shimResource, KwasmOperatorFinalizer)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand Down Expand Up @@ -204,8 +204,8 @@ func (sr *ShimReconciler) updateStatus(ctx context.Context, shim *kwasmv1.Shim,
return nil
}

// handleDeployJob deploys a Job to each node in a list.
func (sr *ShimReconciler) handleDeployJob(ctx context.Context, shim *kwasmv1.Shim, nodes *corev1.NodeList) (ctrl.Result, error) {
// handleInstallShim deploys a Job to each node in a list.
func (sr *ShimReconciler) handleInstallShim(ctx context.Context, shim *kwasmv1.Shim, nodes *corev1.NodeList) (ctrl.Result, error) {
log := log.Ctx(ctx)

switch shim.Spec.RolloutStrategy.Type {
Expand All @@ -222,7 +222,7 @@ func (sr *ShimReconciler) handleDeployJob(ctx context.Context, shim *kwasmv1.Shi
shimProvisioned := node.Labels[shim.Name] == "provisioned"
shimPending := node.Labels[shim.Name] == "pending"
if !shimProvisioned && !shimPending {
err := sr.deployJobOnNode(ctx, shim, node)
err := sr.deployJobOnNode(ctx, shim, node, INSTALL)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -240,19 +240,42 @@ func (sr *ShimReconciler) handleDeployJob(ctx context.Context, shim *kwasmv1.Shi
return ctrl.Result{}, nil
}

// deployJobOnNode deploys a Job to a Node.
func (sr *ShimReconciler) deployJobOnNode(ctx context.Context, shim *kwasmv1.Shim, node corev1.Node) error {
// deployJobOnNode deploys an install or uninstall Job (selected by jobType) for a Shim on a Node.
func (sr *ShimReconciler) deployJobOnNode(ctx context.Context, shim *kwasmv1.Shim, node corev1.Node, jobType string) error {
log := log.Ctx(ctx)

log.Info().Msgf("Deploying Shim %s on node: %s", shim.Name, node.Name)

if err := sr.updateNodeLabels(ctx, &node, shim, "pending"); err != nil {
log.Error().Msgf("Unable to update node label %s: %s", shim.Name, err)
if err := sr.Client.Get(ctx, types.NamespacedName{Name: node.Name}, &node); err != nil {
log.Error().Msgf("Unable to re-fetch node: %s", err)
return fmt.Errorf("failed to fetch node: %w", err)
}

job, err := sr.createJobManifest(shim, &node)
if err != nil {
return err
log.Info().Msgf("Deploying %s-Job for Shim %s on node: %s", jobType, shim.Name, node.Name)

var job *batchv1.Job

switch jobType {
case INSTALL:
err := sr.updateNodeLabels(ctx, &node, shim, "pending")
if err != nil {
log.Error().Msgf("Unable to update node label %s: %s", shim.Name, err)
}

job, err = sr.createJobManifest(shim, &node, INSTALL)
if err != nil {
return err
}
case UNINSTALL:
err := sr.updateNodeLabels(ctx, &node, shim, UNINSTALL)
if err != nil {
log.Error().Msgf("Unable to update node label %s: %s", shim.Name, err)
}

job, err = sr.createJobManifest(shim, &node, UNINSTALL)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid jobType: %s", jobType)
}

// We want to use server-side apply https://kubernetes.io/docs/reference/using-api/server-side-apply
Expand Down Expand Up @@ -285,9 +308,9 @@ func (sr *ShimReconciler) updateNodeLabels(ctx context.Context, node *corev1.Nod
}

// createJobManifest creates a Job manifest for a Shim.
func (sr *ShimReconciler) createJobManifest(shim *kwasmv1.Shim, node *corev1.Node) (*batchv1.Job, error) {
func (sr *ShimReconciler) createJobManifest(shim *kwasmv1.Shim, node *corev1.Node, operation string) (*batchv1.Job, error) {
priv := true
name := node.Name + "." + shim.Name
name := node.Name + "-" + shim.Name + "-" + operation
nameMax := int(math.Min(float64(len(name)), 63))

job := &batchv1.Job{
Expand All @@ -298,10 +321,16 @@ func (sr *ShimReconciler) createJobManifest(shim *kwasmv1.Shim, node *corev1.Nod
ObjectMeta: metav1.ObjectMeta{
Name: name[:nameMax],
Namespace: os.Getenv("CONTROLLER_NAMESPACE"),
Annotations: map[string]string{
"kwasm.sh/nodeName": node.Name,
"kwasm.sh/shimName": shim.Name,
"kwasm.sh/operation": operation,
},
Labels: map[string]string{
name[:nameMax]: "true",
"kwasm.sh/shimName": shim.Name,
"kwasm.sh/job": "true",
name[:nameMax]: "true",
"kwasm.sh/shimName": shim.Name,
"kwasm.sh/operation": operation,
"kwasm.sh/job": "true",
},
},
Spec: batchv1.JobSpec{
Expand All @@ -318,7 +347,7 @@ func (sr *ShimReconciler) createJobManifest(shim *kwasmv1.Shim, node *corev1.Nod
},
}},
Containers: []corev1.Container{{
Image: "voigt/kwasm-node-installer:new",
Image: "voigt/kwasm-node-installer:" + operation,
Name: "provisioner",
SecurityContext: &corev1.SecurityContext{
Privileged: &priv,
Expand Down Expand Up @@ -357,15 +386,18 @@ func (sr *ShimReconciler) createJobManifest(shim *kwasmv1.Shim, node *corev1.Nod
},
},
}
if err := ctrl.SetControllerReference(shim, job, sr.Scheme); err != nil {
return nil, fmt.Errorf("failed to set controller reference: %w", err)

if operation == INSTALL {
if err := ctrl.SetControllerReference(shim, job, sr.Scheme); err != nil {
return nil, fmt.Errorf("failed to set controller reference: %w", err)
}
}

return job, nil
}

// handleDeployRuntmeClass deploys a RuntimeClass for a Shim.
func (sr *ShimReconciler) handleDeployRuntmeClass(ctx context.Context, shim *kwasmv1.Shim) (ctrl.Result, error) {
// handleDeployRuntimeClass deploys a RuntimeClass for a Shim.
func (sr *ShimReconciler) handleDeployRuntimeClass(ctx context.Context, shim *kwasmv1.Shim) (ctrl.Result, error) {
log := log.Ctx(ctx)

log.Info().Msgf("Deploying RuntimeClass: %s", shim.Spec.RuntimeClass.Name)
Expand Down Expand Up @@ -423,42 +455,21 @@ func (sr *ShimReconciler) createRuntimeClassManifest(shim *kwasmv1.Shim) (*nodev
return runtimeClass, nil
}

// handleDeletion deletes all possible child resources of a Shim. It will ignore NotFound errors.
func (sr *ShimReconciler) handleDeletion(ctx context.Context, shim *kwasmv1.Shim) error {
// TODO: deploy uninstall job here
// err := sr.handleUninstall(ctx, shim)
// if client.IgnoreNotFound(err) != nil {
// return err
// }

// remove shim labels from node
err := sr.removeShimLabelsFromNodes(ctx, shim)
if client.IgnoreNotFound(err) != nil {
return err
}

return nil
}

func (sr *ShimReconciler) removeShimLabelsFromNodes(ctx context.Context, shim *kwasmv1.Shim) error {
log := log.Ctx(ctx)

nodes, err := sr.getNodeListFromShimsNodeSelctor(ctx, shim)
if err != nil {
return err
}

// handleDeleteShim deploys an uninstall Job on every node in the list that carries the Shim's label. NotFound errors are ignored.
func (sr *ShimReconciler) handleDeleteShim(ctx context.Context, shim *kwasmv1.Shim, nodes *corev1.NodeList) error {
// deploy uninstall job on every node in node list
for i := range nodes.Items {
node := nodes.Items[i]
if _, ok := node.Labels[shim.Name]; ok {
log.Debug().Msgf("Removing label %s from node %s", shim.Name, node.Name)
delete(node.Labels, shim.Name)
if err := sr.Update(ctx, &node); err != nil {
log.Error().Msgf("Unable to remove label %s from node %s: %s", shim.Name, node.Name, err)

if _, exists := node.Labels[shim.Name]; exists {
err := sr.deployJobOnNode(ctx, shim, node, UNINSTALL)
if client.IgnoreNotFound(err) != nil {
return err
}
} else {
log.Info().Msgf("Shim %s has no label on Node %s", shim.Name, node.Name)
}
}

return nil
}

Expand Down Expand Up @@ -507,8 +518,8 @@ func (sr *ShimReconciler) getRuntimeClass(ctx context.Context, shim *kwasmv1.Shi
return &rc, nil
}

// removeFinalizer removes the finalizer from a Shim.
func (sr *ShimReconciler) removeFinalizer(ctx context.Context, shim *kwasmv1.Shim) error {
// removeFinalizerFromShim removes the finalizer from a Shim.
func (sr *ShimReconciler) removeFinalizerFromShim(ctx context.Context, shim *kwasmv1.Shim) error {
if controllerutil.ContainsFinalizer(shim, KwasmOperatorFinalizer) {
controllerutil.RemoveFinalizer(shim, KwasmOperatorFinalizer)
if err := sr.Client.Update(ctx, shim); err != nil {
Expand All @@ -518,10 +529,10 @@ func (sr *ShimReconciler) removeFinalizer(ctx context.Context, shim *kwasmv1.Shi
return nil
}

// ensureFinalizer ensures the finalizer is present on a Shim resource.
func (sr *ShimReconciler) ensureFinalizer(ctx context.Context, shim *kwasmv1.Shim) error {
if !controllerutil.ContainsFinalizer(shim, KwasmOperatorFinalizer) {
controllerutil.AddFinalizer(shim, KwasmOperatorFinalizer)
// ensureFinalizerForShim ensures the finalizer is present on a Shim resource.
func (sr *ShimReconciler) ensureFinalizerForShim(ctx context.Context, shim *kwasmv1.Shim, finalizer string) error {
if !controllerutil.ContainsFinalizer(shim, finalizer) {
controllerutil.AddFinalizer(shim, finalizer)
if err := sr.Client.Update(ctx, shim); err != nil {
return fmt.Errorf("failed to set finalizer: %w", err)
}
Expand Down

0 comments on commit e4dbc82

Please sign in to comment.