diff --git a/server/consumer.go b/server/consumer.go index 7dd2531e298..a3cf3ab424c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2855,8 +2855,12 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b delete(o.pending, sseq) // Use the original deliver sequence from our pending record. dseq = p.Sequence + // Only move floors if we matched an existing pending. - if dseq == o.adflr+1 { + if len(o.pending) == 0 { + o.adflr = o.dseq - 1 + o.asflr = o.sseq - 1 + } else if dseq == o.adflr+1 { o.adflr, o.asflr = dseq, sseq for ss := sseq + 1; ss < o.sseq; ss++ { if p, ok := o.pending[ss]; ok { @@ -2867,11 +2871,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b } } } - // If nothing left set consumer to current delivered. - // Do not update stream. - if len(o.pending) == 0 { - o.adflr = o.dseq - 1 - } } delete(o.rdc, sseq) o.removeFromRedeliverQueue(sseq) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index b5c9495d900..8407187e99b 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2281,3 +2281,51 @@ func TestJetStreamClusterStreamLastSequenceResetAfterStorageWipe(t *testing.T) { } } } + +func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Retention: nats.InterestPolicy, + Subjects: []string{"foo.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("foo.*", "consumer") + require_NoError(t, err) + + // Do 25 rounds. + for i := 1; i <= 25; i++ { + // Send 50 msgs. + for x := 0; x < 50; x++ { + _, err := js.Publish("foo.bar", nil) + require_NoError(t, err) + } + msgs, err := sub.Fetch(50, nats.MaxWait(10*time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 50) + // Randomize + rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] }) + for _, m := range msgs { + m.AckSync() + } + + time.Sleep(100 * time.Millisecond) + for _, s := range c.servers { + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + consumer := mset.lookupConsumer("consumer") + require_NotEqual(t, consumer, nil) + info := consumer.info() + require_Equal(t, info.NumAckPending, 0) + require_Equal(t, info.AckFloor.Consumer, uint64(i*50)) + require_Equal(t, info.AckFloor.Stream, uint64(i*50)) + } + } +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index fb7be760e5e..7ea9cd6ab2d 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -19537,9 +19537,7 @@ func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) { require_NoError(t, err) require_True(t, info.NumAckPending == 0) - // Should be 6 since we do not pull "other". We used to jump ack floor ahead - // but no longer do that. - require_True(t, info.AckFloor.Stream == 6) + require_True(t, info.AckFloor.Stream == 8) require_True(t, info.AckFloor.Consumer == 1) require_True(t, info.NumPending == 0) }