kgo: reduce allocations when processing batches #3
Conversation
Force-pushed from 94547af to 64d04d6
ref: #3 (comment) Signed-off-by: Miguel Ángel Ortuño <[email protected]>
This is great! About 50% of allocations in Mimir with ingest storage come from decompressing the bytes, so this should make a massive difference.
I left a couple of comments on the pool sizes and on how to make them work with the changes in #4.
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
ref: #3 (comment) Signed-off-by: Miguel Ángel Ortuño <[email protected]>
Force-pushed from 9d508f2 to 5505346
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
Force-pushed from 5505346 to 8d36414
ref: #3 (comment) Signed-off-by: Miguel Ángel Ortuño <[email protected]>
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
Force-pushed from 13c9457 to 47adbd1
ref: #3 (comment) Signed-off-by: Miguel Ángel Ortuño <[email protected]>
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
LGTM, only one comment about the lz4 output size and a suggestion about copying the data; otherwise nice work!
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
…tions Signed-off-by: Miguel Ángel Ortuño <[email protected]>
* fetching: export utilities for decompressing and parsing partition fetch responses

  ### Background

  In grafana/mimir we are working towards making fetch requests ourselves. The primary reason is that individual requests to the Kafka backend are slow, so issuing them sequentially per partition becomes the bottleneck in our application. We therefore want to fetch records in parallel to speed up consumption.

  One difficulty I met when issuing `FetchRequest`s ourselves is that parsing the response is non-trivial. That's why I'm proposing to export these functions for downstream projects to use. Alternatively, I can also try contributing the concurrent fetching logic, but I believe that is much more nuanced, with more tradeoffs around fetched bytes and latency, so I wasn't sure whether it's a good fit for a general-purpose library. I'm open to discussing this further.

  ### What this PR does

  Moves `(*kgo.cursorOffsetNext).processRespPartition` from being a method to being a standalone function, `kgo.processRespPartition`. A few small changes were also necessary to make the interface suitable for public use (like removing the `*broker` parameter).

  ### Side effects

  To minimize the necessary changes and the API surface of the package, I opted to use a single global decompressor for all messages. Previously, there was one decompressor per client, and that decompressor was passed down to `(*cursorOffsetNext).processRespPartition`. My understanding is that sharing pooled readers (lz4, zstd, gzip) shouldn't have a negative impact on performance, because usage patterns do not affect the behaviour of the reader (for example, a consistent size of decompressed data doesn't make the reader more or less efficient). I have not thoroughly verified or tested this; let me know if you think that's important. An alternative is to also export the `decompressor` along with `newDecompressor()` and the auxiliary types for decompression.

* Restore multiline processV0OuterMessage

* `*kgo.Record` pooling support

  Signed-off-by: Miguel Ángel Ortuño <[email protected]>

* Merge pull request #1 from grafana/ortuman/reduce-kgo-record-alloc

  `*kgo.Record` pooling support

* fetching: export utilities for decompressing and parsing partition fetch responses

* Merge pull request #4 from dimitarvdimitrov/dimitar/grafana-master-with-export-partition-parsing-utils

  fetching: export utilities for decompressing and parsing partition fetch responses

* Merge pull request #3 from ortuman/reduce-decompression-buffer-allocations

  Signed-off-by: Miguel Ángel Ortuño <[email protected]>

---------

Signed-off-by: Miguel Ángel Ortuño <[email protected]>
Co-authored-by: Dimitar Dimitrov <[email protected]>
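The "single global decompressor" mentioned above amounts to sharing pooled compression readers across all clients instead of keeping one decompressor per client. As a rough illustration of that pooling pattern — not this fork's actual internals; the `decompressGzip` helper is hypothetical — here is a minimal sketch using only the standard library:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// gzipReaders holds *gzip.Reader values so a reader allocated for one
// batch can be reset and reused for the next, regardless of which
// client the batch came from.
var gzipReaders = sync.Pool{
	New: func() any { return new(gzip.Reader) },
}

// decompressGzip is a hypothetical helper: it borrows a pooled reader,
// resets it onto the compressed input, and returns the decompressed bytes.
func decompressGzip(compressed []byte) ([]byte, error) {
	zr := gzipReaders.Get().(*gzip.Reader)
	defer gzipReaders.Put(zr)

	if err := zr.Reset(bytes.NewReader(compressed)); err != nil {
		return nil, err
	}
	out, err := io.ReadAll(zr)
	if err != nil {
		return nil, err
	}
	return out, zr.Close()
}

func main() {
	// Round-trip a payload to show the pooled reader in action.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte("hello, kafka batch"))
	zw.Close()

	out, err := decompressGzip(buf.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Because the reader's behaviour does not depend on who used it last, sharing one pool across clients should be as efficient as per-client pools, which is the reasoning given in the commit message above.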
This PR introduces various changes to reduce GC overhead by decreasing the total number of allocations when the `EnableRecordsPool` option is set. Among the changes introduced, the following are noteworthy:

* A dedicated pool is used during message batch decompression, reducing the number of allocations and avoiding a potential pool-poisoning scenario.
* The final output buffers derived from decompressing a batch can now be reused, once `*kgo.(*Record).Reuse` has been invoked on all of the resulting records.
* Similarly, once `*kgo.(*Record).Reuse` has been invoked on all of a batch's resulting records, the intermediate `[]kmsg.Record` buffers generated during batch processing can be recycled.

A minimal consumer sketch illustrating this lifecycle follows.
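The sketch below assumes that `kgo.EnableRecordsPool()` is the client option named in this PR and that `(*kgo.Record).Reuse` hands a record back to the pool once the application is done with it; the broker address, topic, and upstream import path are placeholders that may differ in this fork:

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumption: EnableRecordsPool() is the option referenced in this PR;
	// with it set, PollFetches hands out pooled *kgo.Record values.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),  // placeholder broker
		kgo.ConsumeTopics("example-topic"), // placeholder topic
		kgo.EnableRecordsPool(),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	for {
		fetches := client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			panic(fmt.Sprint(errs))
		}
		fetches.EachRecord(func(r *kgo.Record) {
			process(r.Value)
			// Once every record of a batch has been handed back via
			// Reuse, the underlying decompression buffers and the
			// intermediate []kmsg.Record buffers become eligible for
			// recycling, per the description above.
			r.Reuse()
		})
	}
}

func process(v []byte) { fmt.Printf("consumed %d bytes\n", len(v)) }
```

The key constraint to note is that buffer recycling is batch-granular: a batch's buffers can only be reclaimed after *all* of its records have been returned via `Reuse`, so holding on to even one record keeps the whole batch's memory alive.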