Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pipeline job deployment race condition #1073

Merged
merged 14 commits into from
Mar 22, 2024
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ mocks:
mockgen -source ./radix-operator/dnsalias/internal/syncerfactory.go -destination ./radix-operator/dnsalias/internal/syncerfactory_mock.go -package internal
mockgen -source ./radix-operator/common/handler.go -destination ./radix-operator/common/handler_mock.go -package common
mockgen -source ./pipeline-runner/internal/wait/job.go -destination ./pipeline-runner/internal/wait/job_mock.go -package wait
mockgen -source ./pipeline-runner/internal/watcher/radix_deployment_watcher.go -destination ./pipeline-runner/internal/watcher/radix_deployment_watcher_mock.go -package watcher

.PHONY: build-pipeline
build-pipeline:
Expand Down
4 changes: 2 additions & 2 deletions charts/radix-operator/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v2
name: radix-operator
version: 1.30.5
appVersion: 1.50.5
version: 1.30.6
appVersion: 1.50.6
kubeVersion: ">=1.24.0"
description: Radix Operator
keywords:
Expand Down
1 change: 1 addition & 0 deletions charts/radix-operator/templates/radix-pipeline-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ rules:
- get
- list
- create
- delete
- apiGroups:
- ""
resources:
Expand Down
54 changes: 54 additions & 0 deletions pipeline-runner/internal/watcher/radix_deployment_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package watcher

