Skip to content

Commit

Permalink
Merge pull request #555 from karlkfi/karl-dep-filter2
Browse files Browse the repository at this point in the history
feat: Add dependency filter
  • Loading branch information
k8s-ci-robot authored Mar 1, 2022
2 parents bd20d62 + 4599e4f commit 5c6134a
Show file tree
Hide file tree
Showing 17 changed files with 3,054 additions and 95 deletions.
19 changes: 14 additions & 5 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apis/actuation"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
Expand Down Expand Up @@ -125,6 +126,10 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
}
klog.V(4).Infof("calculated %d apply objs; %d prune objs", len(applyObjs), len(pruneObjs))

// Build a TaskContext for passing info between tasks
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)

// Fetch the queue (channel) of tasks that should be executed.
klog.V(4).Infoln("applier building task queue...")
// Build list of apply validation filters.
Expand All @@ -135,6 +140,10 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
Inv: invInfo,
InvPolicy: options.InventoryPolicy,
},
filter.DependencyFilter{
TaskContext: taskContext,
Strategy: actuation.ActuationStrategyApply,
},
}
// Build list of prune validation filters.
pruneFilters := []filter.ValidationFilter{
Expand All @@ -146,9 +155,12 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
filter.LocalNamespacesFilter{
LocalNamespaces: localNamespaces(invInfo, object.UnstructuredSetToObjMetadataSet(objects)),
},
filter.DependencyFilter{
TaskContext: taskContext,
Strategy: actuation.ActuationStrategyDelete,
},
}
// Build list of apply mutators.
resourceCache := cache.NewResourceCacheMap()
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Client: a.client,
Expand Down Expand Up @@ -184,7 +196,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
WithApplyObjects(applyObjs).
WithPruneObjects(pruneObjs).
WithInventory(invInfo).
Build(opts)
Build(taskContext, opts)

klog.V(4).Infof("validation errors: %d", len(vCollector.Errors))
klog.V(4).Infof("invalid objects: %d", len(vCollector.InvalidIds))
Expand All @@ -206,9 +218,6 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects objec
return
}

// Build a TaskContext for passing info between tasks
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)

// Register invalid objects to be retained in the inventory, if present.
for _, id := range vCollector.InvalidIds {
taskContext.AddInvalidObject(id)
Expand Down
15 changes: 10 additions & 5 deletions pkg/apply/destroyer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apis/actuation"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
Expand Down Expand Up @@ -129,6 +130,10 @@ func (d *Destroyer) Run(ctx context.Context, invInfo inventory.Info, options Des
}
validator.Validate(deleteObjs)

// Build a TaskContext for passing info between tasks
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)

