KAFKA-18452: Implemented batch size in acquired records #18459

Merged: 11 commits, Jan 15, 2025
2 changes: 1 addition & 1 deletion core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -102,7 +102,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.batchSize(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition);
// Maybe, in the future, check if no records are acquired, and we want to retry
// replica manager fetch. Depends on the share partition manager implementation,
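For context on the updated call above: the third argument is the remaining record budget for the whole share fetch (maxFetchRecords minus what has already been acquired from earlier partitions), so partitions processed later acquire less as the budget is consumed. Below is a rough, self-contained sketch of that running-budget pattern; the partition names, the map, and the Math.min stand-in for sharePartition.acquire(...) are made up for illustration, and the real budget is a soft limit rather than a hard cap.

import java.util.LinkedHashMap;
import java.util.Map;

// Rough sketch only: shows how a running budget like (maxFetchRecords - acquiredRecordsCount)
// caps successive per-partition acquisitions within one share fetch.
public class BudgetExample {

    public static void main(String[] args) {
        int maxFetchRecords = 500;                           // overall soft cap for the share fetch
        Map<String, Integer> availablePerPartition = new LinkedHashMap<>();
        availablePerPartition.put("topicA-0", 300);          // hypothetical fetched record counts
        availablePerPartition.put("topicA-1", 400);

        int acquiredRecordsCount = 0;
        for (Map.Entry<String, Integer> entry : availablePerPartition.entrySet()) {
            int remainingBudget = maxFetchRecords - acquiredRecordsCount;
            if (remainingBudget <= 0) {
                break;                                       // budget exhausted; later partitions acquire nothing
            }
            // Stand-in for sharePartition.acquire(...); the real acquisition treats the budget as a soft limit.
            int acquired = Math.min(entry.getValue(), remainingBudget);
            acquiredRecordsCount += acquired;
            System.out.printf("%s acquired %d records, budget left %d%n",
                entry.getKey(), acquired, maxFetchRecords - acquiredRecordsCount);
        }
    }
}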
93 changes: 72 additions & 21 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -580,6 +580,7 @@ public long nextFetchOffset() {
* fetched from the leader.
*
* @param memberId The member id of the client that is fetching the record.
* @param batchSize The number of records per acquired records batch.
* @param maxFetchRecords The maximum number of records that should be acquired, this is a soft
* limit and the method might acquire more records than the maxFetchRecords,
* if the records are already part of the same fetch batch.
@@ -589,6 +590,7 @@ public long nextFetchOffset() {
@SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
public ShareAcquiredRecords acquire(
String memberId,
int batchSize,
Contributor: Description of param batchSize is missing in the javadoc for this function.

Contributor: The javadoc for this new param is still missing.

Author: Missed it, done.

int maxFetchRecords,
FetchPartitionData fetchPartitionData
) {
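As an aside on the javadoc above: maxFetchRecords is a soft limit because acquisition extends to the last offset of the fetch batch that contains the budget boundary. The standalone sketch below illustrates that alignment with a hypothetical BatchRange type and plain offset ranges; it is not the SharePartition implementation, just the idea.

import java.util.List;

// Illustrative sketch: plain offset ranges stand in for Kafka's RecordBatch.
public class SoftLimitExample {

    // A fetched record batch, reduced to its offset range for illustration.
    record BatchRange(long baseOffset, long lastOffset) { }

    // The requested budget is a soft limit: the acquired range is extended to the end of
    // the batch that contains the budget boundary, so more records than requested may be acquired.
    static long lastAcquiredOffset(List<BatchRange> batches, long firstAcquiredOffset, int maxFetchRecords) {
        long boundary = firstAcquiredOffset + maxFetchRecords - 1;
        for (BatchRange batch : batches) {
            if (batch.baseOffset() <= boundary && boundary <= batch.lastOffset()) {
                return batch.lastOffset();                   // align to the end of the containing batch
            }
        }
        return boundary;                                     // boundary lies past the fetched data
    }

    public static void main(String[] args) {
        List<BatchRange> batches = List.of(new BatchRange(0, 99), new BatchRange(100, 249));
        // A budget of 150 records starting at offset 0 ends inside the second batch,
        // so the whole second batch is acquired: 250 records in total.
        System.out.println(lastAcquiredOffset(batches, 0, 150)); // prints 249
    }
}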
@@ -627,9 +629,8 @@ public ShareAcquiredRecords acquire(
if (subMap.isEmpty()) {
log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
groupId, topicIdPartition);
AcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
firstBatch.baseOffset(), lastBatch.lastOffset(), maxFetchRecords);
return ShareAcquiredRecords.fromAcquiredRecords(acquiredRecords);
return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
Contributor: This is an existing issue. In ShareFetchResponse, we always set records for all fetch batches. However, some of the batches could be excluded during acquisition, so it's wasteful to return excluded batches in ShareFetchResponse.

Author: Created https://issues.apache.org/jira/browse/KAFKA-18522. I'll adjust the fetched record batches in the response accordingly.

firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
}

log.trace("Overlap exists with in-flight records. Acquire the records if available for"
@@ -708,11 +709,11 @@ public ShareAcquiredRecords acquire(
// missing records as well.
if (acquiredCount < maxFetchRecords && subMap.lastEntry().getValue().lastOffset() < lastBatch.lastOffset()) {
log.trace("There exists another batch which needs to be acquired as well");
AcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
subMap.lastEntry().getValue().lastOffset() + 1,
lastBatch.lastOffset(), maxFetchRecords - acquiredCount);
result.add(acquiredRecords);
acquiredCount += (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
lastBatch.lastOffset(), batchSize, maxFetchRecords - acquiredCount);
result.addAll(shareAcquiredRecords.acquiredRecords());
acquiredCount += shareAcquiredRecords.count();
}
return new ShareAcquiredRecords(result, acquiredCount);
} finally {
@@ -1176,11 +1177,12 @@ private boolean initializedOrThrowException() {
};
}

private AcquiredRecords acquireNewBatchRecords(
private ShareAcquiredRecords acquireNewBatchRecords(
String memberId,
Iterable<? extends RecordBatch> batches,
long firstOffset,
long lastOffset,
int batchSize,
Contributor: Currently, maxFetchRecords is the maxPollRecords. What benefit do we get by setting batchSize smaller than maxFetchRecords? To allow acks to be sent quicker?

Member: Yes, that's it.

Author: And to avoid splitting batches on the broker: partial acknowledgement of a large batch results in offset-based tracking for the InFlightBatch, so this aligns the acquired batches with max.poll.records.

int maxFetchRecords
) {
lock.writeLock().lock();
@@ -1204,25 +1206,74 @@ private AcquiredRecords acquireNewBatchRecords(
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
}

// Schedule acquisition lock timeout for the batch.
AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstAcquiredOffset, lastAcquiredOffset);
// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
cachedState.put(firstAcquiredOffset, new InFlightBatch(
memberId,
firstAcquiredOffset,
lastAcquiredOffset,
RecordState.ACQUIRED,
1,
timerTask));
// Create batches of acquired records.
List<AcquiredRecords> acquiredRecords = createBatches(memberId, batches, firstAcquiredOffset, lastAcquiredOffset, batchSize);
// if the cachedState was empty before acquiring the new batches then startOffset needs to be updated
if (cachedState.firstKey() == firstAcquiredOffset) {
startOffset = firstAcquiredOffset;
}
endOffset = lastAcquiredOffset;
return new AcquiredRecords()
.setFirstOffset(firstAcquiredOffset)
return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
}
}

private List<AcquiredRecords> createBatches(
String memberId,
Iterable<? extends RecordBatch> batches,
long firstAcquiredOffset,
long lastAcquiredOffset,
int batchSize
Contributor: This is an existing issue. Would it be better for batchSize to be a group-level config instead of being part of each share fetch request?

Member: We've discussed this. I would like to assess the efficacy of having the batch size known on both the client and the broker, and then we can decide on the best way to control it. In practice, almost all of the broker code would be the same whether it's a config or part of the request, so it would not be too difficult to switch later if we want.

) {
lock.writeLock().lock();
try {
List<AcquiredRecords> result = new ArrayList<>();
long currentFirstOffset = firstAcquiredOffset;
// No split of batches is required if the batch size is greater than records which
// can be acquired, else split the batch into multiple batches.
if (lastAcquiredOffset - firstAcquiredOffset + 1 > batchSize) {
// The batch is split into multiple batches considering batch size.
// Note: Try reading only the baseOffset of the batch and avoid reading the lastOffset
// as lastOffset call of RecordBatch is expensive (loads headers).
for (RecordBatch batch : batches) {
long batchBaseOffset = batch.baseOffset();
// Check if the batch is already past the last acquired offset then break.
if (batchBaseOffset > lastAcquiredOffset) {
// Break the loop and the last batch will be processed outside the loop.
break;
}

// Create new batch once the batch size is reached.
if (batchBaseOffset - currentFirstOffset >= batchSize) {
result.add(new AcquiredRecords()
.setFirstOffset(currentFirstOffset)
.setLastOffset(batchBaseOffset - 1)
.setDeliveryCount((short) 1));
currentFirstOffset = batchBaseOffset;
}
}
}
// Add the last batch or the only batch if the batch size is greater than the records which
// can be acquired.
result.add(new AcquiredRecords()
.setFirstOffset(currentFirstOffset)
.setLastOffset(lastAcquiredOffset)
.setDeliveryCount((short) 1);
.setDeliveryCount((short) 1));

result.forEach(acquiredRecords -> {
// Schedule acquisition lock timeout for the batch.
AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
cachedState.put(acquiredRecords.firstOffset(), new InFlightBatch(
memberId,
acquiredRecords.firstOffset(),
acquiredRecords.lastOffset(),
RecordState.ACQUIRED,
1,
timerTask));
});
return result;
} finally {
lock.writeLock().unlock();
}
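To make the splitting rule in createBatches concrete, here is a simplified, self-contained sketch; the OffsetRange type and the plain list of base offsets are stand-ins for the real RecordBatch and AcquiredRecords classes, not the actual implementation. Acquired batches are closed only at fetched-batch base offsets once at least batchSize records have accumulated, so an individual acquired batch can still be larger than batchSize.

import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the splitting behaviour in createBatches.
public class CreateBatchesExample {

    // Stand-in for AcquiredRecords, reduced to its offset range.
    record OffsetRange(long firstOffset, long lastOffset) { }

    static List<OffsetRange> createBatches(List<Long> batchBaseOffsets, long firstAcquiredOffset,
                                           long lastAcquiredOffset, int batchSize) {
        List<OffsetRange> result = new ArrayList<>();
        long currentFirstOffset = firstAcquiredOffset;
        // Split only if more records were acquired than fit in a single batch of batchSize.
        if (lastAcquiredOffset - firstAcquiredOffset + 1 > batchSize) {
            for (long batchBaseOffset : batchBaseOffsets) {
                if (batchBaseOffset > lastAcquiredOffset) {
                    break;                                   // past the acquired range; the tail is added below
                }
                if (batchBaseOffset - currentFirstOffset >= batchSize) {
                    // Close the current acquired batch at the fetched-batch boundary.
                    result.add(new OffsetRange(currentFirstOffset, batchBaseOffset - 1));
                    currentFirstOffset = batchBaseOffset;
                }
            }
        }
        // The tail, or the only batch when the whole range fits within batchSize.
        result.add(new OffsetRange(currentFirstOffset, lastAcquiredOffset));
        return result;
    }

    public static void main(String[] args) {
        // Fetched batches start at offsets 0, 120 and 240; acquire offsets 0-299 with batchSize 100.
        // Result: three acquired batches covering 0-119, 120-239 and 240-299.
        System.out.println(createBatches(List.of(0L, 120L, 240L), 0, 299, 100));
    }
}

Keeping the split points on fetched-batch boundaries means the broker never breaks a producer batch apart, which matches the motivation in the review thread above: partial acknowledgement of a large batch would otherwise force offset-based tracking for the InFlightBatch.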
core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -236,6 +236,7 @@ private SharePartitionManager(
* @param groupId The group id, this is used to identify the share group.
* @param memberId The member id, generated by the group-coordinator, this is used to identify the client.
* @param fetchParams The fetch parameters from the share fetch request.
* @param batchSize The number of records per acquired records batch.
* @param partitionMaxBytes The maximum number of bytes to fetch for each partition.
*
* @return A future that will be completed with the fetched messages.
Expand All @@ -244,13 +245,14 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
String groupId,
String memberId,
FetchParams fetchParams,
int batchSize,
Contributor: Missing javadoc for the new param.

Author: My bad, done.

Map<TopicIdPartition, Integer> partitionMaxBytes
) {
log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}",
partitionMaxBytes.keySet(), groupId, fetchParams);

CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, maxFetchRecords));
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, batchSize, maxFetchRecords));

return future;
}
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3306,6 +3306,7 @@ class KafkaApis(val requestChannel: RequestChannel,
groupId,
shareFetchRequest.data.memberId,
params,
shareFetchRequest.data.batchSize,
interestedWithMaxBytes
).thenApply{ result =>
val combinedResult = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]