Skip to content

Commit

Permalink
[FIXED] Stream create response test for sources being present in the …
Browse files Browse the repository at this point in the history
…response (#1420)

Signed-off-by: Jean-Noël Moyne <[email protected]>
Signed-off-by: Piotr Piotrowski <[email protected]>
Co-authored-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
jnmoyne and piotrpio authored Sep 27, 2023
1 parent a6bb6fa commit 4d43218
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 26 deletions.
4 changes: 2 additions & 2 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
Expand Down Expand Up @@ -443,7 +443,7 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
Expand Down
54 changes: 42 additions & 12 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1664,8 +1664,8 @@ func TestStreamConfigMatches(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
cfgSource := jetstream.StreamConfig{
Name: "source",
cfg := jetstream.StreamConfig{
Name: "stream",
Description: "desc",
Subjects: []string{"foo.*"},
Retention: jetstream.WorkQueuePolicy,
Expand Down Expand Up @@ -1703,15 +1703,15 @@ func TestStreamConfigMatches(t *testing.T) {
},
}

s, err := js.CreateStream(context.Background(), cfgSource)
s, err := js.CreateStream(context.Background(), cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.CachedInfo().Config, cfgSource) {
t.Fatalf("StreamConfig doesn't match")
if !reflect.DeepEqual(s.CachedInfo().Config, cfg) {
t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config)
}

cfg := jetstream.StreamConfig{
cfgMirror := jetstream.StreamConfig{
Name: "mirror",
MaxConsumers: 10,
MaxMsgs: 100,
Expand All @@ -1720,8 +1720,9 @@ func TestStreamConfigMatches(t *testing.T) {
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Replicas: 1,
Duplicates: 10 * time.Second,
Mirror: &jetstream.StreamSource{
Name: "source",
Name: "stream",
OptStartSeq: 10,
SubjectTransforms: []jetstream.SubjectTransformConfig{
{Source: ">", Destination: "transformed.>"},
Expand All @@ -1731,12 +1732,43 @@ func TestStreamConfigMatches(t *testing.T) {
SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
}

s, err = js.CreateStream(context.Background(), cfg)
s, err = js.CreateStream(context.Background(), cfgMirror)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.CachedInfo().Config, cfg) {
t.Fatalf("StreamConfig doesn't match")
if !reflect.DeepEqual(s.CachedInfo().Config, cfgMirror) {
t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config)
}

cfgSourcing := jetstream.StreamConfig{
Name: "sourcing",
Subjects: []string{"BAR"},
MaxConsumers: 10,
MaxMsgs: 100,
MaxBytes: 1000,
MaxAge: 100 * time.Second,
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Replicas: 1,
Duplicates: 10 * time.Second,
Sources: []*jetstream.StreamSource{
{
Name: "stream",
OptStartSeq: 10,
SubjectTransforms: []jetstream.SubjectTransformConfig{
{Source: ">", Destination: "transformed.>"},
},
},
},
SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
}

s, err = js.CreateStream(context.Background(), cfgSourcing)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.CachedInfo().Config, cfgSourcing) {
t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config)
}
}

Expand Down Expand Up @@ -1792,8 +1824,6 @@ func TestConsumerConfigMatches(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(c.CachedInfo().Config, cfg) {
fmt.Printf("%#v\n", c.CachedInfo().Config)
fmt.Printf("%#v\n", cfg)
t.Fatalf("ConsumerConfig doesn't match")
}
}
4 changes: 2 additions & 2 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, ErrStreamSubjectTransformNotSupported
}
if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
Expand Down
52 changes: 42 additions & 10 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2799,8 +2799,8 @@ func TestStreamConfigMatches(t *testing.T) {
nc, js := jsClient(t, srv)
defer nc.Close()

cfgSource := nats.StreamConfig{
Name: "source",
cfg := nats.StreamConfig{
Name: "stream",
Description: "desc",
Subjects: []string{"foo.*"},
Retention: nats.WorkQueuePolicy,
Expand Down Expand Up @@ -2838,15 +2838,15 @@ func TestStreamConfigMatches(t *testing.T) {
},
}

s, err := js.AddStream(&cfgSource)
s, err := js.AddStream(&cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.Config, cfgSource) {
t.Fatalf("StreamConfig doesn't match")
if !reflect.DeepEqual(s.Config, cfg) {
t.Fatalf("StreamConfig doesn't match: %#v", s.Config)
}

cfg := nats.StreamConfig{
cfgMirror := nats.StreamConfig{
Name: "mirror",
MaxConsumers: 10,
MaxMsgs: 100,
Expand All @@ -2855,8 +2855,9 @@ func TestStreamConfigMatches(t *testing.T) {
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Replicas: 1,
Duplicates: 10 * time.Second,
Mirror: &nats.StreamSource{
Name: "source",
Name: "stream",
OptStartSeq: 10,
SubjectTransforms: []nats.SubjectTransformConfig{
{Source: ">", Destination: "transformed.>"},
Expand All @@ -2866,12 +2867,43 @@ func TestStreamConfigMatches(t *testing.T) {
SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
}

s, err = js.AddStream(&cfg)
s, err = js.AddStream(&cfgMirror)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.Config, cfg) {
t.Fatalf("StreamConfig doesn't match")
if !reflect.DeepEqual(s.Config, cfgMirror) {
t.Fatalf("StreamConfig doesn't match: %#v", s.Config)
}

cfgSourcing := nats.StreamConfig{
Name: "sourcing",
Subjects: []string{"BAR"},
MaxConsumers: 10,
MaxMsgs: 100,
MaxBytes: 1000,
MaxAge: 100 * time.Second,
MaxMsgsPerSubject: 1000,
MaxMsgSize: 10000,
Replicas: 1,
Duplicates: 10 * time.Second,
Sources: []*nats.StreamSource{
{
Name: "stream",
OptStartSeq: 10,
SubjectTransforms: []nats.SubjectTransformConfig{
{Source: ">", Destination: "transformed.>"},
},
},
},
SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"},
}

s, err = js.AddStream(&cfgSourcing)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(s.Config, cfgSourcing) {
t.Fatalf("StreamConfig doesn't match: %#v", s.Config)
}
}

Expand Down

0 comments on commit 4d43218

Please sign in to comment.