Skip to content

Commit

Permalink
Make sure ack processing is consistent and correct between leader and…
Browse files Browse the repository at this point in the history
… followers.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Jun 12, 2024
1 parent 76dbf3c commit 8626024
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
11 changes: 5 additions & 6 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2855,8 +2855,12 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
delete(o.pending, sseq)
// Use the original deliver sequence from our pending record.
dseq = p.Sequence

// Only move floors if we matched an existing pending.
if dseq == o.adflr+1 {
if len(o.pending) == 0 {
o.adflr = o.dseq - 1
o.asflr = o.sseq - 1
} else if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
for ss := sseq + 1; ss < o.sseq; ss++ {
if p, ok := o.pending[ss]; ok {
Expand All @@ -2867,11 +2871,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}
}
}
// If nothing left set consumer to current delivered.
// Do not update stream.
if len(o.pending) == 0 {
o.adflr = o.dseq - 1
}
}
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
Expand Down
48 changes: 48 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2281,3 +2281,51 @@ func TestJetStreamClusterStreamLastSequenceResetAfterStorageWipe(t *testing.T) {
}
}
}

func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Retention: nats.InterestPolicy,
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

sub, err := js.PullSubscribe("foo.*", "consumer")
require_NoError(t, err)

// Do 25 rounds.
for i := 1; i <= 25; i++ {
// Send 50 msgs.
for x := 0; x < 50; x++ {
_, err := js.Publish("foo.bar", nil)
require_NoError(t, err)
}
msgs, err := sub.Fetch(50, nats.MaxWait(10*time.Second))
require_NoError(t, err)
require_Equal(t, len(msgs), 50)
// Randomize
rand.Shuffle(len(msgs), func(i, j int) { msgs[i], msgs[j] = msgs[j], msgs[i] })
for _, m := range msgs {
m.AckSync()
}

time.Sleep(100 * time.Millisecond)
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
consumer := mset.lookupConsumer("consumer")
require_NotEqual(t, consumer, nil)
info := consumer.info()
require_Equal(t, info.NumAckPending, 0)
require_Equal(t, info.AckFloor.Consumer, uint64(i*50))
require_Equal(t, info.AckFloor.Stream, uint64(i*50))
}
}
}

0 comments on commit 8626024

Please sign in to comment.