Skip to content

Commit

Permalink
[FIXED] Make sure jsz reporting of filtered consumers delivered strea…
Browse files Browse the repository at this point in the history
…m sequence is consistent. (#5528)

For a leader, it will skip msgs at the end to eof and remember o.sseq
for the next getNextMsg call.
But this causes the delivered stream sequence to be reported differently
on the leader vs. the followers.

Also, if a consumer leader processed an ack ahead of delivered, it would
sync its delivered state, but the followers would not.
This makes the behavior consistent between leaders and followers.

Signed-off-by: Derek Collison <[email protected]>

---------

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Jun 13, 2024
1 parent 41f959b commit f263d75
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 7 deletions.
19 changes: 13 additions & 6 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2719,17 +2719,24 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
}
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
// If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store.
isLeader := o.isLeader()
if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) {
state, err := o.store.BorrowState()
if err != nil {
o.mu.Unlock()
return nil
}
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
if !isLeader {
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
} else {
// Since we are filtered and we are the leader we could have o.sseq that is skipped ahead.
// To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence.
info.Delivered.Stream = state.Delivered.Stream
}
}

// Adjust active based on non-zero etc. Also make UTC here.
Expand Down
6 changes: 6 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8758,6 +8758,12 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
return nil
}

// Match leader logic on checking if ack is ahead of delivered.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq + 1
}

if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
return ErrStoreMsgNotFound
}
Expand Down
49 changes: 48 additions & 1 deletion server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5966,7 +5966,54 @@ func TestJetStreamClusterFailMirrorsAndSources(t *testing.T) {
})
}

// TestJetStreamClusterConsumerDeliveredSyncReporting verifies that jsz
// reports the same consumer delivered stream sequence on the leader and
// the followers, even after the leader has skipped ahead past
// non-matching messages for a filtered consumer.
func TestJetStreamClusterConsumerDeliveredSyncReporting(t *testing.T) {
	cl := createJetStreamClusterExplicit(t, "R3S", 3)
	defer cl.shutdown()

	nc, js := jsClientConnect(t, cl.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Retention: nats.WorkQueuePolicy,
		Subjects:  []string{"foo.*"},
		Replicas:  3,
	})
	require_NoError(t, err)

	psub, err := js.PullSubscribe("foo.bar", "mc")
	require_NoError(t, err)

	// The first message matches our filter, the following ten do not.
	_, err = js.Publish("foo.bar", nil)
	require_NoError(t, err)
	for n := 0; n < 10; n++ {
		_, err = js.Publish("foo.baz", nil)
		require_NoError(t, err)
	}

	batch, err := psub.Fetch(1)
	require_NoError(t, err)
	require_Equal(t, len(batch), 1)

	// Now make sure jsz reporting shows the same state everywhere,
	// including delivered, which will have skipped to the end on the
	// leader. The skip can happen for several reasons; here we simply
	// issue another pull request that we expect to time out.
	_, err = psub.Fetch(1, nats.MaxWait(200*time.Millisecond))
	require_Error(t, err)

	// Every server, leader or follower, should report identical
	// delivered sequences via jsz.
	jszOpts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
	for _, srv := range cl.servers {
		jsz, err := srv.Jsz(jszOpts)
		require_NoError(t, err)
		ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
		require_Equal(t, ci.Delivered.Consumer, 1)
		require_Equal(t, ci.Delivered.Stream, 1)
	}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
//
6 changes: 6 additions & 0 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,12 @@ func (o *consumerMemStore) UpdateAcks(dseq, sseq uint64) error {
return nil
}

// Match leader logic on checking if ack is ahead of delivered.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq + 1
}

// Check for AckAll here.
if o.cfg.AckPolicy == AckAll {
sgap := sseq - o.state.AckFloor.Stream
Expand Down

0 comments on commit f263d75

Please sign in to comment.