Skip to content

Commit

Permalink
M
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Ángel Ortuño <[email protected]>
  • Loading branch information
ortuman committed Oct 1, 2024
1 parent 95bcb25 commit 9d508f2
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
1 change: 0 additions & 1 deletion pkg/kgo/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo
var out *bytes.Buffer

if pool != nil {
// Assume the worst case scenario here, where decompressed buffer size is FetchMaxBytes.
outBuf := pool.Get(pool.MaxSize())[:0]
defer func() {
pool.Put(outBuf)
Expand Down
5 changes: 4 additions & 1 deletion pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,10 @@ func (cfg *cfg) validate() error {
cfg.hooks = processedHooks

if cfg.recordsPool != nil {
cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, int(cfg.maxBytes.load()), 2, func(sz int) []byte {
// Assume a 2x compression ratio.
maxDecompressBatchSize := int(cfg.maxBytes.load()) * 2

cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, maxDecompressBatchSize, 2, func(sz int) []byte {
return make([]byte, sz)
})
}
Expand Down

0 comments on commit 9d508f2

Please sign in to comment.