KAFKA-18452: Implemented batch size in acquired records (apache#18459)
The PR implements a way to divide acquired records into batches of the size requested by the client.

The batch size is a soft limit; the batches in the response, and the cached state in the broker, should align with the log batch boundaries.

Reviewers:  Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>, Jun Rao <[email protected]>
apoorvmittal10 authored and askew committed Jan 23, 2025
1 parent a00b73e commit 8b91059
Showing 11 changed files with 660 additions and 322 deletions.
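The commit message above describes a soft, log-batch-aligned split. As a rough standalone illustration of that rule (not code from this commit; the offsets, log batch boundaries and batch size are made-up values), the following sketch splits an acquired offset range into batches, starting a new batch only at a log batch base offset once at least batchSize records have accumulated, which is why an individual batch may exceed batchSize:

import java.util.ArrayList;
import java.util.List;

public class BatchSizeSplitExample {
    // Splits [firstOffset, lastOffset] into (first, last) pairs, starting a new
    // batch only at a log batch base offset once batchSize records have accumulated.
    static List<long[]> split(long firstOffset, long lastOffset, long[] logBatchBaseOffsets, int batchSize) {
        List<long[]> result = new ArrayList<>();
        long currentFirst = firstOffset;
        for (long baseOffset : logBatchBaseOffsets) {
            if (baseOffset > lastOffset) {
                break;
            }
            if (baseOffset - currentFirst >= batchSize) {
                result.add(new long[]{currentFirst, baseOffset - 1});
                currentFirst = baseOffset;
            }
        }
        // The final (or only) batch runs to the last acquired offset.
        result.add(new long[]{currentFirst, lastOffset});
        return result;
    }

    public static void main(String[] args) {
        // Example: offsets 0-24 stored in log batches starting at 0, 10 and 20, with batchSize 8.
        // Prints [0-9], [10-19], [20-24]: each acquired batch is aligned to log batch
        // boundaries and can hold more than batchSize records because the limit is soft.
        for (long[] b : split(0, 24, new long[]{0, 10, 20}, 8)) {
            System.out.println("[" + b[0] + "-" + b[1] + "]");
        }
    }
}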
2 changes: 1 addition & 1 deletion core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -102,7 +102,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
- ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
+ ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.batchSize(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition);
// Maybe, in the future, check if no records are acquired, and we want to retry
// replica manager fetch. Depends on the share partition manager implementation,
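The new call above passes shareFetch.batchSize() alongside the remaining record budget, shareFetch.maxFetchRecords() - acquiredRecordsCount. A minimal standalone sketch of that budget bookkeeping across partitions follows; the partition names, record counts and the stand-in for acquire are illustrative assumptions, not the broker's types, and it ignores that the real acquire is a soft limit that can slightly overshoot at log batch boundaries:

import java.util.LinkedHashMap;
import java.util.Map;

public class FetchBudgetExample {
    public static void main(String[] args) {
        int maxFetchRecords = 500;   // soft limit across the whole share fetch (example value)
        int batchSize = 100;         // desired records per acquired batch (example value)
        int acquiredRecordsCount = 0;

        // Available records per partition (illustrative numbers only).
        Map<String, Integer> availableByPartition = new LinkedHashMap<>();
        availableByPartition.put("topicA-0", 320);
        availableByPartition.put("topicA-1", 250);
        availableByPartition.put("topicB-0", 80);

        for (Map.Entry<String, Integer> entry : availableByPartition.entrySet()) {
            int remaining = maxFetchRecords - acquiredRecordsCount;
            if (remaining <= 0) {
                break; // budget exhausted; later partitions acquire nothing this round
            }
            // Stand-in for sharePartition.acquire(memberId, batchSize, remaining, data):
            // acquire at most "remaining" of the records the partition has available.
            int acquired = Math.min(entry.getValue(), remaining);
            acquiredRecordsCount += acquired;
            System.out.printf("%s: acquired %d records (about %d batches of up to %d)%n",
                entry.getKey(), acquired, (acquired + batchSize - 1) / batchSize, batchSize);
        }
        System.out.println("Total acquired: " + acquiredRecordsCount + " of " + maxFetchRecords);
    }
}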
93 changes: 72 additions & 21 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -580,6 +580,7 @@ public long nextFetchOffset() {
* fetched from the leader.
*
* @param memberId The member id of the client that is fetching the record.
* @param batchSize The number of records per acquired records batch.
* @param maxFetchRecords The maximum number of records that should be acquired, this is a soft
* limit and the method might acquire more records than the maxFetchRecords,
* if the records are already part of the same fetch batch.
@@ -589,6 +590,7 @@ public long nextFetchOffset() {
@SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
public ShareAcquiredRecords acquire(
String memberId,
int batchSize,
int maxFetchRecords,
FetchPartitionData fetchPartitionData
) {
@@ -627,9 +629,8 @@ public ShareAcquiredRecords acquire(
if (subMap.isEmpty()) {
log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
groupId, topicIdPartition);
- AcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-     firstBatch.baseOffset(), lastBatch.lastOffset(), maxFetchRecords);
- return ShareAcquiredRecords.fromAcquiredRecords(acquiredRecords);
+ return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+     firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
}

log.trace("Overlap exists with in-flight records. Acquire the records if available for"
@@ -708,11 +709,11 @@ public ShareAcquiredRecords acquire(
// missing records as well.
if (acquiredCount < maxFetchRecords && subMap.lastEntry().getValue().lastOffset() < lastBatch.lastOffset()) {
log.trace("There exists another batch which needs to be acquired as well");
- AcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+ ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
subMap.lastEntry().getValue().lastOffset() + 1,
- lastBatch.lastOffset(), maxFetchRecords - acquiredCount);
- result.add(acquiredRecords);
- acquiredCount += (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
+ lastBatch.lastOffset(), batchSize, maxFetchRecords - acquiredCount);
+ result.addAll(shareAcquiredRecords.acquiredRecords());
+ acquiredCount += shareAcquiredRecords.count();
}
return new ShareAcquiredRecords(result, acquiredCount);
} finally {
@@ -1176,11 +1177,12 @@ private boolean initializedOrThrowException() {
};
}

- private AcquiredRecords acquireNewBatchRecords(
+ private ShareAcquiredRecords acquireNewBatchRecords(
String memberId,
Iterable<? extends RecordBatch> batches,
long firstOffset,
long lastOffset,
int batchSize,
int maxFetchRecords
) {
lock.writeLock().lock();
@@ -1204,25 +1206,74 @@ private AcquiredRecords acquireNewBatchRecords(
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
}

- // Schedule acquisition lock timeout for the batch.
- AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, firstAcquiredOffset, lastAcquiredOffset);
- // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
- cachedState.put(firstAcquiredOffset, new InFlightBatch(
-     memberId,
-     firstAcquiredOffset,
-     lastAcquiredOffset,
-     RecordState.ACQUIRED,
-     1,
-     timerTask));
+ // Create batches of acquired records.
+ List<AcquiredRecords> acquiredRecords = createBatches(memberId, batches, firstAcquiredOffset, lastAcquiredOffset, batchSize);
// if the cachedState was empty before acquiring the new batches then startOffset needs to be updated
if (cachedState.firstKey() == firstAcquiredOffset) {
startOffset = firstAcquiredOffset;
}
endOffset = lastAcquiredOffset;
- return new AcquiredRecords()
-     .setFirstOffset(firstAcquiredOffset)
-     .setLastOffset(lastAcquiredOffset)
-     .setDeliveryCount((short) 1);
+ return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
}
}

private List<AcquiredRecords> createBatches(
String memberId,
Iterable<? extends RecordBatch> batches,
long firstAcquiredOffset,
long lastAcquiredOffset,
int batchSize
) {
lock.writeLock().lock();
try {
List<AcquiredRecords> result = new ArrayList<>();
long currentFirstOffset = firstAcquiredOffset;
// No split of batches is required if the batch size is greater than records which
// can be acquired, else split the batch into multiple batches.
if (lastAcquiredOffset - firstAcquiredOffset + 1 > batchSize) {
// The batch is split into multiple batches considering batch size.
// Note: Try reading only the baseOffset of the batch and avoid reading the lastOffset
// as lastOffset call of RecordBatch is expensive (loads headers).
for (RecordBatch batch : batches) {
long batchBaseOffset = batch.baseOffset();
// Check if the batch is already past the last acquired offset then break.
if (batchBaseOffset > lastAcquiredOffset) {
// Break the loop and the last batch will be processed outside the loop.
break;
}

// Create new batch once the batch size is reached.
if (batchBaseOffset - currentFirstOffset >= batchSize) {
result.add(new AcquiredRecords()
.setFirstOffset(currentFirstOffset)
.setLastOffset(batchBaseOffset - 1)
.setDeliveryCount((short) 1));
currentFirstOffset = batchBaseOffset;
}
}
}
// Add the last batch or the only batch if the batch size is greater than the records which
// can be acquired.
result.add(new AcquiredRecords()
.setFirstOffset(currentFirstOffset)
.setLastOffset(lastAcquiredOffset)
.setDeliveryCount((short) 1));

result.forEach(acquiredRecords -> {
// Schedule acquisition lock timeout for the batch.
AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
// Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
cachedState.put(acquiredRecords.firstOffset(), new InFlightBatch(
memberId,
acquiredRecords.firstOffset(),
acquiredRecords.lastOffset(),
RecordState.ACQUIRED,
1,
timerTask));
});
return result;
} finally {
lock.writeLock().unlock();
}
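To give a feel for the per-batch bookkeeping that createBatches performs above (one cached in-flight entry keyed by first offset plus one acquisition lock timeout task per acquired batch), here is a deliberately simplified standalone sketch. The InFlight record, the scheduled executor and the timeout value are illustrative stand-ins, not SharePartition's InFlightBatch or its timer implementation:

import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AcquiredBatchBookkeepingExample {
    // Simplified stand-in for an in-flight acquired batch.
    record InFlight(String memberId, long firstOffset, long lastOffset, String state) {}

    public static void main(String[] args) throws InterruptedException {
        // Cached state keyed by the first offset of each acquired batch.
        ConcurrentSkipListMap<Long, InFlight> cachedState = new ConcurrentSkipListMap<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        // Pretend these came out of a batchSize-driven split, e.g. [0-9], [10-19], [20-24].
        List<long[]> acquired = List.of(new long[]{0, 9}, new long[]{10, 19}, new long[]{20, 24});
        long lockTimeoutMs = 200; // illustrative acquisition lock timeout

        for (long[] batch : acquired) {
            cachedState.put(batch[0], new InFlight("member-1", batch[0], batch[1], "ACQUIRED"));
            // One timeout task per acquired batch: if it fires, the batch becomes available again.
            timer.schedule(() -> cachedState.computeIfPresent(batch[0],
                    (k, v) -> new InFlight(v.memberId(), v.firstOffset(), v.lastOffset(), "AVAILABLE")),
                lockTimeoutMs, TimeUnit.MILLISECONDS);
        }

        System.out.println("Before timeout: " + cachedState);
        Thread.sleep(lockTimeoutMs + 100);
        System.out.println("After timeout:  " + cachedState);
        timer.shutdown();
    }
}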
core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -236,6 +236,7 @@ private SharePartitionManager(
* @param groupId The group id, this is used to identify the share group.
* @param memberId The member id, generated by the group-coordinator, this is used to identify the client.
* @param fetchParams The fetch parameters from the share fetch request.
* @param batchSize The number of records per acquired records batch.
* @param partitionMaxBytes The maximum number of bytes to fetch for each partition.
*
* @return A future that will be completed with the fetched messages.
@@ -244,13 +245,14 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
String groupId,
String memberId,
FetchParams fetchParams,
int batchSize,
Map<TopicIdPartition, Integer> partitionMaxBytes
) {
log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}",
partitionMaxBytes.keySet(), groupId, fetchParams);

CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
- processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, maxFetchRecords));
+ processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, batchSize, maxFetchRecords));

return future;
}
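The fetchMessages signature now carries two different kinds of limits: partitionMaxBytes bounds how many bytes are fetched per partition, while batchSize only shapes how the acquired records are grouped into batches. A back-of-the-envelope illustration of the difference, with made-up numbers rather than broker logic:

public class FetchLimitsExample {
    public static void main(String[] args) {
        int partitionMaxBytes = 1024 * 1024; // 1 MiB fetch budget for one partition (example)
        int avgRecordBytes = 1024;           // assumed average record size (example)
        int batchSize = 100;                 // desired records per acquired batch (example)

        // Roughly how many records the byte budget allows to be fetched.
        int fetchableRecords = partitionMaxBytes / avgRecordBytes; // ~1024

        // Roughly how many acquired batches those records would be carved into.
        // batchSize is a soft limit, so real batches align to log batch boundaries
        // and may be a little larger; this is only an order-of-magnitude estimate.
        int approxBatches = (fetchableRecords + batchSize - 1) / batchSize; // ~11

        System.out.println("Fetchable records ~ " + fetchableRecords);
        System.out.println("Acquired batches  ~ " + approxBatches);
    }
}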
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3039,6 +3039,7 @@ class KafkaApis(val requestChannel: RequestChannel,
groupId,
shareFetchRequest.data.memberId,
params,
shareFetchRequest.data.batchSize,
interestedWithMaxBytes
).thenApply{ result =>
val combinedResult = mutable.Map.empty[TopicIdPartition, ShareFetchResponseData.PartitionData]