From b4d554b6cacca915ac06f210434654e5f31beab1 Mon Sep 17 00:00:00 2001 From: "Dat. Ba Dao" Date: Wed, 22 Jan 2025 13:00:00 +0700 Subject: [PATCH] Internal/controller: refactor error handling in declarative flow (#261) --- internal/controller/atlasschema_controller.go | 130 +++++------------- 1 file changed, 38 insertions(+), 92 deletions(-) diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index 32b4c74..0d5e7c9 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -109,10 +109,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, client.IgnoreNotFound(err) } defer func() { - // At the end of reconcile, update the status of the resource base on the error - if err != nil { - r.recordErrEvent(res, err) - } if err := r.Status().Update(ctx, res); err != nil { log.Error(err, "failed to update resource status") } @@ -130,13 +126,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) } data, err := r.extractData(ctx, res) if err != nil { - res.SetNotReady("ReadSchema", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, err, "ReadSchema") } if data.hasTargets() { - res.SetNotReady("ReadSchema", "Multiple targets are not supported") - return ctrl.Result{}, nil + return r.resultErr(res, errors.New("multiple targets are not supported"), "ReadSchema") } opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile { @@ -151,17 +144,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // spin up a dev-db and get the connection string. 
data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { - res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return r.resultErr(res, err, "GettingDevDB") } } // Create a working directory for the Atlas CLI // The working directory contains the atlas.hcl config. wd, err := atlasexec.NewWorkingDir(opts...) if err != nil { - res.SetNotReady("CreatingWorkingDir", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, err, "CreatingWorkingDir") } defer wd.Close() // This function will be used to edit and re-render the atlas.hcl file in the working directory. @@ -176,9 +166,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) } cli, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { - res.SetNotReady("CreatingAtlasClient", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, err, "CreatingAtlasClient") } // Calculate the hash of the current schema. hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{ @@ -188,12 +176,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) Vars: data.Vars, }) if err != nil { - res.SetNotReady("CalculatingHash", err.Error()) - r.recordErrEvent(res, err) - if isConnectionErr(err) { - err = transient(err) - } - return result(err) + return r.resultErr(res, err, "CalculatingHash") } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. 
@@ -211,24 +194,17 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) case errors.Is(err, atlasexec.ErrRequireLogin): log.Info("the resource is not connected to Atlas Cloud") if data.Config != nil { - err = errors.New("login is required to use custom atlas.hcl config") - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, errors.New("login is required to use custom atlas.hcl config"), "WhoAmI") } case err != nil: - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, err, "WhoAmI") default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } var reports []*atlasexec.SchemaApply shouldLint, err := data.shouldLint() if err != nil { - res.SetNotReady("LintPolicyError", err.Error()) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, err, "LintPolicyError") } switch desiredURL := data.targetURL(); { // The resource is connected to Atlas Cloud. @@ -265,14 +241,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) Vars: data.Vars, }) if err != nil { - reason, msg := "SchemaPush", err.Error() - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) + return r.resultCLIErr(res, err, "SchemaPush") } log.Info("schema is a file, pushing the schema to Atlas Cloud") state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{ @@ -283,14 +252,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) Vars: data.Vars, }) if err != nil { - reason, msg := "SchemaPush", err.Error() - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) + return r.resultCLIErr(res, err, "SchemaPush") } desiredURL = state.URL } @@ -314,19 +276,13 @@ func (r
*AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil case err != nil: reason, msg := "SchemaPlan", err.Error() - res.SetNotReady(reason, msg) r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) + return r.resultCLIErr(res, err, reason) default: log.Info("created a new schema plan", "plan", plan.File.URL, "desiredURL", desiredURL) res.Status.PlanURL = plan.File.URL res.Status.PlanLink = plan.File.Link reason, msg := "ApprovalPending", "Schema plan is waiting for approval" - res.SetNotReady(reason, msg) r.recorder.Event(res, corev1.EventTypeNormal, reason, msg) return ctrl.Result{RequeueAfter: time.Second * 5}, nil } @@ -340,14 +296,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) Vars: data.Vars, }); { case err != nil: - reason, msg := "ListingPlans", err.Error() - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) + return r.resultCLIErr(res, err, "ListingPlans") // There are multiple pending plans. This is an unexpected state. 
case len(plans) > 1: planURLs := make([]string, 0, len(plans)) @@ -355,12 +304,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) planURLs = append(planURLs, p.URL) } log.Info("multiple schema plans found", "plans", planURLs) - reason, msg := "ListingPlans", fmt.Sprintf("multiple schema plans found: %s", strings.Join(planURLs, ", ")) - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - err = errors.New(msg) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, fmt.Errorf("multiple schema plans found: %s", strings.Join(planURLs, ", ")), "ListingPlans") // There are no pending plans, but Atlas has been asked to review the changes ALWAYS. case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways: // Create a plan for the pending changes. @@ -406,14 +350,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Don't requeue destructive errors. return ctrl.Result{}, nil case err != nil: - reason, msg := "VerifyingFirstRun", err.Error() - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) + return r.resultCLIErr(res, err, "VerifyingFirstRun") } // Revert the destructive linting policy back to the original value. if err = editAtlasHCL(func(m *managedData) { @@ -431,14 +368,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Run the linting policy.
case shouldLint: if err = r.lint(ctx, wd, data, nil); err != nil { - reason, msg := "LintPolicyError", err.Error() - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) + return r.resultCLIErr(res, err, "LintPolicyError") } reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -458,13 +388,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) }) } if err != nil { - res.SetNotReady("ApplyingSchema", err.Error()) - r.recorder.Event(res, corev1.EventTypeWarning, "ApplyingSchema", err.Error()) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) + return r.resultCLIErr(res, err, "ApplyingSchema") } s := dbv1alpha1.AtlasSchemaStatus{ LastApplied: time.Now().Unix(), @@ -618,6 +542,33 @@ func (r *AtlasSchemaReconciler) recordErrEvent(res *dbv1alpha1.AtlasSchema, err r.recorder.Event(res, corev1.EventTypeWarning, reason, strings.TrimSpace(err.Error())) } +// resultErr marks res as not ready with the given reason, records an error +// event for err, and returns the reconcile result computed from err. +// Connection errors are wrapped as transient before being reported. +func (r *AtlasSchemaReconciler) resultErr( + res *dbv1alpha1.AtlasSchema, err error, reason string, +) (ctrl.Result, error) { + if isConnectionErr(err) { + err = transient(err) + } + res.SetNotReady(reason, err.Error()) + r.recordErrEvent(res, err) + return result(err) +} + +// resultCLIErr behaves like resultErr, except that every error that is not a +// SQL error is wrapped as transient before being reported. +func (r *AtlasSchemaReconciler) resultCLIErr( + res *dbv1alpha1.AtlasSchema, err error, reason string, +) (ctrl.Result, error) { + if !isSQLErr(err) { + err = transient(err) + } + res.SetNotReady(reason, err.Error()) + r.recordErrEvent(res, err) + return result(err) +} + // ShouldLint returns true if the linting policy is set to error. func (d *managedData) shouldLint() (bool, error) { p := d.Policy