Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RetryContext to eliminate busy-waiting #2615

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 36 additions & 66 deletions manifest/provider/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/terraform-plugin-go/tftypes"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-provider-kubernetes/manifest/payload"
"github.com/zclconf/go-cty/cty"

Expand All @@ -23,8 +24,6 @@ import (
"k8s.io/kubectl/pkg/polymorphichelpers"
)

const waiterSleepTime = 1 * time.Second

func (s *RawProviderServer) waitForCompletion(ctx context.Context, waitForBlock tftypes.Value, rs dynamic.ResourceInterface, rname string, rtype tftypes.Type, th map[string]string) error {
if waitForBlock.IsNull() || !waitForBlock.IsKnown() {
return nil
Expand All @@ -34,12 +33,15 @@ func (s *RawProviderServer) waitForCompletion(ctx context.Context, waitForBlock
if err != nil {
return err
}
return waiter.Wait(ctx)

deadline, _ := ctx.Deadline()
return retry.RetryContext(ctx, deadline.Sub(time.Now()),
waiter.Wait(ctx))
}

// Waiter is a simple interface to implement a blocking wait operation
type Waiter interface {
Wait(context.Context) error
Wait(context.Context) retry.RetryFunc
}

type WaiterError struct {
Expand Down Expand Up @@ -148,24 +150,17 @@ type FieldWaiter struct {
}

// Wait blocks until all of the FieldMatchers configured evaluate to true
func (w *FieldWaiter) Wait(ctx context.Context) error {
w.logger.Info("[ApplyResourceChange][Wait] Waiting until ready...\n")
for {
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return WaiterError{Reason: "field matchers"}
}
}

func (w *FieldWaiter) Wait(ctx context.Context) retry.RetryFunc {
return func() *retry.RetryError {
// NOTE The typed API resource is actually returned in the
// event object but I haven't yet figured out how to convert it
// to a cty.Value.
res, err := w.resource.Get(ctx, w.resourceName, v1.GetOptions{})
if err != nil {
return err
return retry.NonRetryableError(err)
}
if errors.IsGone(err) {
return fmt.Errorf("resource was deleted")
return retry.NonRetryableError(fmt.Errorf("resource was deleted"))
}
resObj := res.Object
meta := resObj["metadata"].(map[string]interface{})
Expand All @@ -175,17 +170,17 @@ func (w *FieldWaiter) Wait(ctx context.Context) error {

obj, err := payload.ToTFValue(resObj, w.resourceType, w.typeHints, tftypes.NewAttributePath())
if err != nil {
return err
return retry.NonRetryableError(err)
}

done, err := func(obj tftypes.Value) (bool, error) {
result := func(obj tftypes.Value) *retry.RetryError {
for _, m := range w.fieldMatchers {
vi, rp, err := tftypes.WalkAttributePath(obj, m.path)
if err != nil {
return false, err
return retry.RetryableError(err)
}
if len(rp.Steps()) > 0 {
return false, fmt.Errorf("attribute not present at path '%s'", m.path.String())
return retry.RetryableError(fmt.Errorf("attribute not present at path '%s'", m.path.String()))
}

var s string
Expand All @@ -208,33 +203,29 @@ func (w *FieldWaiter) Wait(ctx context.Context) error {
s = fmt.Sprintf("%f", i)
}
default:
return true, fmt.Errorf("wait_for: cannot match on type %q", v.Type().String())
return retry.NonRetryableError(fmt.Errorf("wait_for: cannot match on type %q", v.Type().String()))
}

if !m.valueMatcher.Match([]byte(s)) {
return false, nil
return retry.RetryableError(WaiterError{Reason: "field matchers"})
}
}

return true, nil
return nil
}(obj)

if done {
w.logger.Info("[ApplyResourceChange][Wait] Done waiting.\n")
return err
}

// TODO: implement with exponential back-off.
time.Sleep(waiterSleepTime) // lintignore:R018
return result
}
}

// NoopWaiter is a placeholder for when there is nothing to wait on
type NoopWaiter struct{}

// Wait returns immediately
func (w *NoopWaiter) Wait(_ context.Context) error {
return nil
func (w *NoopWaiter) Wait(_ context.Context) retry.RetryFunc {
return func() *retry.RetryError {
return nil
}
}

// FieldPathToTftypesPath takes a string representation of
Expand Down Expand Up @@ -286,43 +277,33 @@ type RolloutWaiter struct {
}

// Wait uses StatusViewer to determine if the rollout is done
func (w *RolloutWaiter) Wait(ctx context.Context) error {
w.logger.Info("[ApplyResourceChange][Wait] Waiting until rollout complete...\n")
for {
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return WaiterError{Reason: "rollout to complete"}
}
}

func (w *RolloutWaiter) Wait(ctx context.Context) retry.RetryFunc {
return func() *retry.RetryError {
res, err := w.resource.Get(ctx, w.resourceName, v1.GetOptions{})
if err != nil {
return err
return retry.NonRetryableError(err)
}
if errors.IsGone(err) {
return fmt.Errorf("resource was deleted")
return retry.NonRetryableError(fmt.Errorf("resource was deleted"))
}

gk := res.GetObjectKind().GroupVersionKind().GroupKind()
statusViewer, err := polymorphichelpers.StatusViewerFor(gk)
if err != nil {
return fmt.Errorf("error getting resource status: %v", err)
return retry.NonRetryableError(fmt.Errorf("error getting resource status: %v", err))
}

_, done, err := statusViewer.Status(res, 0)
if err != nil {
return fmt.Errorf("error getting resource status: %v", err)
return retry.NonRetryableError(fmt.Errorf("error getting resource status: %v", err))
}

if done {
break
return nil
}

time.Sleep(waiterSleepTime) // lintignore:R018
return retry.RetryableError(WaiterError{Reason: "rollout to complete"})
}

w.logger.Info("[ApplyResourceChange][Wait] Rollout complete\n")
return nil
}

// ConditionsWaiter will wait for the specified conditions on
Expand All @@ -335,22 +316,14 @@ type ConditionsWaiter struct {
}

// Wait checks all the configured conditions have been met
func (w *ConditionsWaiter) Wait(ctx context.Context) error {
w.logger.Info("[ApplyResourceChange][Wait] Waiting for conditions...\n")

for {
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return WaiterError{Reason: "conditions"}
}
}

func (w *ConditionsWaiter) Wait(ctx context.Context) retry.RetryFunc {
return func() *retry.RetryError {
res, err := w.resource.Get(ctx, w.resourceName, v1.GetOptions{})
if err != nil {
return err
return retry.NonRetryableError(err)
}
if errors.IsGone(err) {
return fmt.Errorf("resource was deleted")
return retry.NonRetryableError(fmt.Errorf("resource was deleted"))
}

if status, ok := res.Object["status"].(map[string]interface{}); ok {
Expand All @@ -373,13 +346,10 @@ func (w *ConditionsWaiter) Wait(ctx context.Context) error {
conditionsMet = conditionsMet && conditionMet
}
if conditionsMet {
break
return nil
}
}
}
time.Sleep(waiterSleepTime) // lintignore:R018
return retry.RetryableError(WaiterError{Reason: "conditions"})
}

w.logger.Info("[ApplyResourceChange][Wait] All conditions met.\n")
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

resource "kubernetes_manifest" "test" {

manifest = {
apiVersion = "v1"
kind = "Pod"

metadata = {
name = var.name
namespace = var.namespace

annotations = {
"test.terraform.io" = "test"
}

labels = {
app = "nginx"
}
}

spec = {
containers = [
{
name = "nginx"
image = "nginx:1.19"

readinessProbe = {
initialDelaySeconds = 10

httpGet = {
path = "/"
port = 80
}
}
}
]
}
}

wait {
fields = {
"status.phase" = "Invalid",
}
}

timeouts {
create = "5s"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

resource kubernetes_manifest wait_for_rollout {
manifest = {
apiVersion = "apps/v1"
kind = "Deployment"
metadata = {
name = var.name
namespace = var.namespace
}
spec = {
replicas = 2
selector = {
matchLabels = {
app = "tf-acc-test"
}
}
template = {
metadata = {
labels = {
app = "tf-acc-test"
}
}
spec = {
containers = [
{
image = "nginx:invalid-does-not-exist"
imagePullPolicy = "IfNotPresent"
name = "tf-acc-test"
readinessProbe = {
httpGet = {
port = 80
path = "/"
}
initialDelaySeconds = 10
}
},
]
}
}
}
}

wait {
rollout = true
}

timeouts {
create = "5s"
}
}
Loading
Loading