Skip to content

Commit

Permalink
fixed waiting for maxDelayRebalance with AckMode.EXACTLY_ONCE during …
Browse files Browse the repository at this point in the history
…rebalancing (reactor#371)

CommitBatch is not used for AckMode.ExactlyOnce, but increasing the number of uncommitted messages in it causes an infinite loop during rebalancing since they never decrease.

Fixes reactor#371.
  • Loading branch information
Alexander committed Jan 21, 2024
1 parent 9b09225 commit e4533df
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -172,7 +172,11 @@ private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
long end = maxDelayRebalance + System.currentTimeMillis();
do {
try {
log.debug("Rebalancing; waiting for {} records in pipeline", inPipeline);
log.debug(
"Rebalancing; waiting for {} records in pipeline or awaitingTransaction: {}",
inPipeline,
this.awaitingTransaction.get()
);
Thread.sleep(interval);
commitEvent.runIfRequired(true);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -374,7 +378,10 @@ public void run() {
}

if (!records.isEmpty()) {
this.commitBatch.addUncommitted(records);
// Handled separately using transactional KafkaSender
if (ackMode != AckMode.EXACTLY_ONCE) {
this.commitBatch.addUncommitted(records);
}
r = Operators.produced(REQUESTED, ConsumerEventLoop.this, 1);
log.debug("Emitting {} records, requested now {}", records.count(), r);
sink.emitNext(records, ConsumerEventLoop.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Sinks.Many;
Expand All @@ -39,6 +38,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -73,16 +73,13 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
t.printStackTrace();
return null;
}).given(sink).emitError(any(), any());
ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.MANUAL_ACK, null, opts,
scheduler, consumer, t -> false, sink, new AtomicBoolean());
Set<String> topics = new HashSet<>();
topics.add("test");
Collection<TopicPartition> partitions = new ArrayList<>();
TopicPartition tp = new TopicPartition("test", 0);
partitions.add(tp);
Map<TopicPartition, List<ConsumerRecord>> record = new HashMap<>();
record.put(tp, Collections.singletonList(
new ConsumerRecord("test", 0, 0, 0, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, null)));
record.put(tp, Collections.singletonList(new ConsumerRecord("test", 0, 0, null, null)));
ConsumerRecords records = new ConsumerRecords(record);
CountDownLatch latch = new CountDownLatch(2);
AtomicBoolean paused = new AtomicBoolean();
Expand All @@ -102,6 +99,8 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
paused.set(false);
return null;
}).given(consumer).resume(any());
ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.MANUAL_ACK, null, opts,
scheduler, consumer, t -> false, sink, new AtomicBoolean());
loop.onRequest(1);
loop.onRequest(1);
CommittableBatch batch = loop.commitEvent.commitBatch;
Expand All @@ -115,4 +114,69 @@ public void deferredCommitsWithRevoke() throws InterruptedException {
assertThat(batch.deferred).hasSize(0);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void revokePartitionsForExactlyOnce() throws InterruptedException {
AtomicBoolean isPartitionRevokeFinished = new AtomicBoolean();
ReceiverOptions opts = ReceiverOptions.create(
Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "deferredCommitsWithRevoke"))
.maxDelayRebalance(Duration.ofSeconds(30))
.addRevokeListener(p -> isPartitionRevokeFinished.set(true))
.subscription(Collections.singletonList("test"));
Consumer consumer = mock(Consumer.class);
Scheduler scheduler = KafkaSchedulers.newEvent(opts.groupId());
Many sink = mock(Many.class);
willAnswer(inv -> {
Throwable t = inv.getArgument(0);
t.printStackTrace();
return null;
}).given(sink).emitError(any(), any());
Set<String> topics = new HashSet<>();
topics.add("test");
Collection<TopicPartition> partitions = new ArrayList<>();
TopicPartition tp = new TopicPartition("test", 0);
partitions.add(tp);
Map<TopicPartition, List<ConsumerRecord>> record = new HashMap<>();
record.put(tp, Collections.singletonList(new ConsumerRecord("test", 0, 0, null, null)));
ConsumerRecords records = new ConsumerRecords(record);
CountDownLatch latch = new CountDownLatch(2);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
Thread.sleep(10);
latch.countDown();
if (paused.get()) {
return ConsumerRecords.empty();
}
return records;
}).given(consumer).poll(any());
willAnswer(inv -> {
paused.set(true);
return null;
}).given(consumer).pause(any());
willAnswer(inv -> {
paused.set(false);
return null;
}).given(consumer).resume(any());

ConsumerEventLoop loop = new ConsumerEventLoop<>(AckMode.EXACTLY_ONCE, null, opts,
scheduler, consumer, t -> false, sink, new AtomicBoolean());

loop.onRequest(1);
loop.onRequest(1);

CommittableBatch batch = loop.commitEvent.commitBatch;
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(batch.uncommitted).hasSize(0);

loop.awaitingTransaction.set(true);
ArgumentCaptor<ConsumerRebalanceListener> rebal = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
verify(consumer).subscribe(any(Collection.class), rebal.capture());

Executors.newSingleThreadExecutor().execute(() -> rebal.getValue().onPartitionsRevoked(partitions));
await().pollDelay(Duration.ofSeconds(5)).until(() -> true);
assertThat(isPartitionRevokeFinished.get()).isFalse();
loop.awaitingTransaction.set(false);
await().atMost(Duration.ofSeconds(10)).until(isPartitionRevokeFinished::get);
}

}

0 comments on commit e4533df

Please sign in to comment.