Skip to content

Commit

Permalink
Improve delete time of consumers on interest policy streams with a la…
Browse files Browse the repository at this point in the history
…rge number of messages.

When we delete a consumer we need to simulate acking all messages from the ackfloor to the last sequence in the stream.
If the consumer is new and has only consumed and acked a few messages, or none, this could be costly and cause operational pauses for the parent stream.

If the number of sequences we would need to process is over a threshold we will let the checkInterestState() do the cleanup in a separate goroutine.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Jun 16, 2024
1 parent 59dfbcc commit d183273
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 6 deletions.
35 changes: 30 additions & 5 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5323,18 +5323,19 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
// ignoreInterest marks whether the consumer should be ignored when determining interest.
// No lock held on entry.
func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool) {
state := mset.state()
stop := state.LastSeq
o.mu.Lock()
if !o.isLeader() {
o.readStoredState(stop)
o.readStoredState(0)
}
start := o.asflr
o.mu.Unlock()

// Make sure we start at worst with first sequence in the stream.
state := mset.state()
if start < state.FirstSeq {
start = state.FirstSeq
}
stop := state.LastSeq

// Consumer's interests are ignored by default. If we should not ignore interest, unset.
co := o
Expand All @@ -5343,13 +5344,37 @@ func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool)
}

var rmseqs []uint64
mset.mu.Lock()
mset.mu.RLock()

// If over this amount of messages to check, defer to checkInterestState() which
// will do the right thing since we are now removed.
// TODO(dlc) - Better way?
const bailThresh = 100_000

// Check if we would be spending too much time here and defer to separate go routine.
if len(mset.consumers) == 0 {
mset.mu.RUnlock()
mset.mu.Lock()
defer mset.mu.Unlock()
mset.store.Purge()
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
// Also make sure we clear any pending acks.
mset.clearAllPreAcksBelowFloor(state.FirstSeq)
return
} else if stop-start > bailThresh {
mset.mu.RUnlock()
go mset.checkInterestState()
return
}

for seq := start; seq <= stop; seq++ {
if mset.noInterest(seq, co) {
rmseqs = append(rmseqs, seq)
}
}
mset.mu.Unlock()
mset.mu.RUnlock()

// These can be removed.
for _, seq := range rmseqs {
Expand Down
304 changes: 304 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6099,6 +6099,310 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) {
}
}

// TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers verifies that on an
// interest-policy stream with two explicit-ack consumers, deleting one consumer releases
// exactly the messages that the remaining consumer has already acked (no more, no less).
func TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Retention: nats.InterestPolicy,
		Subjects:  []string{"foo.*"},
		Replicas:  3,
	})
	require_NoError(t, err)

	// Make the first sequence high. We already protect against it but for extra sanity.
	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
	require_NoError(t, err)

	// Create 2 consumers.
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C1",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C2",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	for i := 0; i < 100; i++ {
		js.PublishAsync("foo.bar", []byte("ok"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Both consumers still have interest in all 100 messages.
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 100)

	// Have C1 consume and ack the first 50 messages.
	sub, err := js.PullSubscribe("foo.bar", "C1")
	require_NoError(t, err)
	msgs, err := sub.Fetch(50)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 50)
	for _, m := range msgs {
		m.AckSync()
	}

	// Now delete second one and make sure accounting correct.
	// Once C2 is gone, only C1's interest remains, so the 50 messages
	// C1 already acked should be removed from the stream.
	err = js.DeleteConsumer("TEST", "C2")
	require_NoError(t, err)
	time.Sleep(100 * time.Millisecond)

	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 50)
}

// TestJetStreamClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery verifies
// that with a single AckNone consumer on an interest-policy stream, messages are released
// as soon as they have been delivered — delivery alone removes interest for AckNone.
func TestJetStreamClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Retention: nats.InterestPolicy,
		Subjects:  []string{"foo.*"},
		Replicas:  3,
	})
	require_NoError(t, err)

	// Make the first sequence high. We already protect against it but for extra sanity.
	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C1",
		AckPolicy: nats.AckNonePolicy,
	})
	require_NoError(t, err)

	for i := 0; i < 100; i++ {
		js.PublishAsync("foo.bar", []byte("ok"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Nothing delivered yet, so all messages are retained.
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 100)

	sub, err := js.PullSubscribe("foo.bar", "C1")
	require_NoError(t, err)
	msgs, err := sub.Fetch(100)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 100)
	// Explicit acks are not required for AckNone; delivery already
	// removed interest, so after this the stream should be empty.
	for _, m := range msgs {
		m.AckSync()
	}
	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 0)
}

