Skip to content

Commit

Permalink
chore: ignore mvtx replicas if it exists (#358)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Oct 24, 2024
1 parent cc36374 commit fdc7a6b
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ spec:
sink:
udsink:
container:
image: quay.io/numaio/numaflow-java/simple-sink:stable
image: quay.io/numaio/numaflow-java/simple-sink:stable
41 changes: 38 additions & 3 deletions internal/controller/monovertexrollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -210,7 +211,10 @@ func (r *MonoVertexRolloutReconciler) reconcile(ctx context.Context, monoVertexR
} else {
// merge and update
// we directly apply changes as there is no need for draining MonoVertex
newMonoVertexDef = mergeMonoVertex(existingMonoVertexDef, newMonoVertexDef)
newMonoVertexDef, err = mergeMonoVertex(existingMonoVertexDef, newMonoVertexDef)
if err != nil {
return ctrl.Result{}, err
}
err := r.updateMonoVertex(ctx, monoVertexRollout, newMonoVertexDef)
if err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -248,10 +252,41 @@ func (r *MonoVertexRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
return nil
}

func mergeMonoVertex(existingMonoVertex *kubernetes.GenericObject, newMonoVertex *kubernetes.GenericObject) *kubernetes.GenericObject {
func mergeMonoVertex(existingMonoVertex *kubernetes.GenericObject, newMonoVertex *kubernetes.GenericObject) (*kubernetes.GenericObject, error) {
resultMonoVertex := existingMonoVertex.DeepCopy()
resultMonoVertex.Spec = *newMonoVertex.Spec.DeepCopy()
return resultMonoVertex
// Use the same replicas as the existing MonoVertex
resultMonoVertex, err := withExistingMvtxReplicas(existingMonoVertex, resultMonoVertex)
return resultMonoVertex, err
}

// withExistingMvtxReplicas sets the replicas of the new MonoVertex to the existing MonoVertex's replicas if it exists.
func withExistingMvtxReplicas(existingMonoVertex, newMonoVertex *kubernetes.GenericObject) (*kubernetes.GenericObject, error) {
unstrucExisting, err := kubernetes.ObjectToUnstructured(existingMonoVertex)
if err != nil {
return newMonoVertex, err
}
// Have to use float64 as it's the type of the replicas field in the unstructured object
existingReplicas, existing, err := unstructured.NestedFloat64(unstrucExisting.Object, "spec", "replicas")
if err != nil {
return newMonoVertex, fmt.Errorf("failed to get replicas from existing MonoVertex: %w", err)
}
if existing {
unstrucNew, err := kubernetes.ObjectToUnstructured(newMonoVertex)
if err != nil {
return newMonoVertex, err
}
err = unstructured.SetNestedField(unstrucNew.Object, existingReplicas, "spec", "replicas")
if err != nil {
return newMonoVertex, fmt.Errorf("failed to set replicas in new MonoVertex: %w", err)
}

newMonoVertex, err = kubernetes.UnstructuredToObject(unstrucNew)
if err != nil {
return newMonoVertex, err
}
}
return newMonoVertex, nil
}

func (r *MonoVertexRolloutReconciler) processMonoVertexStatus(ctx context.Context, monoVertex *kubernetes.GenericObject, rollout *apiv1.MonoVertexRollout) {
Expand Down
110 changes: 110 additions & 0 deletions internal/controller/monovertexrollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ package controller
import (
"context"
"encoding/json"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/util/kubernetes"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)

Expand Down Expand Up @@ -227,3 +231,109 @@ var _ = Describe("MonoVertexRollout Controller", Ordered, func() {
})

})

func fakeMonoVertexSpec(t *testing.T) numaflowv1.MonoVertexSpec {
t.Helper()
return numaflowv1.MonoVertexSpec{
Replicas: ptr.To(int32(1)),
Source: &numaflowv1.Source{
UDSource: &numaflowv1.UDSource{
Container: &numaflowv1.Container{
Image: "quay.io/numaio/numaflow-java/source-simple-source:stable",
},
},
UDTransformer: &numaflowv1.UDTransformer{
Container: &numaflowv1.Container{
Image: "quay.io/numaio/numaflow-rs/source-transformer-now:stable",
},
},
},
Sink: &numaflowv1.Sink{
AbstractSink: numaflowv1.AbstractSink{
UDSink: &numaflowv1.UDSink{
Container: &numaflowv1.Container{
Image: "quay.io/numaio/numaflow-java/simple-sink:stable",
},
},
},
},
}
}

func fakeGenericMonoVertex(t *testing.T, s numaflowv1.MonoVertexSpec) *kubernetes.GenericObject {
t.Helper()
monoVertexSpecRaw, _ := json.Marshal(s)
return &kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "MonoVertex",
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test--mvtx",
Namespace: "test-ns",
},
Spec: runtime.RawExtension{
Raw: monoVertexSpecRaw,
},
}
}

func Test_withExistingMvtxReplicas(t *testing.T) {
tests := []struct {
name string
existingReplicas *int32
newReplicas *int32
expected *int32
}{
{
name: "nil existing replicas",
existingReplicas: nil,
newReplicas: ptr.To(int32(2)),
expected: ptr.To(int32(2)),
},
{
name: "both nil",
existingReplicas: nil,
newReplicas: nil,
expected: nil,
},
{
name: "existing replicas not nil, new replicas not nil",
existingReplicas: ptr.To(int32(2)),
newReplicas: ptr.To(int32(1)),
expected: ptr.To(int32(2)),
},
{
name: "existing replicas not nil, new replicas nil",
existingReplicas: ptr.To(int32(2)),
newReplicas: nil,
expected: ptr.To(int32(2)),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
existingMvtxSpec := fakeMonoVertexSpec(t)
existingMvtxSpec.Replicas = tt.existingReplicas
existingGenericMvtx := fakeGenericMonoVertex(t, existingMvtxSpec)

newMvtxSpec := fakeMonoVertexSpec(t)
newMvtxSpec.Replicas = tt.newReplicas
newGenericMvtx := fakeGenericMonoVertex(t, newMvtxSpec)

result, err := withExistingMvtxReplicas(existingGenericMvtx, newGenericMvtx)
assert.NoError(t, err)

unstruc, err := kubernetes.ObjectToUnstructured(result)
assert.NoError(t, err)

expected, existing, err := unstructured.NestedFloat64(unstruc.Object, "spec", "replicas")
assert.NoError(t, err)
assert.Equal(t, tt.expected != nil, existing)
if tt.expected != nil {
assert.Equal(t, *tt.expected, int32(expected))
}
})
}

}

0 comments on commit fdc7a6b

Please sign in to comment.