diff --git a/server/consumer.go b/server/consumer.go index ca52184c92e..846cd78829b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5323,18 +5323,19 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { // ignoreInterest marks whether the consumer should be ignored when determining interest. // No lock held on entry. func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool) { - state := mset.state() - stop := state.LastSeq o.mu.Lock() if !o.isLeader() { - o.readStoredState(stop) + o.readStoredState(0) } start := o.asflr o.mu.Unlock() + // Make sure we start at worst with first sequence in the stream. + state := mset.state() if start < state.FirstSeq { start = state.FirstSeq } + stop := state.LastSeq // Consumer's interests are ignored by default. If we should not ignore interest, unset. co := o @@ -5343,13 +5344,37 @@ func (o *consumer) cleanupNoInterestMessages(mset *stream, ignoreInterest bool) } var rmseqs []uint64 - mset.mu.Lock() + mset.mu.RLock() + + // If over this amount of messages to check, defer to checkInterestState() which + // will do the right thing since we are now removed. + // TODO(dlc) - Better way? + const bailThresh = 100_000 + + // Check if we would be spending too much time here and defer to separate go routine. + if len(mset.consumers) == 0 { + mset.mu.RUnlock() + mset.mu.Lock() + defer mset.mu.Unlock() + mset.store.Purge() + var state StreamState + mset.store.FastState(&state) + mset.lseq = state.LastSeq + // Also make sure we clear any pending acks. + mset.clearAllPreAcksBelowFloor(state.FirstSeq) + return + } else if stop-start > bailThresh { + mset.mu.RUnlock() + go mset.checkInterestState() + return + } + for seq := start; seq <= stop; seq++ { if mset.noInterest(seq, co) { rmseqs = append(rmseqs, seq) } } - mset.mu.Unlock() + mset.mu.RUnlock() // These can be removed. for _, seq := range rmseqs { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index d52b5dc3ed3..b759e557078 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6099,6 +6099,310 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) { } } +func TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Retention: nats.InterestPolicy, + Subjects: []string{"foo.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Make trhe first sequence high. We already protect against it but for extra sanity. + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000}) + require_NoError(t, err) + + // Create 2 consumers. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C1", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C2", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + js.PublishAsync("foo.bar", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 100) + + sub, err := js.PullSubscribe("foo.bar", "C1") + require_NoError(t, err) + msgs, err := sub.Fetch(50) + require_NoError(t, err) + require_Equal(t, len(msgs), 50) + for _, m := range msgs { + m.AckSync() + } + + // Now delete second one and make sure accounting correct. + err = js.DeleteConsumer("TEST", "C2") + require_NoError(t, err) + time.Sleep(100 * time.Millisecond) + + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 50) +} + +func TestJetStreamClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Retention: nats.InterestPolicy, + Subjects: []string{"foo.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Make trhe first sequence high. We already protect against it but for extra sanity. + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000}) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C1", + AckPolicy: nats.AckNonePolicy, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + js.PublishAsync("foo.bar", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 100) + + sub, err := js.PullSubscribe("foo.bar", "C1") + require_NoError(t, err) + msgs, err := sub.Fetch(100) + require_NoError(t, err) + require_Equal(t, len(msgs), 100) + for _, m := range msgs { + m.AckSync() + } + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 0) +} + +func TestJetStreamClusterConsumerDeleteAckNoneInterestPolicyWithOthers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Retention: nats.InterestPolicy, + Subjects: []string{"foo.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Make trhe first sequence high. We already protect against it but for extra sanity. + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000}) + require_NoError(t, err) + + // Create 2 consumers. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C1", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C2", + AckPolicy: nats.AckNonePolicy, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + js.PublishAsync("foo.bar", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 100) + + sub, err := js.PullSubscribe("foo.bar", "C1") + require_NoError(t, err) + msgs, err := sub.Fetch(100) + require_NoError(t, err) + require_Equal(t, len(msgs), 100) + for _, m := range msgs { + m.AckSync() + } + // AckNone will hold. + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 100) + + // Now delete second one and make sure accounting correct. + err = js.DeleteConsumer("TEST", "C2") + require_NoError(t, err) + time.Sleep(100 * time.Millisecond) + + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 0) +} + +func TestJetStreamClusterConsumerDeleteInterestPolicyPerf(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Retention: nats.InterestPolicy, + Subjects: []string{"foo.*"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Make trhe first sequence high. We already protect against it but for extra sanity. + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 100_000_000}) + require_NoError(t, err) + + // Create 3 consumers. 1 Ack explicit, 1 AckAll and 1 AckNone + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C1", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C2", + AckPolicy: nats.AckAllPolicy, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C3", + AckPolicy: nats.AckNonePolicy, + }) + require_NoError(t, err) + + for i := 0; i < 500_000; i++ { + js.PublishAsync("foo.bar", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // For C1 grab 100 and ack evens. + sub, err := js.PullSubscribe("foo.bar", "C1") + require_NoError(t, err) + msgs, err := sub.Fetch(100) + require_NoError(t, err) + require_Equal(t, len(msgs), 100) + for _, m := range msgs { + meta, _ := m.Metadata() + if meta.Sequence.Stream%2 == 0 { + m.AckSync() + } + } + + // For C2 grab 500 and ack 100. + sub, err = js.PullSubscribe("foo.bar", "C2") + require_NoError(t, err) + msgs, err = sub.Fetch(500, nats.MaxWait(10*time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 500) + msgs[99].AckSync() + + // Simulate stream viewer, get first 10 from C3 + sub, err = js.PullSubscribe("foo.bar", "C3") + require_NoError(t, err) + msgs, err = sub.Fetch(10) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + + time.Sleep(500 * time.Millisecond) + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 499_995) + + // Before fix this was in the seconds. All the while the stream is locked. + // This should be short now. + start := time.Now() + err = js.DeleteConsumer("TEST", "C3") + require_NoError(t, err) + if elapsed := time.Since(start); elapsed > 50*time.Millisecond { + t.Fatalf("Deleting AckNone consumer took too long: %v", elapsed) + } + + time.Sleep(500 * time.Millisecond) + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 499_950) + + // Now do AckAll + start = time.Now() + err = js.DeleteConsumer("TEST", "C2") + require_NoError(t, err) + if elapsed := time.Since(start); elapsed > 50*time.Millisecond { + t.Fatalf("Deleting AckAll consumer took too long: %v", elapsed) + } + + time.Sleep(500 * time.Millisecond) + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 499_950) + + // Now do AckExplicit + start = time.Now() + err = js.DeleteConsumer("TEST", "C1") + require_NoError(t, err) + if elapsed := time.Since(start); elapsed > 50*time.Millisecond { + t.Fatalf("Deleting AckExplicit consumer took too long: %v", elapsed) + } + + time.Sleep(500 * time.Millisecond) + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 0) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value.