diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index 569050be..59c0fe38 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -277,7 +277,6 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo var out *bytes.Buffer if pool != nil { - // Assume the worst case scenario here, where decompressed buffer size is FetchMaxBytes. outBuf := pool.Get(pool.MaxSize())[:0] defer func() { pool.Put(outBuf) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 59e1a3f9..72e5eb04 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -395,7 +395,10 @@ func (cfg *cfg) validate() error { cfg.hooks = processedHooks if cfg.recordsPool != nil { - cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, int(cfg.maxBytes.load()), 2, func(sz int) []byte { + // Assume a 2x compression ratio. + maxDecompressBatchSize := int(cfg.maxBytes.load()) * 2 + + cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, maxDecompressBatchSize, 2, func(sz int) []byte { return make([]byte, sz) }) }