// TestJetStreamClusterConsumerDeleteAckNoneInterestPolicyWithOthers verifies that an
// undelivered AckNone consumer retains messages on an interest-policy stream, and that
// deleting it releases the messages the other (explicit-ack) consumer already acked.
func TestJetStreamClusterConsumerDeleteAckNoneInterestPolicyWithOthers(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Retention: nats.InterestPolicy,
		Subjects:  []string{"foo.*"},
		Replicas:  3,
	})
	require_NoError(t, err)

	// Make the first sequence high. We already protect against it but for extra sanity.
	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
	require_NoError(t, err)

	// Create 2 consumers.
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C1",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C2",
		AckPolicy: nats.AckNonePolicy,
	})
	require_NoError(t, err)

	for i := 0; i < 100; i++ {
		js.PublishAsync("foo.bar", []byte("ok"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 100)

	// C1 consumes and acks everything.
	sub, err := js.PullSubscribe("foo.bar", "C1")
	require_NoError(t, err)
	msgs, err := sub.Fetch(100)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 100)
	for _, m := range msgs {
		m.AckSync()
	}
	// AckNone will hold.
	// C2 has had nothing delivered, so its interest still retains all messages.
	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 100)

	// Now delete second one and make sure accounting correct.
	// With C2 gone, only C1's (fully acked) interest remains → stream drains to 0.
	err = js.DeleteConsumer("TEST", "C2")
	require_NoError(t, err)
	time.Sleep(100 * time.Millisecond)

	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 0)
}

// TestJetStreamClusterConsumerDeleteInterestPolicyPerf verifies that deleting a consumer
// on an interest-policy stream holding a large number of messages (500k) returns quickly —
// the expensive interest cleanup must be deferred (to checkInterestState) rather than run
// inline while holding the stream lock — and that message accounting still ends up correct.
func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Retention: nats.InterestPolicy,
		Subjects:  []string{"foo.*"},
		Replicas:  3,
	})
	require_NoError(t, err)

	// Make the first sequence high. We already protect against it but for extra sanity.
	err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
	require_NoError(t, err)

	// Create 3 consumers. 1 Ack explicit, 1 AckAll and 1 AckNone
	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C1",
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C2",
		AckPolicy: nats.AckAllPolicy,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "C3",
		AckPolicy: nats.AckNonePolicy,
	})
	require_NoError(t, err)

	for i := 0; i < 500_000; i++ {
		js.PublishAsync("foo.bar", []byte("ok"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// For C1 grab 100 and ack evens.
	sub, err := js.PullSubscribe("foo.bar", "C1")
	require_NoError(t, err)
	msgs, err := sub.Fetch(100)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 100)
	for _, m := range msgs {
		meta, _ := m.Metadata()
		if meta.Sequence.Stream%2 == 0 {
			m.AckSync()
		}
	}

	// For C2 grab 500 and ack 100.
	// AckAll: acking the 100th message acks everything up to it.
	sub, err = js.PullSubscribe("foo.bar", "C2")
	require_NoError(t, err)
	msgs, err = sub.Fetch(500, nats.MaxWait(10*time.Second))
	require_NoError(t, err)
	require_Equal(t, len(msgs), 500)
	msgs[99].AckSync()

	// Simulate stream viewer, get first 10 from C3
	sub, err = js.PullSubscribe("foo.bar", "C3")
	require_NoError(t, err)
	msgs, err = sub.Fetch(10)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 10)

	// Only messages with no remaining interest are gone: the even sequences among
	// the first 10 (acked by C1, covered by C2's AckAll floor of 100, delivered to
	// AckNone C3) — that is 5 messages, hence 499,995 remain.
	time.Sleep(500 * time.Millisecond)
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 499_995)

	// Before fix this was in the seconds. All the while the stream is locked.
	// This should be short now.
	start := time.Now()
	err = js.DeleteConsumer("TEST", "C3")
	require_NoError(t, err)
	if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
		t.Fatalf("Deleting AckNone consumer took too long: %v", elapsed)
	}

	// With C3 gone, the even sequences in the first 100 (acked by C1, within C2's
	// ack floor) can now be removed: 50 total, so 499,950 remain.
	time.Sleep(500 * time.Millisecond)
	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 499_950)

	// Now do AckAll
	start = time.Now()
	err = js.DeleteConsumer("TEST", "C2")
	require_NoError(t, err)
	if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
		t.Fatalf("Deleting AckAll consumer took too long: %v", elapsed)
	}

	// Removals are still bounded by C1's acks (evens of the first 100),
	// so the count is unchanged at 499,950.
	time.Sleep(500 * time.Millisecond)
	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 499_950)

	// Now do AckExplicit
	start = time.Now()
	err = js.DeleteConsumer("TEST", "C1")
	require_NoError(t, err)
	if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
		t.Fatalf("Deleting AckExplicit consumer took too long: %v", elapsed)
	}

	// No consumers left on an interest stream → everything is purged.
	time.Sleep(500 * time.Millisecond)
	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 0)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
1 change: 0 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5584,7 +5584,6 @@ func (mset *stream) checkInterestState() {
// So we will just do normal purge.
mset.store.Purge()
}

// Make sure to reset our local lseq.
mset.store.FastState(&state)
mset.lseq = state.LastSeq
Expand Down

0 comments on commit d183273

Please sign in to comment.