Skip to content

Commit

Permalink
Merge pull request #1074 from equinor/master
Browse files Browse the repository at this point in the history
Fix pipeline job deployment race condition (#1073)
  • Loading branch information
satr authored Mar 25, 2024
2 parents 10049c9 + 2884277 commit 1c145b9
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 36 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ mocks:
mockgen -source ./radix-operator/dnsalias/internal/syncerfactory.go -destination ./radix-operator/dnsalias/internal/syncerfactory_mock.go -package internal
mockgen -source ./radix-operator/common/handler.go -destination ./radix-operator/common/handler_mock.go -package common
mockgen -source ./pipeline-runner/internal/wait/job.go -destination ./pipeline-runner/internal/wait/job_mock.go -package wait
mockgen -source ./pipeline-runner/internal/watcher/radix_deployment_watcher.go -destination ./pipeline-runner/internal/watcher/radix_deployment_watcher_mock.go -package watcher

.PHONY: build-pipeline
build-pipeline:
Expand Down
4 changes: 2 additions & 2 deletions charts/radix-operator/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v2
name: radix-operator
version: 1.30.5
appVersion: 1.50.5
version: 1.30.6
appVersion: 1.50.6
kubeVersion: ">=1.24.0"
description: Radix Operator
keywords:
Expand Down
1 change: 1 addition & 0 deletions charts/radix-operator/templates/radix-pipeline-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ rules:
- get
- list
- create
- delete
- apiGroups:
- ""
resources:
Expand Down
54 changes: 54 additions & 0 deletions pipeline-runner/internal/watcher/radix_deployment_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package watcher

import (
"context"
"time"

radixclient "github.com/equinor/radix-operator/pkg/client/clientset/versioned"
"github.com/rs/zerolog/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

// RadixDeploymentWatcher Watcher to wait for a Radix deployment to become active
type RadixDeploymentWatcher interface {
// WaitForActive Waits for the Radix deployment with the given name in the given namespace to become active.
// A non-nil error is returned when the wait does not complete successfully.
WaitForActive(namespace, deploymentName string) error
}

// radixDeploymentWatcher Implementation of RadixDeploymentWatcher
type radixDeploymentWatcher struct {
// radixClient Client used to fetch the RadixDeployment while waiting
radixClient radixclient.Interface
// waitTimeout Upper bound for the total time a single wait may take
waitTimeout time.Duration
}

// NewRadixDeploymentWatcher Creates a RadixDeploymentWatcher which reads Radix deployments
// with radixClient and gives up waiting after waitTimeout
func NewRadixDeploymentWatcher(radixClient radixclient.Interface, waitTimeout time.Duration) RadixDeploymentWatcher {
	rdWatcher := radixDeploymentWatcher{
		radixClient: radixClient,
		waitTimeout: waitTimeout,
	}
	return &rdWatcher
}

// WaitForActive Waits until the Radix deployment deploymentName in namespace has a
// non-zero Status.ActiveFrom, or until the watcher's wait timeout expires.
// An error returned when fetching the Radix deployment (e.g. it does not exist)
// is returned to the caller instead of being retried.
func (watcher radixDeploymentWatcher) WaitForActive(namespace, deploymentName string) error {
	log.Info().Msgf("Waiting for Radix deployment %s to activate", deploymentName)
	err := watcher.waitFor(func(ctx context.Context) (bool, error) {
		// Use the polling context (not context.Background()) so that an in-flight
		// request is cancelled as soon as the wait timeout expires.
		rd, err := watcher.radixClient.RadixV1().RadixDeployments(namespace).Get(ctx, deploymentName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return rd != nil && !rd.Status.ActiveFrom.IsZero(), nil
	})
	if err != nil {
		return err
	}

	log.Info().Msgf("Radix deployment %s in namespace %s is active", deploymentName, namespace)
	return nil
}

// waitFor Polls condition once per second, with the first check performed immediately,
// until it reports done, returns an error, or the watcher's wait timeout expires
func (watcher radixDeploymentWatcher) waitFor(condition wait.ConditionWithContextFunc) error {
	const pollInterval = time.Second

	ctx, cancel := context.WithTimeout(context.Background(), watcher.waitTimeout)
	defer cancel()

	return wait.PollUntilContextCancel(ctx, pollInterval, true, condition)
}
48 changes: 48 additions & 0 deletions pipeline-runner/internal/watcher/radix_deployment_watcher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions pipeline-runner/internal/watcher/radix_deployment_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package watcher

import (
"context"
"errors"
"testing"
"time"

radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
radix "github.com/equinor/radix-operator/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubernetes "k8s.io/client-go/kubernetes/fake"
)

// setupTest Creates fresh fake Radix and Kubernetes clients for a single test case
func setupTest(t *testing.T) (*radix.Clientset, *kubernetes.Clientset) {
	return radix.NewSimpleClientset(), kubernetes.NewSimpleClientset()
}

// TestDeploy_WaitActiveDeployment Verifies that WaitForActive succeeds for a Radix deployment
// with ActiveFrom set, times out when ActiveFrom is never set, and fails with the
// client error when the Radix deployment does not exist
func TestDeploy_WaitActiveDeployment(t *testing.T) {
	const (
		namespace           = "app-dev"
		radixDeploymentName = "any-rd-name"
	)
	type scenario struct {
		name                  string
		hasRadixDeployment    bool
		radixDeploymentStatus radixv1.RadixDeployStatus
		watcherError          error
	}
	scenarios := []scenario{
		{name: "Active deployment, no fail", hasRadixDeployment: true, radixDeploymentStatus: radixv1.RadixDeployStatus{ActiveFrom: metav1.Time{Time: time.Now()}}, watcherError: nil},
		{name: "No active radix deployment, fail", hasRadixDeployment: true, radixDeploymentStatus: radixv1.RadixDeployStatus{}, watcherError: errors.New("context deadline exceeded")},
		{name: "No radix deployment, fail", hasRadixDeployment: false, watcherError: errors.New("radixdeployments.radix.equinor.com \"any-rd-name\" not found")},
	}
	for _, ts := range scenarios {
		t.Run(ts.name, func(tt *testing.T) {
			radixClient, kubeClient := setupTest(tt)
			// Assertions inside the subtest must use the subtest's own *testing.T (tt);
			// failing the parent t from here would report against the wrong test.
			require.NoError(tt, createNamespace(kubeClient, namespace))

			if ts.hasRadixDeployment {
				_, err := radixClient.RadixV1().RadixDeployments(namespace).Create(context.Background(), &radixv1.RadixDeployment{
					ObjectMeta: metav1.ObjectMeta{Name: radixDeploymentName},
					Status:     ts.radixDeploymentStatus,
				}, metav1.CreateOptions{})
				require.NoError(tt, err)
			}

			// A short timeout keeps the "never becomes active" scenario fast
			watcher := NewRadixDeploymentWatcher(radixClient, time.Millisecond*10)
			err := watcher.WaitForActive(namespace, radixDeploymentName)

			if ts.watcherError == nil {
				assert.NoError(tt, err)
			} else {
				assert.EqualError(tt, err, ts.watcherError.Error())
			}
		})
	}
}

// createNamespace Creates a namespace with the given name using the provided Kubernetes client
func createNamespace(kubeClient *kubernetes.Clientset, namespace string) error {
	ns := &corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: namespace},
	}
	if _, err := kubeClient.CoreV1().Namespaces().Create(context.Background(), ns, metav1.CreateOptions{}); err != nil {
		return err
	}
	return nil
}
4 changes: 3 additions & 1 deletion pipeline-runner/pipelines/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package pipelines

import (
"context"
"time"

"github.com/equinor/radix-operator/pipeline-runner/internal/watcher"
"github.com/equinor/radix-operator/pipeline-runner/model"
"github.com/equinor/radix-operator/pipeline-runner/steps"
jobs "github.com/equinor/radix-operator/pkg/apis/job"
Expand Down Expand Up @@ -109,7 +111,7 @@ func (cli *PipelineRunner) initStepImplementations(registration *v1.RadixRegistr
stepImplementations = append(stepImplementations, steps.NewApplyConfigStep())
stepImplementations = append(stepImplementations, steps.NewBuildStep(nil))
stepImplementations = append(stepImplementations, steps.NewRunPipelinesStep(nil))
stepImplementations = append(stepImplementations, steps.NewDeployStep(kube.NewNamespaceWatcherImpl(cli.kubeclient)))
stepImplementations = append(stepImplementations, steps.NewDeployStep(kube.NewNamespaceWatcherImpl(cli.kubeclient), watcher.NewRadixDeploymentWatcher(cli.radixclient, time.Minute*5)))
stepImplementations = append(stepImplementations, steps.NewPromoteStep())

for _, stepImplementation := range stepImplementations {
Expand Down
16 changes: 8 additions & 8 deletions pipeline-runner/steps/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *buildTestSuite) Test_BuildDeploy_JobSpecAndDeploymentConsistent() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -273,7 +273,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_MultipleComponents() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -416,7 +416,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_MultipleComponents_IgnoreDisabled() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -568,7 +568,7 @@ func (s *buildTestSuite) Test_BuildChangedComponents() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -1041,7 +1041,7 @@ func (s *buildTestSuite) Test_DetectComponentsToBuild() {
}
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

// Run pipeline steps
Expand Down Expand Up @@ -1111,7 +1111,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_ImageTagNames() {

applyStep := steps.NewApplyConfigStep()
applyStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -1330,7 +1330,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_WithBuildSecrets() {
applyStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
s.Require().NoError(applyStep.Run(&pipeline))
s.Require().NoError(buildStep.Run(&pipeline))
Expand Down Expand Up @@ -1694,7 +1694,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_EnvConfigSrcAndImage() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down
45 changes: 28 additions & 17 deletions pipeline-runner/steps/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

pipelineRunnerInternal "github.com/equinor/radix-operator/pipeline-runner/internal/watcher"
"github.com/equinor/radix-operator/pipeline-runner/model"
"github.com/equinor/radix-operator/pipeline-runner/steps/internal"
"github.com/equinor/radix-operator/pkg/apis/defaults"
Expand All @@ -17,16 +18,18 @@ import (

// DeployStepImplementation Step to deploy RD into environment
type DeployStepImplementation struct {
stepType pipeline.StepType
namespaceWatcher kube.NamespaceWatcher
stepType pipeline.StepType
namespaceWatcher kube.NamespaceWatcher
radixDeploymentWatcher pipelineRunnerInternal.RadixDeploymentWatcher
model.DefaultStepImplementation
}

// NewDeployStep Constructor
func NewDeployStep(namespaceWatcher kube.NamespaceWatcher) model.Step {
func NewDeployStep(namespaceWatcher kube.NamespaceWatcher, radixDeploymentWatcher pipelineRunnerInternal.RadixDeploymentWatcher) model.Step {
return &DeployStepImplementation{
stepType: pipeline.DeployStep,
namespaceWatcher: namespaceWatcher,
stepType: pipeline.DeployStep,
namespaceWatcher: namespaceWatcher,
radixDeploymentWatcher: radixDeploymentWatcher,
}
}

Expand Down Expand Up @@ -69,7 +72,7 @@ func (cli *DeployStepImplementation) deploy(pipelineInfo *model.PipelineInfo) er
return nil
}

func (cli *DeployStepImplementation) deployToEnv(appName, env string, pipelineInfo *model.PipelineInfo) error {
func (cli *DeployStepImplementation) deployToEnv(appName, envName string, pipelineInfo *model.PipelineInfo) error {
defaultEnvVars, err := getDefaultEnvVars(pipelineInfo)
if err != nil {
return fmt.Errorf("failed to retrieve default env vars for RadixDeployment in app %s. %v", appName, err)
Expand All @@ -89,7 +92,7 @@ func (cli *DeployStepImplementation) deployToEnv(appName, env string, pipelineIn
return err
}

currentRd, err := internal.GetCurrentRadixDeployment(cli.GetKubeutil(), utils.GetEnvironmentNamespace(appName, env))
currentRd, err := internal.GetCurrentRadixDeployment(cli.GetKubeutil(), utils.GetEnvironmentNamespace(appName, envName))
if err != nil {
return err
}
Expand All @@ -99,27 +102,35 @@ func (cli *DeployStepImplementation) deployToEnv(appName, env string, pipelineIn
pipelineInfo.PipelineArguments.JobName,
pipelineInfo.PipelineArguments.ImageTag,
pipelineInfo.PipelineArguments.Branch,
pipelineInfo.DeployEnvironmentComponentImages[env],
env,
pipelineInfo.DeployEnvironmentComponentImages[envName],
envName,
defaultEnvVars,
radixApplicationHash,
buildSecretHash,
pipelineInfo.PrepareBuildContext,
pipelineInfo.PipelineArguments.ComponentsToDeploy)

if err != nil {
return fmt.Errorf("failed to create radix deployments objects for app %s. %v", appName, err)
return fmt.Errorf("failed to create Radix deployment in environment %s. %w", envName, err)
}

err = cli.namespaceWatcher.WaitFor(utils.GetEnvironmentNamespace(cli.GetAppName(), env))
if err != nil {
return fmt.Errorf("failed to get environment namespace, %s, for app %s. %v", env, appName, err)
namespace := utils.GetEnvironmentNamespace(cli.GetAppName(), envName)
if err = cli.namespaceWatcher.WaitFor(namespace); err != nil {
return fmt.Errorf("failed to get environment namespace %s, for app %s. %w", namespace, appName, err)
}

log.Info().Msgf("Apply radix deployment %s on env %s", radixDeployment.GetName(), radixDeployment.GetNamespace())
_, err = cli.GetRadixclient().RadixV1().RadixDeployments(radixDeployment.GetNamespace()).Create(context.TODO(), radixDeployment, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to apply radix deployment for app %s to environment %s. %v", appName, env, err)
radixDeploymentName := radixDeployment.GetName()
log.Info().Msgf("Apply Radix deployment %s to environment %s", radixDeploymentName, envName)
if _, err = cli.GetRadixclient().RadixV1().RadixDeployments(radixDeployment.GetNamespace()).Create(context.Background(), radixDeployment, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to apply Radix deployment for app %s to environment %s. %w", appName, envName, err)
}

if err := cli.radixDeploymentWatcher.WaitForActive(namespace, radixDeploymentName); err != nil {
log.Error().Err(err).Msgf("Failed to activate Radix deployment %s in environment %s. Deleting deployment", radixDeploymentName, envName)
if err := cli.GetRadixclient().RadixV1().RadixDeployments(radixDeployment.GetNamespace()).Delete(context.Background(), radixDeploymentName, metav1.DeleteOptions{}); err != nil {
log.Error().Err(err).Msgf("Failed to delete Radix deployment")
}
return err
}
return nil
}
Expand Down
Loading

0 comments on commit 1c145b9

Please sign in to comment.