Skip to content

Commit

Permalink
internal/controller: remove duplicate recorded event in declarative flow
Browse files Browse the repository at this point in the history
  • Loading branch information
datdao committed Jan 22, 2025
1 parent 02bb557 commit 1af1eb5
Showing 1 changed file with 15 additions and 22 deletions.
37 changes: 15 additions & 22 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
res.SetNotReady("Reconciling", "Reconciling")
return ctrl.Result{Requeue: true}, nil
}
data, err := r.extractData(ctx, res)
var data *managedData
data, err = r.extractData(ctx, res)
if err != nil {
res.SetNotReady("ReadSchema", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
if data.hasTargets() {
Expand All @@ -157,10 +157,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// Create a working directory for the Atlas CLI
// The working directory contains the atlas.hcl config.
wd, err := atlasexec.NewWorkingDir(opts...)
var wd *atlasexec.WorkingDir
wd, err = atlasexec.NewWorkingDir(opts...)
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
defer wd.Close()
Expand All @@ -177,19 +177,18 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
cli, err := r.atlasClient(wd.Path(), data.Cloud)
if err != nil {
res.SetNotReady("CreatingAtlasClient", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
// Calculate the hash of the current schema.
hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
var hash string
hash, err = cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: data.EnvName,
URL: data.targetURL(),
Format: `{{ .Hash }}`,
Vars: data.Vars,
})
if err != nil {
res.SetNotReady("CalculatingHash", err.Error())
r.recordErrEvent(res, err)
if isConnectionErr(err) {
err = transient(err)
}
Expand All @@ -213,21 +212,19 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if data.Config != nil {
err = errors.New("login is required to use custom atlas.hcl config")
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
case err != nil:
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return result(err)
default:
log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org)
}
var reports []*atlasexec.SchemaApply
shouldLint, err := data.shouldLint()
var shouldLint bool
shouldLint, err = data.shouldLint()
if err != nil {
res.SetNotReady("LintPolicyError", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
switch desiredURL := data.targetURL(); {
Expand Down Expand Up @@ -258,6 +255,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// And the schema is available for the Atlas CLI (on local machine)
// to modify or approve the changes.
if data.Desired != nil && data.Desired.Scheme == dbv1alpha1.SchemaTypeFile {
var tag string
tag, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: data.EnvName,
URL: desiredURL,
Expand All @@ -271,11 +269,11 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
}
log.Info("schema is a file, pushing the schema to Atlas Cloud")
state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
var state *atlasexec.SchemaPush
state, err = cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
Env: data.EnvName,
Name: path.Join(repo.Host, repo.Path),
Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(tag)),
Expand All @@ -289,14 +287,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
}
desiredURL = state.URL
}
log.Info("creating a new schema plan", "desiredURL", desiredURL)
// Create a new plan for the pending changes.
plan, err := cli.SchemaPlan(ctx, &atlasexec.SchemaPlanParams{
var plan *atlasexec.SchemaPlan
plan, err = cli.SchemaPlan(ctx, &atlasexec.SchemaPlanParams{
Env: data.EnvName,
Repo: repo.String(),
From: []string{"env://url"},
Expand All @@ -319,7 +317,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
default:
log.Info("created a new schema plan", "plan", plan.File.URL, "desiredURL", desiredURL)
Expand All @@ -332,7 +329,8 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}
// List the schema plans to check if there are any plans.
switch plans, err := cli.SchemaPlanList(ctx, &atlasexec.SchemaPlanListParams{
var plans []atlasexec.SchemaPlanFile
switch plans, err = cli.SchemaPlanList(ctx, &atlasexec.SchemaPlanListParams{
Env: data.EnvName,
Repo: repo.String(),
From: []string{"env://url"},
Expand All @@ -346,7 +344,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
// There are multiple pending plans. This is an unexpected state.
case len(plans) > 1:
Expand All @@ -359,7 +356,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
err = errors.New(msg)
r.recordErrEvent(res, err)
return result(err)
// There are no pending plans, but Atlas has been asked to review the changes ALWAYS.
case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways:
Expand Down Expand Up @@ -412,7 +408,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
}
// Revert the destructive linting policy back to the original value.
Expand All @@ -437,7 +432,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
}
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Expand All @@ -463,7 +457,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
}
s := dbv1alpha1.AtlasSchemaStatus{
Expand Down

0 comments on commit 1af1eb5

Please sign in to comment.