diff --git a/internal/controller/atlasmigration_controller.go b/internal/controller/atlasmigration_controller.go index 3a002ff..27da8ad 100644 --- a/internal/controller/atlasmigration_controller.go +++ b/internal/controller/atlasmigration_controller.go @@ -128,13 +128,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque } data, err := r.extractData(ctx, res) if err != nil { - var reason = "ReadingMigrationData" - if e, ok := err.(interface{ Reason() string }); ok { - reason = e.Reason() - } - res.SetNotReady(reason, err.Error()) - r.recordErrEvent(res, err) - return result(err) + return r.resultErr(res, err, "ReadingMigrationData") } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. @@ -155,16 +149,11 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque // spin up a dev-db and get the connection string. data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { - res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return r.resultErr(res, err, "GettingDevDB") } } // Reconcile given resource - err = r.reconcile(ctx, data, res) - if err != nil { - r.recordErrEvent(res, err) - } - return result(err) + return r.reconcile(ctx, data, res) } func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) { @@ -256,7 +245,7 @@ const ( ) // Reconcile the given AtlasMigration resource. -func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migrationData, res *dbv1alpha1.AtlasMigration) error { +func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migrationData, res *dbv1alpha1.AtlasMigration) (ctrl.Result, error) { log := ctrl.Log.WithName("atlas_migration.reconcile") // Create a working directory for the Atlas CLI // The working directory contains the atlas.hcl config @@ -266,13 +255,12 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio atlasexec.WithMigrations(data.Dir), ) if err != nil { - res.SetNotReady("ReadingMigrationData", err.Error()) - return err + return r.resultErr(res, err, "ReadingMigrationData") } defer wd.Close() c, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { - return err + return r.resultErr(res, err, "CreatingAtlasClient") } var whoami *atlasexec.WhoAmI switch whoami, err = c.WhoAmI(ctx); { @@ -280,14 +268,10 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio log.Info("the resource is not connected to Atlas Cloud") if data.Config != nil { err = errors.New("login is required to use custom atlas.hcl config") - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return err + return r.resultErr(res, err, "WhoAmI") } case err != nil: - res.SetNotReady("WhoAmI", err.Error()) - r.recordErrEvent(res, err) - return err + return r.resultErr(res, err, "WhoAmI") default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } @@ -295,20 +279,19 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio // Check if there are any pending migration files status, err := c.MigrateStatus(ctx, &atlasexec.MigrateStatusParams{Env: data.EnvName, Vars: data.Vars}) if err != nil { - res.SetNotReady("Migrating", err.Error()) if isChecksumErr(err) { - return err + return r.resultErr(res, err, "Migrating") } - return transient(err) + return r.resultCLIErr(res, transient(err), "Migrating") } switch { case len(status.Pending) == 0 && len(status.Applied) > 0 && len(status.Available) < len(status.Applied): if !data.MigrateDown { - res.SetNotReady("ProtectedFlowError", "Migrate down is not allowed") - return &ProtectedFlowError{ + err = &ProtectedFlowError{ reason: "ProtectedFlowError", msg: "migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade", } + return r.resultErr(res, err, "ProtectedFlowError") } // The downgrade is allowed, apply the last migration version last := status.Available[len(status.Available)-1] @@ -334,33 +317,29 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio // then use it to downgrade. current := fmt.Sprintf("migrations-%s", status.Current) if err = wd.CopyFS(current, data.DirLatest); err != nil { - return err + return r.resultErr(res, err, "CopyingDirState") } params.DirURL = fmt.Sprintf("file://%s", current) default: - return fmt.Errorf("unable to downgrade, no dir-state found") + return r.resultErr(res, errors.New("unable to downgrade, no dir-state found"), "Migrating") } run, err := c.MigrateDown(ctx, params) if err != nil { - res.SetNotReady("Migrating", err.Error()) - if !isSQLErr(err) { - err = transient(err) - } - return err + return r.resultCLIErr(res, err, "Migrating") } switch run.Status { case StatePending: res.SetNotReady("ApprovalPending", "Deployment is waiting for approval") res.Status.ApprovalURL = run.URL - return transient(&ProtectedFlowError{ + err = transient(&ProtectedFlowError{ reason: "ApprovalPending", msg: fmt.Sprintf("plan approval pending, review here: %s", run.URL), }) + return r.resultErr(res, err, "ApprovalPending") case StateAborted: - res.SetNotReady("PlanRejected", "Deployment is aborted") res.Status.ApprovalURL = run.URL // Migration is aborted, no need to reapply - return fmt.Errorf("plan rejected, review here: %s", run.URL) + return r.resultErr(res, fmt.Errorf("plan rejected, review here: %s", run.URL), "PlanRejected") case StateApplied, StateApproved: res.SetReady(dbv1alpha1.AtlasMigrationStatus{ ObservedHash: data.ObservedHash, @@ -397,14 +376,10 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio Vars: data.Vars, }) if err != nil { - res.SetNotReady("Migrating", err.Error()) - if !isSQLErr(err) { - err = transient(err) - } - return err + return r.resultCLIErr(res, err, "Migrating") } if len(reports) != 1 { - return fmt.Errorf("unexpected number of reports: %d", len(reports)) + return r.resultErr(res, fmt.Errorf("unexpected number of reports: %d", len(reports)), "Migrating") } res.SetReady(dbv1alpha1.AtlasMigrationStatus{ ObservedHash: data.ObservedHash, @@ -417,11 +392,10 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio // Compress the migration directory then store it in the secret // for later use when atlas runs the migration down. if err = r.storeDirState(ctx, res, data.Dir); err != nil { - res.SetNotReady("StoringDirState", err.Error()) - return err + return r.resultErr(res, err, "StoringDirState") } } - return nil + return ctrl.Result{}, nil } type ProtectedFlowError struct { @@ -569,6 +543,34 @@ func (r *AtlasMigrationReconciler) recordErrEvent(res *dbv1alpha1.AtlasMigration r.recorder.Event(res, corev1.EventTypeWarning, reason, strings.TrimSpace(err.Error())) } +func (r *AtlasMigrationReconciler) resultErr( + res *dbv1alpha1.AtlasMigration, err error, reason string, +) (ctrl.Result, error) { + if e, ok := err.(interface{ Reason() string }); ok { + reason = e.Reason() + } + if isConnectionErr(err) { + err = transient(err) + } + res.SetNotReady(reason, err.Error()) + r.recordErrEvent(res, err) + return result(err) +} + +func (r *AtlasMigrationReconciler) resultCLIErr( + res *dbv1alpha1.AtlasMigration, err error, reason string, +) (ctrl.Result, error) { + if e, ok := err.(interface{ Reason() string }); ok { + reason = e.Reason() + } + if !isSQLErr(err) { + err = transient(err) + } + res.SetNotReady(reason, err.Error()) + r.recordErrEvent(res, err) + return result(err) +} + // Calculate the hash of the given data func hashMigrationData(d *migrationData) (string, error) { h := sha256.New() diff --git a/internal/controller/atlasmigration_controller_test.go b/internal/controller/atlasmigration_controller_test.go index eee0b8f..8494be3 100644 --- a/internal/controller/atlasmigration_controller_test.go +++ b/internal/controller/atlasmigration_controller_test.go @@ -333,7 +333,7 @@ func TestMigration_MigrateDown_Remote_Protected(t *testing.T) { }) } // No changes because the migration down is not allowed - assert(ctrl.Result{}, false, "ProtectedFlowError", "Migrate down is not allowed", "", "", "") + assert(ctrl.Result{}, false, "ProtectedFlowError", "migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade", "", "", "") obj = &dbv1alpha1.AtlasMigration{ ObjectMeta: meta, @@ -362,8 +362,8 @@ func TestMigration_MigrateDown_Remote_Protected(t *testing.T) { URL: "THIS_IS_DEPLOYMENT_URL", } // Reconcile again - assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "Deployment is waiting for approval", "", "THIS_IS_DEPLOYMENT_URL", "") - assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "Deployment is waiting for approval", "", "THIS_IS_DEPLOYMENT_URL", "") + assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "plan approval pending, review here: THIS_IS_DEPLOYMENT_URL", "", "THIS_IS_DEPLOYMENT_URL", "") + assert(ctrl.Result{RequeueAfter: 5 * time.Second}, false, "ApprovalPending", "plan approval pending, review here: THIS_IS_DEPLOYMENT_URL", "", "THIS_IS_DEPLOYMENT_URL", "") mockExec.down.res = &atlasexec.MigrateDown{ Current: "2", @@ -462,7 +462,7 @@ func TestMigration_MigrateDown_Local(t *testing.T) { URL: "", } // No changes because the migration down is not allowed - assert(ctrl.Result{}, false, "ProtectedFlowError", "Migrate down is not allowed", "", "", "") + assert(ctrl.Result{}, false, "ProtectedFlowError", "migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade", "", "", "") h.patch(t, &dbv1alpha1.AtlasMigration{ ObjectMeta: meta, @@ -687,7 +687,7 @@ func TestReconcile_reconcile(t *testing.T) { } md, err := tt.r.extractData(context.Background(), res) require.NoError(t, err) - err = tt.r.reconcile(context.Background(), md, res) + _, err = tt.r.reconcile(context.Background(), md, res) require.NoError(t, err) require.EqualValues(t, "20230412003626", res.Status.LastAppliedVersion) } @@ -746,7 +746,7 @@ func TestReconcile_reconcile_upToDate(t *testing.T) { }, }) require.NoError(t, err) - err = tt.r.reconcile(context.Background(), md, res) + _, err = tt.r.reconcile(context.Background(), md, res) require.NoError(t, err) require.EqualValues(t, "20230412003626", res.Status.LastAppliedVersion) } @@ -770,7 +770,7 @@ func TestReconcile_reconcile_baseline(t *testing.T) { } md, err := tt.r.extractData(context.Background(), res) require.NoError(t, err) - err = tt.r.reconcile(context.Background(), md, res) + _, err = tt.r.reconcile(context.Background(), md, res) require.NoError(t, err) require.EqualValues(t, "20230412003628", res.Status.LastAppliedVersion) diff --git a/test/e2e/testscript/migration-mysql.txtar b/test/e2e/testscript/migration-mysql.txtar index 70c42d1..ddc275f 100644 --- a/test/e2e/testscript/migration-mysql.txtar +++ b/test/e2e/testscript/migration-mysql.txtar @@ -35,7 +35,7 @@ kubectl create configmap migration-dir --from-file=migrations-v1 --dry-run=clien stdin stdout kubectl apply -f - # Expect migration is failured -kubectl wait --timeout=500s --for=jsonpath='{.status.conditions[*].message}'='"Migrate down is not allowed"' AtlasMigration/mysql +kubectl wait --timeout=500s --for=jsonpath='{.status.conditions[*].message}'='"migrate down is not allowed, set `migrateDown.allow` to true to allow downgrade"' AtlasMigration/mysql # Patch the migration to allow down migration kubectl patch AtlasMigration/mysql --type merge --patch-file ./migration-patch-down.yaml kubectl wait --timeout=500s --for=condition=ready AtlasMigration/mysql