
KAFKA-18452: Implemented batch size in acquired records #18459

Merged (11 commits) on Jan 15, 2025

Conversation

apoorvmittal10 (Collaborator)

The PR implements a way to divide acquired records into batches of the size desired by the client.

BatchSize is a soft limit: the batches in the response, and the cached state in the broker, are aligned at log batch boundaries, i.e.

say the produced log batches span 0-50 and 51-100:

for a client batch size of 60, the response will be 0-100
for a client batch size of 40, the response will be 0-50, 51-100

This means the broker will not break batches apart; rather, it aligns them in the response to the client's desired batch size, so complete batches are acknowledged, which avoids offset-based tracking on the broker.
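
As a rough illustration of the alignment rule above, here is a minimal sketch (not the PR's actual implementation), assuming the RecordBatch and ShareFetchResponseData.AcquiredRecords types that appear in the diff snippets below; the class and helper names are hypothetical:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.record.RecordBatch;

final class BatchAlignmentSketch {
    // Group fetched log batches into acquired batches of roughly batchSize records,
    // cutting only at log batch boundaries: batchSize is a soft limit and a log batch
    // is never split across acquired batches.
    static List<AcquiredRecords> alignToLogBatches(Iterable<? extends RecordBatch> logBatches, int batchSize) {
        List<AcquiredRecords> acquired = new ArrayList<>();
        long firstOffset = -1;
        long recordCount = 0;
        RecordBatch lastBatch = null;
        for (RecordBatch batch : logBatches) {
            if (firstOffset == -1) {
                firstOffset = batch.baseOffset();
            }
            recordCount += batch.lastOffset() - batch.baseOffset() + 1;
            lastBatch = batch;
            if (recordCount >= batchSize) {
                // Soft limit reached (possibly exceeded, since the last log batch is kept whole).
                acquired.add(new AcquiredRecords()
                    .setFirstOffset(firstOffset)
                    .setLastOffset(batch.lastOffset())
                    .setDeliveryCount((short) 1));
                firstOffset = -1;
                recordCount = 0;
            }
        }
        if (firstOffset != -1 && lastBatch != null) {
            // Remaining records that did not reach the soft limit form the final acquired batch.
            acquired.add(new AcquiredRecords()
                .setFirstOffset(firstOffset)
                .setLastOffset(lastBatch.lastOffset())
                .setDeliveryCount((short) 1));
        }
        return acquired;
    }
}

Applied to the example above (log batches 0-50 and 51-100), a batchSize of 60 yields a single acquired batch 0-100, and a batchSize of 40 yields 0-50 and 51-100.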

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

github-actions bot added the triage (PRs from the community), core (Kafka Broker) and KIP-932 (Queues for Kafka) labels on Jan 9, 2025
@adixitconfluent (Contributor) left a comment

Took a pass over the non-test files. A couple of minor comments.

@@ -589,6 +589,7 @@ public long nextFetchOffset() {
@SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
public ShareAcquiredRecords acquire(
String memberId,
int batchSize,
Contributor

description of param batchSize is missing in the javadoc for this function

Contributor

The javadoc for this new param is still missing.

Collaborator Author

Missed it, done.

.setDeliveryCount((short) 1));
} else {
// The batch is split into multiple batches considering batch size.
long currentFirstOffset = firstAcquiredOffset;
Contributor

Shouldn't the variable name currentFirstOffset be previousBatchFirstOffset? Sounds more intuitive to me. Wdyt?

Member

I don't think that's an improvement, I'm afraid. I find the code easier to follow with the current name.

@adixitconfluent (Contributor) left a comment

Took a pass over the test files as well. Left minor comments.

@@ -146,6 +146,7 @@ public class SharePartitionManagerTest {
private static final short MAX_FETCH_RECORDS = 500;
private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
private static final int BATCH_SIZE = 500;
Contributor

We have defined the same BATCH_SIZE = 500 in SharePartitionTest, DelayedShareFetchTest, ShareFetchUtilsTest and ShareFetchTest, can we reuse it?

Collaborator Author

I don't think it's necessary to share that constant across tests; it can be defined in the individual classes themselves.

}

@Test
public void testAcquireWithBatchSizeWithBatchSize() {
Contributor

the test name seems incorrect

Collaborator Author

My bad, corrected it.

github-actions bot removed the triage (PRs from the community) label on Jan 11, 2025
@junrao (Contributor) left a comment

@apoorvmittal10 : Thanks for the PR. Left a few comments.

result.add(new AcquiredRecords()
.setFirstOffset(currentFirstOffset)
.setLastOffset(lastAcquiredOffset)
.setDeliveryCount((short) 1));
Contributor

Could we just break here since the code outside the loop does the same thing?

Member

Yes. Agreed.

Collaborator Author

Yeah, agreed. I have also simplified the if/else statement so there is a single, common insertion into the array.
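
For illustration, a minimal structural sketch of the single insertion point plus early break being discussed, reusing the names from the snippets quoted in this thread; the batchSize grouping and boundary logic of the PR are deliberately omitted here, so this shows only the loop shape, not the actual algorithm:

for (RecordBatch batch : batches) {
    // Single insertion point for every acquired batch, including the final one.
    long currentLastOffset = Math.min(batch.lastOffset(), lastAcquiredOffset);
    result.add(new AcquiredRecords()
        .setFirstOffset(currentFirstOffset)
        .setLastOffset(currentLastOffset)
        .setDeliveryCount((short) 1));
    if (currentLastOffset == lastAcquiredOffset) {
        // Nothing left to acquire, so no duplicated add(...) is needed outside the loop.
        break;
    }
    currentFirstOffset = currentLastOffset + 1;
}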

String memberId,
Iterable<? extends RecordBatch> batches,
long firstOffset,
long lastOffset,
int batchSize,
Contributor

Currently, maxFetchRecords is the maxPollRecords. What benefits do we get by setting batchSize smaller than maxFetchRecords? To allow acks to be sent quicker?

Member

Yes. That's it.

Collaborator Author

And not splitting batches on the broker, as partial acknowledgement of a large batch results in offset-based tracking for InflightBatch, so they are aligned as per max.poll.records.

AcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
firstBatch.baseOffset(), lastBatch.lastOffset(), maxFetchRecords);
return ShareAcquiredRecords.fromAcquiredRecords(acquiredRecords);
return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
Contributor

This is an existing issue. In shareFetchResponse, we always set records to all fetch batches. However, some of the batches could be excluded during acquisition. So, it's wasteful to return excluded batches in shareFetchResponse.

Collaborator Author

Created https://issues.apache.org/jira/browse/KAFKA-18522. I'll adjust the fetched record batches in the response accordingly.

@@ -244,13 +244,14 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
String groupId,
String memberId,
FetchParams fetchParams,
int batchSize,
Contributor

missing javadoc for the new param

Collaborator Author

My bad, done.

Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)),
25 /* Gap of 3 records will also be added to first batch */);

// Fetch expected records from 3 batches, but change the first expected record to include gap offsets.
Contributor

There are 4 batches, instead of 3.

Collaborator Author

Thanks for pointing out, done.

long firstAcquiredOffset,
long lastAcquiredOffset,
String memberId,
int batchSize
Contributor

This is an existing issue. Would it be better for BatchSize to be a group-level config instead of a field in each share fetch request?

Member

We've discussed this. I would like to assess the efficacy of the approach of having the batch size information known on client and broker, and then we can decide on the best way to control it. In practice, almost all of the broker code would be the same whether it's a config or part of the request, so not too difficult to switch if we want later.

@AndrewJSchofield (Member) left a comment

Thanks for the PR. Overall, it makes sense to me. Just a few minor comments to address.

}

// Track last batch for filling the final batch, if required.
lastBatch = batch;
Member

nit: I'd add a blank line here for legibility.

Collaborator Author

Refactored this method and lastBatch is not required now.

}
}

result.forEach(record -> {
Member

The variable record is really an instance of AcquiredRecords. I suggest renaming it to acquiredRecords, because record.firstOffset() and record.lastOffset() don't make sense for something named as if it were a single record.

Collaborator Author

Done.

Iterable<? extends RecordBatch> batches,
long firstAcquiredOffset,
long lastAcquiredOffset,
String memberId,
Member

nit: Almost always, memberId is the first argument. I suggest making it the first argument here too.

Collaborator Author

Done.

@junrao (Contributor) left a comment

@apoorvmittal10 : Thanks for the updated PR. A few more comments.

@@ -580,6 +580,7 @@ public long nextFetchOffset() {
* fetched from the leader.
*
* @param memberId The member id of the client that is fetching the record.
* @param batchSize The batch size of the individual acquired records batch.
Contributor

Could we add that the unit of batchSize is records?

Collaborator Author

Done, also in SharePartitionManager.
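
For illustration, the updated javadoc might read roughly as follows (a sketch of the change being discussed, not the exact wording merged in the PR):

 * @param memberId  The member id of the client that is fetching the record.
 * @param batchSize The number of records per acquired records batch; this is a soft
 *                  limit, and acquired batches are aligned at log batch boundaries.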

// as lastOffset call of RecordBatch is expensive (loads headers).
for (RecordBatch batch : batches) {
long batchBaseOffset = batch.baseOffset();
if (batchBaseOffset < firstAcquiredOffset) {
Contributor

It seems that we could eliminate this statement.

Collaborator Author

That's right; I have added multiple tests to validate skipping in different scenarios.

@Test
public void testAcquireWithBatchSizeAndMultipleBatches() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
// Create 3 batches of records.
Contributor

Created 4 batches instead of 3.

Collaborator Author

Sorry, fixed.

}

@Test
public void testAcquireWithBatchSizeAndMaxFetchRecords() {
Contributor

We don't have a test that combines multiple record batches into a single acquired inFlightBatch. Should we add one?

Collaborator Author

I think that's being validated in testAcquireWithMultipleBatchesAndMaxFetchRecords, where we provide 3 batches and a single acquired batch combining the first 2 arrives in the response. Am I missing something here?

Contributor

Thanks. This sounds good then.

apoorvmittal10 requested a review from junrao on January 14, 2025 19:10
@junrao (Contributor) left a comment

@apoorvmittal10 : Thanks for the updated PR. Just a minor comment.

}

@Test
public void testAcquireSingleBatchWithBatchSizeAndEndOffsetAheadBatchFirstOffset() {
Contributor

The endOffset is actually after the firstOffset of the first batch. Ditto in the two other tests below.

Collaborator Author

Yeah, that's what I wanted to test: when endOffset is ahead of the first batch offset, do we acquire the batches/offsets correctly?

Collaborator Author

Do I need a better method name so it's not confusing?

Contributor

OK. Perhaps use "larger", since "ahead" can be ambiguous?

Collaborator Author

Done.

@junrao (Contributor) left a comment

@apoorvmittal10 : Thanks for the updated PR. It seems the following tests are quarantined?

  SharePartitionTest > testAcquireSingleBatchWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset()
  SharePartitionTest > testAcquireWithBatchSizeAndMaxFetchRecords()
  SharePartitionTest > testAcquireWithBatchSizeAndMultipleBatches()
  SharePartitionTest > testAcquireBatchSkipWithBatchSizeAndEndOffsetLargerThanFirstBatch()
  SharePartitionTest > testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset()
  SharePartitionTest > testAcquireWithBatchSizeAndSingleBatch()

@apoorvmittal10 (Collaborator Author)

Ohh strange, looking into it.

@apoorvmittal10 (Collaborator Author)

@junrao I spoke to @mumrah and he mentioned that any new tests are auto-quarantined for 7 days. Since these tests were added in this PR, they are marked as quarantined.

apoorvmittal10 requested a review from junrao on January 15, 2025 14:00
@junrao (Contributor) left a comment

@apoorvmittal10 : Thanks for the explanation. LGTM

junrao merged commit 0640017 into apache:trunk on Jan 15, 2025
9 checks passed
askew pushed a commit to askew/kafka that referenced this pull request Jan 23, 2025
The PR implements a way to divide acquired records into batches of the size desired by the client.

The BatchSize is the soft limit and should align the batches in response and cached state in broker at the log batch boundaries.

Reviewers:  Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>, Jun Rao <[email protected]>