
kafka replay speed: upstream concurrent fetchers #9452

Merged

Conversation

@dimitarvdimitrov (Contributor) commented Sep 27, 2024

This is the second in a series of PRs to upstream the code for improving Kafka replay speed in the ingester.

In this PR I'm upstreaming the fetching code. The core of the change is in `concurrentFetchers`.

I'm submitting the PR, but the work was done jointly by @gotjosh and myself.

# franz-go fork

We need the changes in twmb/franz-go#803, so for now I'm running a fork.

# `concurrentFetchers` Overview

  • **Segmentation (fetchWant):** The fetcher divides the work into segments called fetchWants. Each fetchWant represents a range of records to be fetched, defined by a start and end offset. This segmentation allows for concurrent fetching of different parts of the topic partition (see the sizing sketch after this list).

  • **Concurrent Fetching:** Multiple goroutines (defined by the concurrency parameter) work on fetching these segments simultaneously. Each goroutine runs the `run` method, which processes fetchWants from a channel. The results (`fetchResult`) of each fetch are sent on another channel: `fetchWant.result`.

    • Different configurations of concurrency and fetchWant size yield different results:
      • low concurrency and a low number of records per fetch yields lower end-to-end latency
      • high concurrency and a high number of records per fetch yields higher replay speed
    • Because of this we chose to have two settings: one for while we're catching up and one for after that. They are controlled via two sets of flags: `.startup-fetch-concurrency` & `.startup-records-per-fetch`, and `.ongoing-fetch-concurrency` & `.ongoing-records-per-fetch`.

  • **Fetching Process:** For each fetchWant, the fetcher retrieves the records through the `fetchSingle` method. This method:
    • Finds the leader for the partition
    • Builds a fetch request
    • Sends the request to the Kafka broker
    • Parses the response

  • **Multiple Attempts:** If a fetch attempt fails or doesn't retrieve all requested records, the fetcher will retry, using an error backoff to avoid overwhelming the system with rapid retries. The fetcher updates the start offset of the fetchWant based on the last successfully fetched record and continues until all requested records are retrieved or the context is cancelled.

  • **Overfetching Risk:** The system risks overfetching due to limitations in the Kafka protocol:

    • The Kafka protocol doesn't allow specifying the number of records to fetch; only the number of bytes can be specified.
    • This means that a fetch might retrieve more records than intended, especially at the end of one fetchWant.
    • These extra records can overlap with the next fetchWant's range.
    • To handle this, `concurrentFetchers` tracks the last returned record offset (`lastReturnedRecord`) and uses `recordIndexAfterOffset` to find the first new record in each `fetchResult`, discarding records that have already been processed (see the ordering sketch after this list).
  • **Ordering:** The fetcher ensures that segments are processed in order by:

    • Using a linked list (`pendingResults`) to keep track of fetch result channels in the order they were requested
    • Buffering results in `bufferedResult` and only sending them to the `orderedFetches` channel when they're next in sequence
    • The `PollFetches` method, which consumers call to get records, receives from the `orderedFetches` channel, ensuring records are always returned in the correct order

  • **Adaptive Fetching:** The system adapts the size of fetch requests based on previous results. It estimates the bytes per record and adjusts the `MaxBytes` parameter of fetch requests accordingly, trying to optimize the amount of data fetched in each request (see the sizing sketch after this list).
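
To make the segmentation and adaptive sizing concrete, here's a minimal Go sketch of a fetchWant and a bytes-per-record `MaxBytes` estimate. The field layout, the helper name, and the 5% headroom are illustrative assumptions for this sketch, not the actual Mimir definitions:

```go
package ingestsketch

import "math"

// fetchWant is a sketch of one fetch segment: a half-open range of
// offsets [startOffset, endOffset) fetched by a single goroutine.
// The field layout is assumed for illustration.
type fetchWant struct {
	startOffset int64            // first offset to fetch (inclusive)
	endOffset   int64            // first offset NOT to fetch (exclusive)
	result      chan fetchResult // where the fetching goroutine sends results
}

type fetchResult struct {
	records []record
	err     error
}

type record struct {
	offset int64
	value  []byte
}

// maxBytes estimates the MaxBytes of a fetch request from a running
// bytes-per-record estimate, as in the "Adaptive Fetching" bullet.
// The 5% headroom is an illustrative choice, not the PR's constant.
func (w fetchWant) maxBytes(estimatedBytesPerRecord float64) int32 {
	records := float64(w.endOffset - w.startOffset)
	bytes := records * estimatedBytesPerRecord * 1.05
	if bytes > math.MaxInt32 {
		return math.MaxInt32
	}
	return int32(bytes)
}
```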
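
And a minimal sketch of the ordering and de-duplication scheme, reusing the hypothetical types above and standing a `container/list` queue in for `pendingResults`; again, this illustrates the idea rather than reproducing the actual `concurrentFetchers` code:

```go
// Continuing the sketch package above.
import "container/list"

// trimProcessedRecords drops records an earlier fetch already returned
// (offset <= lastReturned), handling the overlap caused by overfetching
// at the end of the previous fetchWant. It plays the role the PR
// assigns to recordIndexAfterOffset.
func trimProcessedRecords(recs []record, lastReturned int64) []record {
	i := 0
	for i < len(recs) && recs[i].offset <= lastReturned {
		i++
	}
	return recs[i:]
}

// pollNext receives the next in-order fetchResult: the head of the
// pending queue is always the earliest-requested range, so waiting on
// it preserves offset order even though fetches complete concurrently.
func pollNext(pending *list.List, lastReturned int64) ([]record, int64) {
	head := pending.Front()                // oldest outstanding fetchWant
	res := <-head.Value.(chan fetchResult) // blocks until that segment's result arrives
	pending.Remove(head)
	recs := trimProcessedRecords(res.records, lastReturned)
	if len(recs) > 0 {
		lastReturned = recs[len(recs)-1].offset
	}
	return recs, lastReturned
}
```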

# Changes to existing code

Most of this PR introduces new code. However, there are some changes to existing behaviours:

  • `processNextFetchesUntilTargetOrMaxLagHonored`: the second attempt is now allowed more time. The reason is that the catch-up speed during steady-state consumption is slower. For example, we can't reduce the lag from 15s to 2s within 15s; in those cases allowing 30s is enough to catch up to below 2s of lag (see the sketch below).
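
As a rough illustration of that timing change (aside from the real `processNextFetchesUntilTargetOrMaxLagHonored` name, the structure, names, and poll interval below are assumptions, not Mimir's actual code), each successive attempt gets a larger time budget:

```go
package lagsketch

import (
	"context"
	"time"
)

// consumeUntilTargetLag stands in for the fetch-and-process loop; it is
// assumed here only to keep the sketch self-contained.
func consumeUntilTargetLag(ctx context.Context, targetLag time.Duration, currentLag func() time.Duration) error {
	for currentLag() > targetLag {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond): // poll and process one batch here
		}
	}
	return nil
}

// fetchUntilLagHonored sketches the two-attempt budgeting: the second
// attempt gets twice the time, because catch-up during steady-state
// consumption is slower.
func fetchUntilLagHonored(ctx context.Context, targetLag, maxLag time.Duration, currentLag func() time.Duration) error {
	var err error
	for _, budget := range []time.Duration{maxLag, 2 * maxLag} {
		attemptCtx, cancel := context.WithTimeout(ctx, budget)
		err = consumeUntilTargetLag(attemptCtx, targetLag, currentLag)
		cancel()
		if err == nil {
			return nil // lag is at or below the target
		}
	}
	return err
}
```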

# Testing

`concurrentFetchers` is now the default in unit tests in pkg/storage/ingest. This significantly increases test coverage without requiring us to write many new tests.

# Future work

  • Backoff and retry getting the offset and topic ID when creating `concurrentFetchers` - this would unblock making concurrent fetchers the default (some e2e tests fail if we can't resolve the topic the first time)
  • Dynamic concurrency based on an EWMA; remove the two sets of flags for before and after startup

@gotjosh (Contributor) left a comment:
LGTM

My comments are all nits; I don't need to see this again, and you can merge after you've decided which ones you want to pick.

@dimitarvdimitrov dimitarvdimitrov enabled auto-merge (squash) September 27, 2024 15:25
@dimitarvdimitrov dimitarvdimitrov merged commit f7fad00 into main Sep 27, 2024
29 checks passed
@dimitarvdimitrov dimitarvdimitrov deleted the dimitar/ingest/replay-speed/concurrent-fetchers branch September 27, 2024 15:49