Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use proper zstd decoder pool for binlog event compression handling #17042

Merged
merged 8 commits into from
Oct 22, 2024
Merged
38 changes: 29 additions & 9 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var (
// allocations and GC overhead so this pool allows us to handle
// concurrent cases better while still scaling to 0 when there's no
// usage.
statefulDecoderPool sync.Pool
statefulDecoderPool = &decoderPool{}
)

func init() {
Expand All @@ -98,7 +98,7 @@ func init() {
if err != nil { // Should only happen e.g. due to ENOMEM
log.Errorf("Error creating stateless decoder: %v", err)
}
statefulDecoderPool = sync.Pool{
statefulDecoderPool.pool = sync.Pool{
mattlord marked this conversation as resolved.
Show resolved Hide resolved
New: func() any {
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
if err != nil { // Should only happen e.g. due to ENOMEM
Expand Down Expand Up @@ -304,12 +304,9 @@ func (tp *TransactionPayload) decompress() error {
// larger payloads.
if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
streamDecoder := statefulDecoderPool.Get().(*zstd.Decoder)
if streamDecoder == nil {
return vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder")
}
if err := streamDecoder.Reset(in); err != nil {
return vterrors.Wrap(err, "error resetting stateful stream decoder")
streamDecoder, err := statefulDecoderPool.Get(in)
if err != nil {
return err
}
compressedTrxPayloadsUsingStream.Add(1)
tp.reader = streamDecoder
Expand Down Expand Up @@ -341,7 +338,7 @@ func (tp *TransactionPayload) Close() {
switch reader := tp.reader.(type) {
case *zstd.Decoder:
if err := reader.Reset(nil); err == nil || err == io.EOF {
readersPool.Put(reader)
statefulDecoderPool.Put(reader)
}
default:
reader = nil
Expand All @@ -368,3 +365,26 @@ func (tp *TransactionPayload) GetNextEvent() (BinlogEvent, error) {
//func (tp *TransactionPayload) Events() iter.Seq[BinlogEvent] {
// return tp.iterator
//}

// decoderPool manages a sync.Pool of *zstd.Decoders.
type decoderPool struct {
pool sync.Pool
}

// Get gets a new *zstd.Decoder.
func (dp *decoderPool) Get(r io.Reader) (*zstd.Decoder, error) {
decoder, ok := dp.pool.Get().(*zstd.Decoder)
if !ok || decoder == nil {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder")
}
if err := decoder.Reset(r); err != nil {
return nil, vterrors.Wrap(err, "error resetting stateful stream decoder")
}
return decoder, nil
}

func (dp *decoderPool) Put(sd *zstd.Decoder) {
if err := sd.Reset(nil); err == nil || err == io.EOF {
dp.pool.Put(sd)
}
}
Loading