klog.V(4).Infoln("destroyer building task queue...")
dynamicClient, err := d.factory.DynamicClient()
if err != nil {
Expand All @@ -141,6 +146,10 @@ func (d *Destroyer) Run(ctx context.Context, invInfo inventory.Info, options Des
Inv: invInfo,
InvPolicy: options.InventoryPolicy,
},
filter.DependencyFilter{
TaskContext: taskContext,
Strategy: actuation.ActuationStrategyDelete,
},
}
taskBuilder := &solver.TaskQueueBuilder{
Pruner: d.pruner,
Expand All @@ -165,7 +174,7 @@ func (d *Destroyer) Run(ctx context.Context, invInfo inventory.Info, options Des
taskQueue := taskBuilder.
WithPruneObjects(deleteObjs).
WithInventory(invInfo).
Build(opts)
Build(taskContext, opts)

klog.V(4).Infof("validation errors: %d", len(vCollector.Errors))
klog.V(4).Infof("invalid objects: %d", len(vCollector.InvalidIds))
Expand All @@ -187,10 +196,6 @@ func (d *Destroyer) Run(ctx context.Context, invInfo inventory.Info, options Des
return
}

// Build a TaskContext for passing info between tasks
resourceCache := cache.NewResourceCacheMap()
taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)

// Register invalid objects to be retained in the inventory, if present.
for _, id := range vCollector.InvalidIds {
taskContext.AddInvalidObject(id)
Expand Down
152 changes: 152 additions & 0 deletions pkg/apply/filter/dependency-filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package filter

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/apis/actuation"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/object"
)

//go:generate stringer -type=Relationship -linecomment
type Relationship int

const (
RelationshipDependent Relationship = iota // Dependent
RelationshipDependency // Dependency
)

// DependencyFilter implements ValidationFilter interface to determine if an
// object can be applied or deleted based on the status of it's dependencies.
type DependencyFilter struct {
TaskContext *taskrunner.TaskContext
Strategy actuation.ActuationStrategy
}

const DependencyFilterName = "DependencyFilter"

// Name returns the name of the filter for logs and events.
func (dnrf DependencyFilter) Name() string {
return DependencyFilterName
}

// Filter returns true if the specified object should be skipped because at
// least one of its dependencies is Not Found or Not Reconciled.
func (dnrf DependencyFilter) Filter(obj *unstructured.Unstructured) (bool, string, error) {
id := object.UnstructuredToObjMetadata(obj)

switch dnrf.Strategy {
case actuation.ActuationStrategyApply:
// For apply, check dependencies (outgoing)
for _, depID := range dnrf.TaskContext.Graph().Dependencies(id) {
skip, reason, err := dnrf.filterByRelationStatus(depID, RelationshipDependency)
if err != nil {
return false, "", err
}
if skip {
return skip, reason, nil
}
}
case actuation.ActuationStrategyDelete:
// For delete, check dependents (incoming)
for _, depID := range dnrf.TaskContext.Graph().Dependents(id) {
skip, reason, err := dnrf.filterByRelationStatus(depID, RelationshipDependent)
if err != nil {
return false, "", err
}
if skip {
return skip, reason, nil
}
}
default:
panic(fmt.Sprintf("invalid filter strategy: %q", dnrf.Strategy))
}
return false, "", nil
}

func (dnrf DependencyFilter) filterByRelationStatus(id object.ObjMetadata, relationship Relationship) (bool, string, error) {
// Dependency on an invalid object is considered an invalid dependency, making both objects invalid.
// For applies: don't prematurely apply something that depends on something that hasn't been applied (because invalid).
// For deletes: don't prematurely delete something that depends on something that hasn't been deleted (because invalid).
// These can't be caught be subsequent checks, because invalid objects aren't in the inventory.
if dnrf.TaskContext.IsInvalidObject(id) {
// Skip!
return true, fmt.Sprintf("%s invalid: %q",
strings.ToLower(relationship.String()),
id), nil
}

status, found := dnrf.TaskContext.InventoryManager().ObjectStatus(id)
if !found {
// Status is registered during planning.
// So if status is not found, the object is external (NYI) or invalid.
return false, "", fmt.Errorf("unknown %s actuation strategy: %v",
strings.ToLower(relationship.String()), id)
}

// Dependencies must have the same actuation strategy.
// If there is a mismatch, skip both.
if status.Strategy != dnrf.Strategy {
return true, fmt.Sprintf("%s skipped because %s is scheduled for %s: %q",
strings.ToLower(dnrf.Strategy.String()),
strings.ToLower(relationship.String()),
strings.ToLower(status.Strategy.String()),
id), nil
}

switch status.Actuation {
case actuation.ActuationPending:
// If actuation is still pending, dependency sorting is probably broken.
return false, "", fmt.Errorf("premature %s: %s %s actuation %s: %q",
strings.ToLower(dnrf.Strategy.String()),
strings.ToLower(relationship.String()),
strings.ToLower(status.Strategy.String()),
strings.ToLower(status.Actuation.String()),
id)
case actuation.ActuationSkipped, actuation.ActuationFailed:
// Skip!
return true, fmt.Sprintf("%s %s actuation %s: %q",
strings.ToLower(relationship.String()),
strings.ToLower(dnrf.Strategy.String()),
strings.ToLower(status.Actuation.String()),
id), nil
case actuation.ActuationSucceeded:
// Don't skip!
default:
return false, "", fmt.Errorf("invalid %s apply status %q: %q",
strings.ToLower(relationship.String()),
strings.ToLower(status.Actuation.String()),
id)
}

switch status.Reconcile {
case actuation.ReconcilePending:
// If reconcile is still pending, dependency sorting is probably broken.
return false, "", fmt.Errorf("premature %s: %s %s reconcile %s: %q",
strings.ToLower(dnrf.Strategy.String()),
strings.ToLower(relationship.String()),
strings.ToLower(status.Strategy.String()),
strings.ToLower(status.Reconcile.String()),
id)
case actuation.ReconcileSkipped, actuation.ReconcileFailed, actuation.ReconcileTimeout:
// Skip!
return true, fmt.Sprintf("%s %s reconcile %s: %q",
strings.ToLower(relationship.String()),
strings.ToLower(dnrf.Strategy.String()),
strings.ToLower(status.Reconcile.String()),
id), nil
case actuation.ReconcileSucceeded:
// Don't skip!
default:
return false, "", fmt.Errorf("invalid dependency reconcile status %q: %q",
strings.ToLower(status.Reconcile.String()), id)
}

// Don't skip!
return false, "", nil
}
Loading

0 comments on commit 5c6134a

Please sign in to comment.