[IMPROVED] Consumer delete time on interest policy streams with a large number of messages. #5547

Merged · 1 commit · Jun 16, 2024
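Summary of the change: deleting a consumer on an interest-policy stream no longer always walks every sequence while holding the stream lock. As the diff below shows, if no consumers remain the store is simply purged, and if the range between the consumer's ack floor and the stream's last sequence exceeds a 100,000-message threshold the work is handed off to a background checkInterestState() pass. A rough, self-contained sketch of that decision follows; it uses hypothetical inputs and names and is not the server's actual code:

package main

import "fmt"

// bailThresh mirrors the constant added to consumer.go: above this many
// sequences, cleanup defers to a background interest check.
const bailThresh = 100_000

// cleanupAction sketches the decision made when an interest-policy consumer
// is deleted. remainingConsumers, start and stop are hypothetical stand-ins
// for the stream's consumer count and the ack-floor..last-sequence range.
func cleanupAction(remainingConsumers int, start, stop uint64) string {
	switch {
	case remainingConsumers == 0:
		// No interest can remain, so purging the whole store is cheapest.
		return "purge store"
	case stop-start > bailThresh:
		// Too many sequences to scan under the lock; let a background
		// checkInterestState()-style pass reclaim messages instead.
		return "defer to background check"
	default:
		// Small range: scan each sequence for remaining interest inline.
		return "scan inline"
	}
}

func main() {
	fmt.Println(cleanupAction(0, 100_000_001, 100_500_000)) // purge store
	fmt.Println(cleanupAction(2, 100_000_001, 100_500_000)) // defer to background check
	fmt.Println(cleanupAction(2, 100_000_001, 100_000_100)) // scan inline
}

Both bail-out paths release shared state quickly, which is what the new perf test at the bottom of this PR asserts: each consumer delete is expected to finish in under 50ms rather than seconds.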
35 changes: 30 additions & 5 deletions server/consumer.go
@@ -5323,18 +5323,19 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
// ignoreInterest marks whether the consumer should be ignored when determining interest.
// No lock held on entry.
func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool) {
-	state := mset.state()
-	stop := state.LastSeq
	o.mu.Lock()
	if !o.isLeader() {
-		o.readStoredState(stop)
+		o.readStoredState(0)
	}
	start := o.asflr
	o.mu.Unlock()

	// Make sure we start at worst with first sequence in the stream.
+	state := mset.state()
	if start < state.FirstSeq {
		start = state.FirstSeq
	}
+	stop := state.LastSeq

	// Consumer's interests are ignored by default. If we should not ignore interest, unset.
	co := o
@@ -5343,13 +5344,37 @@ func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool)
	}

	var rmseqs []uint64
-	mset.mu.Lock()
+	mset.mu.RLock()
+
+	// If over this amount of messages to check, defer to checkInterestState() which
+	// will do the right thing since we are now removed.
+	// TODO(dlc) - Better way?
+	const bailThresh = 100_000
+
+	// Check if we would be spending too much time here and defer to separate go routine.
+	if len(mset.consumers) == 0 {
+		mset.mu.RUnlock()
+		mset.mu.Lock()
+		defer mset.mu.Unlock()
+		mset.store.Purge()
+		var state StreamState
+		mset.store.FastState(&state)
+		mset.lseq = state.LastSeq
+		// Also make sure we clear any pending acks.
+		mset.clearAllPreAcksBelowFloor(state.FirstSeq)
+		return
+	} else if stop-start > bailThresh {
+		mset.mu.RUnlock()
+		go mset.checkInterestState()
+		return
+	}
+
	for seq := start; seq <= stop; seq++ {
		if mset.noInterest(seq, co) {
			rmseqs = append(rmseqs, seq)
		}
	}
-	mset.mu.Unlock()
+	mset.mu.RUnlock()

	// These can be removed.
	for _, seq := range rmseqs {
304 changes: 304 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6099,6 +6099,310 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) {
}
}

func TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Retention: nats.InterestPolicy,
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

// Make the first sequence high. We already protect against it, but do this for extra sanity.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
require_NoError(t, err)

// Create 2 consumers.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C1",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C2",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

for i := 0; i < 100; i++ {
js.PublishAsync("foo.bar", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 100)

sub, err := js.PullSubscribe("foo.bar", "C1")
require_NoError(t, err)
msgs, err := sub.Fetch(50)
require_NoError(t, err)
require_Equal(t, len(msgs), 50)
for _, m := range msgs {
m.AckSync()
}

// Now delete second one and make sure accounting correct.
err = js.DeleteConsumer("TEST", "C2")
require_NoError(t, err)
time.Sleep(100 * time.Millisecond)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 50)
}

func TestJetStreamClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Retention: nats.InterestPolicy,
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

// Make the first sequence high. We already protect against it, but do this for extra sanity.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C1",
AckPolicy: nats.AckNonePolicy,
})
require_NoError(t, err)

for i := 0; i < 100; i++ {
js.PublishAsync("foo.bar", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 100)

sub, err := js.PullSubscribe("foo.bar", "C1")
require_NoError(t, err)
msgs, err := sub.Fetch(100)
require_NoError(t, err)
require_Equal(t, len(msgs), 100)
for _, m := range msgs {
m.AckSync()
}
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)
}

func TestJetStreamClusterConsumerDeleteAckNoneInterestPolicyWithOthers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Retention: nats.InterestPolicy,
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

// Make the first sequence high. We already protect against it, but do this for extra sanity.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
require_NoError(t, err)

// Create 2 consumers.
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C1",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C2",
AckPolicy: nats.AckNonePolicy,
})
require_NoError(t, err)

for i := 0; i < 100; i++ {
js.PublishAsync("foo.bar", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 100)

sub, err := js.PullSubscribe("foo.bar", "C1")
require_NoError(t, err)
msgs, err := sub.Fetch(100)
require_NoError(t, err)
require_Equal(t, len(msgs), 100)
for _, m := range msgs {
m.AckSync()
}
// AckNone will hold.
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 100)

// Now delete second one and make sure accounting correct.
err = js.DeleteConsumer("TEST", "C2")
require_NoError(t, err)
time.Sleep(100 * time.Millisecond)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)
}

func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Retention: nats.InterestPolicy,
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

// Make the first sequence high. We already protect against it, but do this for extra sanity.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000})
require_NoError(t, err)

// Create 3 consumers. 1 Ack explicit, 1 AckAll and 1 AckNone
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C1",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C2",
AckPolicy: nats.AckAllPolicy,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "C3",
AckPolicy: nats.AckNonePolicy,
})
require_NoError(t, err)

for i := 0; i < 500_000; i++ {
js.PublishAsync("foo.bar", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// For C1 grab 100 and ack evens.
sub, err := js.PullSubscribe("foo.bar", "C1")
require_NoError(t, err)
msgs, err := sub.Fetch(100)
require_NoError(t, err)
require_Equal(t, len(msgs), 100)
for _, m := range msgs {
meta, _ := m.Metadata()
if meta.Sequence.Stream%2 == 0 {
m.AckSync()
}
}

// For C2 grab 500 and ack 100.
sub, err = js.PullSubscribe("foo.bar", "C2")
require_NoError(t, err)
msgs, err = sub.Fetch(500, nats.MaxWait(10*time.Second))
require_NoError(t, err)
require_Equal(t, len(msgs), 500)
msgs[99].AckSync()

// Simulate stream viewer, get first 10 from C3
sub, err = js.PullSubscribe("foo.bar", "C3")
require_NoError(t, err)
msgs, err = sub.Fetch(10)
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

time.Sleep(500 * time.Millisecond)
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 499_995)

// Before the fix this took seconds, all the while holding the stream lock.
// This should be short now.
start := time.Now()
err = js.DeleteConsumer("TEST", "C3")
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
t.Fatalf("Deleting AckNone consumer took too long: %v", elapsed)
}

time.Sleep(500 * time.Millisecond)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 499_950)

// Now do AckAll
start = time.Now()
err = js.DeleteConsumer("TEST", "C2")
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
t.Fatalf("Deleting AckAll consumer took too long: %v", elapsed)
}

time.Sleep(500 * time.Millisecond)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 499_950)

// Now do AckExplicit
start = time.Now()
err = js.DeleteConsumer("TEST", "C1")
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
t.Fatalf("Deleting AckExplicit consumer took too long: %v", elapsed)
}

time.Sleep(500 * time.Millisecond)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.