Skip to content

Commit

Permalink
Formalize behavior for creating audit streams for reserved subjects. (#…
Browse files Browse the repository at this point in the history
…5548)

For subjects `$JS`, `$JS.API`, `$JSC` and `$SYS` subjects, If NoAck is
true they are now allowed, otherwise they will not be allowed.

This allows proper setup of audit streams for production use cases.

Signed-off-by: Derek Collison <[email protected]>

---------

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored and neilalexander committed Jun 17, 2024
1 parent 4eb41aa commit f5cb845
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 9 deletions.
8 changes: 4 additions & 4 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,8 @@ func TestJetStreamConsumerDelete(t *testing.T) {
}

func TestJetStreamConsumerFetchWithDrain(t *testing.T) {
t.Skip()

test := func(t *testing.T, cc *nats.ConsumerConfig) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down Expand Up @@ -1137,8 +1139,6 @@ func TestJetStreamConsumerFetchWithDrain(t *testing.T) {
metadata.NumDelivered, metadata.Sequence.Stream)
}
msgs[int(metadata.Sequence.Stream)] = int(metadata.NumDelivered)

require_NoError(t, err)
return true
}

Expand Down Expand Up @@ -1169,14 +1169,14 @@ func TestJetStreamConsumerFetchWithDrain(t *testing.T) {
test(t, &nats.ConsumerConfig{
Durable: "C",
AckPolicy: nats.AckExplicitPolicy,
AckWait: 10 * time.Second,
AckWait: 20 * time.Second,
})
})
t.Run("with-backoff", func(t *testing.T) {
test(t, &nats.ConsumerConfig{
Durable: "C",
AckPolicy: nats.AckExplicitPolicy,
AckWait: 10 * time.Second,
AckWait: 20 * time.Second,
BackOff: []time.Duration{25 * time.Millisecond, 100 * time.Millisecond, 250 * time.Millisecond},
})
})
Expand Down
52 changes: 51 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) {
expectErr(acc.addStream(&StreamConfig{Name: "c", Subjects: []string{"$JS.API.*"}}))

// Events and Advisories etc should be ok.
if _, err := acc.addStream(&StreamConfig{Name: "a", Subjects: []string{"$JS.EVENT.>"}}); err != nil {
if _, err := acc.addStream(&StreamConfig{Name: "a", Subjects: []string{"$JS.EVENT.>"}, NoAck: true}); err != nil {
t.Fatalf("Expected this to work: %v", err)
}
}
Expand Down Expand Up @@ -22606,3 +22606,53 @@ func TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloorWithInterestPolicy(t
_, err = js.StreamInfo("TEST", nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
}

// Allow streams with $JS or $SYS prefixes for audit purposes but require no pub ack be set.
func TestJetStreamAuditStreams(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JSC.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$SYS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api")))

// These should be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"$JS.>"},
NoAck: true,
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST2",
Subjects: []string{"$JSC.>"},
NoAck: true,
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST3",
Subjects: []string{"$SYS.>"},
NoAck: true,
})
require_NoError(t, err)
}
12 changes: 8 additions & 4 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1489,16 +1489,20 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
// and no overlap with any JS API subject space
dset := make(map[string]struct{}, len(cfg.Subjects))
for _, subj := range cfg.Subjects {
// Make sure the subject is valid. Check this first.
if !IsValidSubject(subj) {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("invalid subject"))
}
if _, ok := dset[subj]; ok {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected"))
}
// Also check to make sure we do not overlap with our $JS API subjects.
if subjectIsSubsetMatch(subj, "$JS.API.>") {
if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))
}
// Make sure the subject is valid.
if !IsValidSubject(subj) {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("invalid subject"))
// And the $SYS subjects.
if !cfg.NoAck && subjectIsSubsetMatch(subj, "$SYS.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api"))
}
// Mark for duplicate check.
dset[subj] = struct{}{}
Expand Down

0 comments on commit f5cb845

Please sign in to comment.