import (
"context"
"time"

radixclient "github.com/equinor/radix-operator/pkg/client/clientset/versioned"
"github.com/rs/zerolog/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

// RadixDeploymentWatcher waits for a Radix deployment to become active.
type RadixDeploymentWatcher interface {
// WaitForActive blocks until the Radix deployment deploymentName in
// namespace is active, and returns a non-nil error if the wait fails.
WaitForActive(namespace, deploymentName string) error
}

// radixDeploymentWatcher is the RadixDeploymentWatcher implementation
// returned by NewRadixDeploymentWatcher.
type radixDeploymentWatcher struct {
// radixClient is used to read RadixDeployment resources while polling.
radixClient radixclient.Interface
// waitTimeout is the deadline applied to each wait started by waitFor.
waitTimeout time.Duration
}

// NewRadixDeploymentWatcher returns a RadixDeploymentWatcher that reads Radix
// deployments through radixClient and stops waiting after waitTimeout.
func NewRadixDeploymentWatcher(radixClient radixclient.Interface, waitTimeout time.Duration) RadixDeploymentWatcher {
	w := radixDeploymentWatcher{
		radixClient: radixClient,
		waitTimeout: waitTimeout,
	}
	return &w
}

// WaitForActive polls the Radix deployment deploymentName in namespace until
// its Status.ActiveFrom is set.
//
// It returns nil once the deployment is active. It returns the error from
// reading the deployment if a read fails, or the polling error if the
// watcher's timeout expires before the deployment becomes active.
func (watcher radixDeploymentWatcher) WaitForActive(namespace, deploymentName string) error {
	log.Info().Msgf("Waiting for Radix deployment %s to activate", deploymentName)

	isActive := func(ctx context.Context) (bool, error) {
		// Use the context supplied by the poller (not context.Background()) so
		// that the request is cancelled together with the wait timeout.
		rd, err := watcher.radixClient.RadixV1().RadixDeployments(namespace).Get(ctx, deploymentName, metav1.GetOptions{})
		if err != nil {
			// Any read error, including "not found", aborts the wait.
			return false, err
		}
		return rd != nil && !rd.Status.ActiveFrom.IsZero(), nil
	}
	if err := watcher.waitFor(isActive); err != nil {
		return err
	}

	log.Info().Msgf("Radix deployment %s in namespace %s is active", deploymentName, namespace)
	return nil
}

// waitFor polls condition once per second, starting immediately, until it
// reports done, returns an error, or the watcher's waitTimeout elapses.
func (watcher radixDeploymentWatcher) waitFor(condition wait.ConditionWithContextFunc) error {
	const pollInterval = time.Second

	ctx, cancel := context.WithTimeout(context.Background(), watcher.waitTimeout)
	defer cancel()

	return wait.PollUntilContextCancel(ctx, pollInterval, true, condition)
}
48 changes: 48 additions & 0 deletions pipeline-runner/internal/watcher/radix_deployment_watcher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions pipeline-runner/internal/watcher/radix_deployment_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package watcher

import (
"context"
"errors"
"testing"
"time"

radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
radix "github.com/equinor/radix-operator/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubernetes "k8s.io/client-go/kubernetes/fake"
)

// setupTest returns fresh fake Radix and Kubernetes clientsets for a test case.
func setupTest(t *testing.T) (*radix.Clientset, *kubernetes.Clientset) {
	return radix.NewSimpleClientset(), kubernetes.NewSimpleClientset()
}

// TestDeploy_WaitActiveDeployment checks WaitForActive against three cases: a
// deployment that is already active, a deployment that never becomes active
// within the timeout, and a deployment that does not exist.
func TestDeploy_WaitActiveDeployment(t *testing.T) {
	const (
		namespace           = "app-dev"
		radixDeploymentName = "any-rd-name"
	)
	type scenario struct {
		name                  string
		hasRadixDeployment    bool
		radixDeploymentStatus radixv1.RadixDeployStatus
		watcherError          error
	}
	scenarios := []scenario{
		{name: "Active deployment, no fail", hasRadixDeployment: true, radixDeploymentStatus: radixv1.RadixDeployStatus{ActiveFrom: metav1.Time{Time: time.Now()}}, watcherError: nil},
		{name: "No active radix deployment, fail", hasRadixDeployment: true, radixDeploymentStatus: radixv1.RadixDeployStatus{}, watcherError: errors.New("context deadline exceeded")},
		{name: "No radix deployment, fail", hasRadixDeployment: false, watcherError: errors.New("radixdeployments.radix.equinor.com \"any-rd-name\" not found")},
	}
	for _, ts := range scenarios {
		t.Run(ts.name, func(tt *testing.T) {
			radixClient, kubeClient := setupTest(tt)
			// Report setup failures on the subtest, not on the parent test.
			require.NoError(tt, createNamespace(kubeClient, namespace))

			if ts.hasRadixDeployment {
				_, err := radixClient.RadixV1().RadixDeployments(namespace).Create(context.Background(), &radixv1.RadixDeployment{
					ObjectMeta: metav1.ObjectMeta{Name: radixDeploymentName},
					Status:     ts.radixDeploymentStatus,
				}, metav1.CreateOptions{})
				require.NoError(tt, err)
			}

			// A short timeout keeps the "never active" scenario fast.
			watcher := NewRadixDeploymentWatcher(radixClient, time.Millisecond*10)
			err := watcher.WaitForActive(namespace, radixDeploymentName)

			if ts.watcherError == nil {
				assert.NoError(tt, err)
			} else {
				assert.EqualError(tt, err, ts.watcherError.Error())
			}
		})
	}
}

// createNamespace creates a Namespace object named namespace through
// kubeClient and returns the error from the create call, if any.
func createNamespace(kubeClient *kubernetes.Clientset, namespace string) error {
	ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
	if _, err := kubeClient.CoreV1().Namespaces().Create(context.Background(), ns, metav1.CreateOptions{}); err != nil {
		return err
	}
	return nil
}
4 changes: 3 additions & 1 deletion pipeline-runner/pipelines/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package pipelines

import (
"context"
"time"

"github.com/equinor/radix-operator/pipeline-runner/internal/watcher"
"github.com/equinor/radix-operator/pipeline-runner/model"
"github.com/equinor/radix-operator/pipeline-runner/steps"
jobs "github.com/equinor/radix-operator/pkg/apis/job"
Expand Down Expand Up @@ -109,7 +111,7 @@ func (cli *PipelineRunner) initStepImplementations(registration *v1.RadixRegistr
stepImplementations = append(stepImplementations, steps.NewApplyConfigStep())
stepImplementations = append(stepImplementations, steps.NewBuildStep(nil))
stepImplementations = append(stepImplementations, steps.NewRunPipelinesStep(nil))
stepImplementations = append(stepImplementations, steps.NewDeployStep(kube.NewNamespaceWatcherImpl(cli.kubeclient)))
stepImplementations = append(stepImplementations, steps.NewDeployStep(kube.NewNamespaceWatcherImpl(cli.kubeclient), watcher.NewRadixDeploymentWatcher(cli.radixclient, time.Minute*5)))
stepImplementations = append(stepImplementations, steps.NewPromoteStep())

for _, stepImplementation := range stepImplementations {
Expand Down
16 changes: 8 additions & 8 deletions pipeline-runner/steps/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *buildTestSuite) Test_BuildDeploy_JobSpecAndDeploymentConsistent() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -273,7 +273,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_MultipleComponents() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -416,7 +416,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_MultipleComponents_IgnoreDisabled() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -568,7 +568,7 @@ func (s *buildTestSuite) Test_BuildChangedComponents() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -1041,7 +1041,7 @@ func (s *buildTestSuite) Test_DetectComponentsToBuild() {
}
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

// Run pipeline steps
Expand Down Expand Up @@ -1111,7 +1111,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_ImageTagNames() {

applyStep := steps.NewApplyConfigStep()
applyStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down Expand Up @@ -1330,7 +1330,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_WithBuildSecrets() {
applyStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
s.Require().NoError(applyStep.Run(&pipeline))
s.Require().NoError(buildStep.Run(&pipeline))
Expand Down Expand Up @@ -1694,7 +1694,7 @@ func (s *buildTestSuite) Test_BuildJobSpec_EnvConfigSrcAndImage() {
jobWaiter.EXPECT().Wait(gomock.Any()).Return(nil).Times(1)
buildStep := steps.NewBuildStep(jobWaiter)
buildStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{})
deployStep := steps.NewDeployStep(FakeNamespaceWatcher{}, FakeRadixDeploymentWatcher{})
deployStep.Init(s.kubeClient, s.radixClient, s.kubeUtil, s.promClient, rr)

s.Require().NoError(applyStep.Run(&pipeline))
Expand Down
45 changes: 28 additions & 17 deletions pipeline-runner/steps/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

pipelineRunnerInternal "github.com/equinor/radix-operator/pipeline-runner/internal/watcher"
"github.com/equinor/radix-operator/pipeline-runner/model"
"github.com/equinor/radix-operator/pipeline-runner/steps/internal"
"github.com/equinor/radix-operator/pkg/apis/defaults"
Expand All @@ -17,16 +18,18 @@ import (

// DeployStepImplementation Step to deploy RD into environment
type DeployStepImplementation struct {
stepType pipeline.StepType
namespaceWatcher kube.NamespaceWatcher
stepType pipeline.StepType
namespaceWatcher kube.NamespaceWatcher
radixDeploymentWatcher pipelineRunnerInternal.RadixDeploymentWatcher
model.DefaultStepImplementation
}

// NewDeployStep Constructor
func NewDeployStep(namespaceWatcher kube.NamespaceWatcher) model.Step {
func NewDeployStep(namespaceWatcher kube.NamespaceWatcher, radixDeploymentWatcher pipelineRunnerInternal.RadixDeploymentWatcher) model.Step {
return &DeployStepImplementation{
stepType: pipeline.DeployStep,
namespaceWatcher: namespaceWatcher,
stepType: pipeline.DeployStep,
namespaceWatcher: namespaceWatcher,
radixDeploymentWatcher: radixDeploymentWatcher,
}
}

Expand Down Expand Up @@ -69,7 +72,7 @@ func (cli *DeployStepImplementation) deploy(pipelineInfo *model.PipelineInfo) er
return nil
}

func (cli *DeployStepImplementation) deployToEnv(appName, env string, pipelineInfo *model.PipelineInfo) error {
func (cli *DeployStepImplementation) deployToEnv(appName, envName string, pipelineInfo *model.PipelineInfo) error {
defaultEnvVars, err := getDefaultEnvVars(pipelineInfo)
if err != nil {
return fmt.Errorf("failed to retrieve default env vars for RadixDeployment in app %s. %v", appName, err)
Expand All @@ -89,7 +92,7 @@ func (cli *DeployStepImplementation) deployToEnv(appName, env string, pipelineIn
return err
}

currentRd, err := internal.GetCurrentRadixDeployment(cli.GetKubeutil(), utils.GetEnvironmentNamespace(appName, env))
currentRd, err := internal.GetCurrentRadixDeployment(cli.GetKubeutil(), utils.GetEnvironmentNamespace(appName, envName))
if err != nil {
return err
}
Expand All @@ -99,27 +102,35 @@ func (cli *DeployStepImplementation) deployToEnv(appName, env string, pipelineIn
pipelineInfo.PipelineArguments.JobName,
pipelineInfo.PipelineArguments.ImageTag,
pipelineInfo.PipelineArguments.Branch,
pipelineInfo.DeployEnvironmentComponentImages[env],
env,
pipelineInfo.DeployEnvironmentComponentImages[envName],
envName,
defaultEnvVars,
radixApplicationHash,
buildSecretHash,
pipelineInfo.PrepareBuildContext,
pipelineInfo.PipelineArguments.ComponentsToDeploy)

if err != nil {
return fmt.Errorf("failed to create radix deployments objects for app %s. %v", appName, err)
return fmt.Errorf("failed to create Radix deployment in environment %s. %w", envName, err)
}

err = cli.namespaceWatcher.WaitFor(utils.GetEnvironmentNamespace(cli.GetAppName(), env))
if err != nil {
return fmt.Errorf("failed to get environment namespace, %s, for app %s. %v", env, appName, err)
namespace := utils.GetEnvironmentNamespace(cli.GetAppName(), envName)
if err = cli.namespaceWatcher.WaitFor(namespace); err != nil {
return fmt.Errorf("failed to get environment namespace %s, for app %s. %w", namespace, appName, err)
}

log.Info().Msgf("Apply radix deployment %s on env %s", radixDeployment.GetName(), radixDeployment.GetNamespace())
_, err = cli.GetRadixclient().RadixV1().RadixDeployments(radixDeployment.GetNamespace()).Create(context.TODO(), radixDeployment, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to apply radix deployment for app %s to environment %s. %v", appName, env, err)
radixDeploymentName := radixDeployment.GetName()
log.Info().Msgf("Apply Radix deployment %s to environment %s", radixDeploymentName, envName)
if _, err = cli.GetRadixclient().RadixV1().RadixDeployments(radixDeployment.GetNamespace()).Create(context.Background(), radixDeployment, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to apply Radix deployment for app %s to environment %s. %w", appName, envName, err)
}

if err := cli.radixDeploymentWatcher.WaitForActive(namespace, radixDeploymentName); err != nil {
log.Error().Err(err).Msgf("Failed to activate Radix deployment %s in environment %s. Deleting deployment", radixDeploymentName, envName)
if err := cli.GetRadixclient().RadixV1().RadixDeployments(radixDeployment.GetNamespace()).Delete(context.Background(), radixDeploymentName, metav1.DeleteOptions{}); err != nil {
log.Error().Err(err).Msgf("Failed to delete Radix deployment")
}
return err
}
return nil
}
Expand Down
Loading
Loading