KAFKA-17600: Add nextOffsets to the ConsumerRecords (apache#17414)
This PR implements KIP-1094: ConsumerRecords now exposes a nextOffsets() method that returns, for each partition whose position was advanced by the poll, the next offset and metadata the consumer will consume.

Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>, Lucas Brutschy <[email protected]>
aliehsaeedii authored Oct 23, 2024
1 parent 9d65ff8 commit 14a098b
Showing 22 changed files with 340 additions and 51 deletions.
@@ -32,12 +32,19 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Map.of(), Map.of());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
+    @Deprecated
     public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
+        this(records, Map.of());
+    }
+
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records, final Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
         this.records = records;
+        this.nextOffsets = Map.copyOf(nextOffsets);
     }
 
     /**
@@ -53,6 +60,14 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
         return Collections.unmodifiableList(recs);
     }
 
+    /**
+     * Get the next offsets and metadata corresponding to all topic partitions for which the position has been advanced in this poll call.
+     * @return the next offsets that the consumer will consume
+     */
+    public Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
+        return nextOffsets;
+    }
+
     /**
      * Get just the records for the given topic
      */
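For context, a minimal sketch of how an application might use the new accessor together with the existing commitSync(Map) API, committing exactly the positions reached by the last poll. The bootstrap server, group id, and topic name below are illustrative, not part of this commit:

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class NextOffsetsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        props.put("group.id", "next-offsets-demo");       // illustrative group
        props.put("enable.auto.commit", "false");         // we commit manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));      // illustrative topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(record ->
                        System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset()));
                // KIP-1094: nextOffsets() holds, per partition, the position the
                // consumer will fetch next, so committing it resumes exactly after
                // the records returned by this poll.
                Map<TopicPartition, OffsetAndMetadata> next = records.nextOffsets();
                if (!next.isEmpty()) {
                    consumer.commitSync(next);
                }
            }
        }
    }
}
```

Previously an application had to derive these positions itself from the last record of each partition, which is exactly what the updated tests at the bottom of this diff do for verification.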
@@ -235,6 +235,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
         final List<TopicPartition> toClear = new ArrayList<>();
 
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
@@ -253,14 +254,15 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                         SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
                                 rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
+                        nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
                     }
                 }
                 toClear.add(entry.getKey());
             }
         }
 
         toClear.forEach(records::remove);
-        return new ConsumerRecords<>(results);
+        return new ConsumerRecords<>(results, nextOffsetAndMetadata);
     }
 
     public synchronized void addRecord(ConsumerRecord<K, V> record) {
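A rough unit-test sketch of the mock behavior above, assuming this hunk is MockConsumer's poll (as the addRecord/subscriptions bookkeeping suggests); the topic name is illustrative and the single-argument OffsetResetStrategy constructor is the one current at the time of this commit:

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class MockConsumerNextOffsetsTest {

    @Test
    void pollShouldReportNextOffsets() {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("test", 0);
        consumer.assign(List.of(tp));
        consumer.updateBeginningOffsets(Map.of(tp, 0L));
        consumer.addRecord(new ConsumerRecord<>("test", 0, 0L, "key", "value"));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));

        // The record at offset 0 was consumed, so the next offset for tp is 1.
        assertEquals(1L, records.nextOffsets().get(tp).offset());
    }
}
```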
@@ -91,7 +91,7 @@ public synchronized ConsumerRecords<K, V> poll(Duration timeout) {
         }
 
         records.clear();
-        return new ConsumerRecords<>(results);
+        return new ConsumerRecords<>(results, Map.of());
     }
 
     @Override
@@ -713,7 +713,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
                             + "since the consumer's position has advanced for at least one topic partition");
                 }
 
-                return interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+                return interceptors.onConsume(new ConsumerRecords<>(fetch.records(), fetch.nextOffsets()));
             }
             // We will wait for retryBackoffMs
         } while (timer.notExpired());
@@ -637,7 +637,7 @@ private ConsumerRecords<K, V> poll(final Timer timer) {
                             + "since the consumer's position has advanced for at least one topic partition");
                 }
 
-                return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+                return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records(), fetch.nextOffsets()));
             }
         } while (timer.notExpired());
 
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.ArrayList;
@@ -33,30 +34,35 @@ public class Fetch<K, V> {
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private boolean positionAdvanced;
     private int numRecords;
+    private Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata;
 
     public static <K, V> Fetch<K, V> empty() {
-        return new Fetch<>(new HashMap<>(), false, 0);
+        return new Fetch<>(new HashMap<>(), false, 0, new HashMap<>());
     }
 
     public static <K, V> Fetch<K, V> forPartition(
             TopicPartition partition,
             List<ConsumerRecord<K, V>> records,
-            boolean positionAdvanced
+            boolean positionAdvanced,
+            OffsetAndMetadata nextOffsetAndMetadata
     ) {
         Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = records.isEmpty()
                 ? new HashMap<>()
                 : mkMap(mkEntry(partition, records));
-        return new Fetch<>(recordsMap, positionAdvanced, records.size());
+        Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadataMap = mkMap(mkEntry(partition, nextOffsetAndMetadata));
+        return new Fetch<>(recordsMap, positionAdvanced, records.size(), nextOffsetAndMetadataMap);
     }
 
     private Fetch(
             Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
             boolean positionAdvanced,
-            int numRecords
+            int numRecords,
+            Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata
     ) {
         this.records = records;
         this.positionAdvanced = positionAdvanced;
         this.numRecords = numRecords;
+        this.nextOffsetAndMetadata = nextOffsetAndMetadata;
     }
 
     /**
@@ -70,6 +76,7 @@ public void add(Fetch<K, V> fetch) {
         Objects.requireNonNull(fetch);
         addRecords(fetch.records);
         this.positionAdvanced |= fetch.positionAdvanced;
+        this.nextOffsetAndMetadata.putAll(fetch.nextOffsetAndMetadata);
     }
 
     /**
@@ -95,6 +102,13 @@ public int numRecords() {
         return numRecords;
     }
 
+    /**
+     * @return the next offsets and metadata that the consumer will consume (the leader epoch of the last record is included)
+     */
+    public Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
+        return Map.copyOf(nextOffsetAndMetadata);
+    }
+
     /**
      * @return {@code true} if and only if this fetch did not return any user-visible (i.e., non-control) records, and
      * did not cause the consumer position to advance for any topic partitions
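To illustrate the accumulation added to add(Fetch) above, a hypothetical sketch that merges two single-partition fetches of the internal Fetch helper; the partitions, offsets, and leader epochs are made up:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.common.TopicPartition;

public class FetchMergeSketch {
    public static void main(String[] args) {
        TopicPartition tp0 = new TopicPartition("t", 0);
        TopicPartition tp1 = new TopicPartition("t", 1);

        // One consumed record per partition; the next offset is the record's offset + 1.
        Fetch<String, String> fetch = Fetch.forPartition(
                tp0, List.of(new ConsumerRecord<>("t", 0, 4L, "k", "v")),
                true, new OffsetAndMetadata(5L, Optional.of(1), ""));
        fetch.add(Fetch.forPartition(
                tp1, List.of(new ConsumerRecord<>("t", 1, 8L, "k", "v")),
                true, new OffsetAndMetadata(9L, Optional.of(1), "")));

        // add() merged the per-partition next-offset maps, so both entries survive.
        Map<TopicPartition, OffsetAndMetadata> next = fetch.nextOffsets();
        System.out.println(next); // contains tp0 -> offset 5 and tp1 -> offset 9
    }
}
```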
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -195,7 +196,7 @@ private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRecords) {
                 metricsManager.recordPartitionLead(tp, lead);
             }
 
-            return Fetch.forPartition(tp, partRecords, positionAdvanced);
+            return Fetch.forPartition(tp, partRecords, positionAdvanced, new OffsetAndMetadata(nextInLineFetch.nextFetchOffset(), nextInLineFetch.lastEpoch(), ""));
         } else {
             // these records aren't next in line based on the last consumed position, ignore them
             // they must be from an obsolete request
@@ -580,7 +580,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                 final ShareFetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
                     currentFetch = fetch;
-                    return new ConsumerRecords<>(fetch.records());
+                    return new ConsumerRecords<>(fetch.records(), Map.of());
                 }
 
                 metadata.maybeThrowAnyException();
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -81,6 +82,7 @@ public void testRecordsByPartition() {
 
         ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
 
+        assertEquals(partitionSize * topics.size(), consumerRecords.nextOffsets().size());
         for (String topic : topics) {
             for (int partition = 0; partition < partitionSize; partition++) {
                 TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -90,6 +92,8 @@
                     assertTrue(records.isEmpty());
                 } else {
                     assertEquals(recordSize, records.size());
+                    final ConsumerRecord<Integer, String> lastRecord = records.get(recordSize - 1);
+                    assertEquals(new OffsetAndMetadata(lastRecord.offset() + 1, lastRecord.leaderEpoch(), ""), consumerRecords.nextOffsets().get(topicPartition));
                     for (int i = 0; i < records.size(); i++) {
                         ConsumerRecord<Integer, String> record = records.get(i);
                         validateRecordPayload(topic, record, partition, i, recordSize);
@@ -117,6 +121,8 @@ public void testRecordsByTopic() {
 
         ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
 
+        assertEquals(partitionSize * topics.size(), consumerRecords.nextOffsets().size());
+
         for (String topic : topics) {
             Iterable<ConsumerRecord<Integer, String>> records = consumerRecords.records(topic);
             int recordCount = 0;
@@ -156,6 +162,7 @@ public void testRecordsAreImmutable() {
         ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic));
         ConsumerRecords<Integer, String> emptyRecords = ConsumerRecords.empty();
 
+        assertEquals(partitionSize, records.nextOffsets().size());
         // check records(TopicPartition) / partitions by add method
         // check iterator / records(String) by remove method
         // check data count after all operations
@@ -178,6 +185,7 @@ private ConsumerRecords<Integer, String> buildTopicTestRecords(int recordSize,
                                                                    int emptyPartitionIndex,
                                                                    Collection<String> topics) {
         Map<TopicPartition, List<ConsumerRecord<Integer, String>>> partitionToRecords = new LinkedHashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
         for (String topic : topics) {
             for (int i = 0; i < partitionSize; i++) {
                 List<ConsumerRecord<Integer, String>> records = new ArrayList<>(recordSize);
@@ -189,11 +197,13 @@ private ConsumerRecords<Integer, String> buildTopicTestRecords(int recordSize,
                         );
                     }
                 }
-                partitionToRecords.put(new TopicPartition(topic, i), records);
+                final TopicPartition tp = new TopicPartition(topic, i);
+                partitionToRecords.put(tp, records);
+                nextOffsets.put(tp, new OffsetAndMetadata(recordSize, Optional.empty(), ""));
             }
         }
 
-        return new ConsumerRecords<>(partitionToRecords);
+        return new ConsumerRecords<>(partitionToRecords, nextOffsets);
     }
 
     private void validateEmptyPartition(ConsumerRecord<Integer, String> record, int emptyPartitionIndex) {