diff --git a/avro/src/main/java/tools/jackson/dataformat/avro/AvroFactory.java b/avro/src/main/java/tools/jackson/dataformat/avro/AvroFactory.java index 1472e215d..500528dc1 100644 --- a/avro/src/main/java/tools/jackson/dataformat/avro/AvroFactory.java +++ b/avro/src/main/java/tools/jackson/dataformat/avro/AvroFactory.java @@ -2,10 +2,14 @@ import java.io.*; +import tools.jackson.dataformat.avro.apacheimpl.AvroRecyclerPools; + import tools.jackson.core.*; import tools.jackson.core.base.BinaryTSFactory; import tools.jackson.core.io.IOContext; +import tools.jackson.core.util.RecyclerPool; import tools.jackson.dataformat.avro.apacheimpl.ApacheAvroParserImpl; +import tools.jackson.dataformat.avro.apacheimpl.ApacheCodecRecycler; import tools.jackson.dataformat.avro.deser.*; /** @@ -38,6 +42,12 @@ public class AvroFactory /********************************************************** */ + /** + * @since 2.16 + */ + protected RecyclerPool _avroRecyclerPool + = AvroRecyclerPools.defaultPool(); + /** * Flag that is set if Apache Avro lib's decoder is to be used for decoding; * `false` to use Jackson native Avro decoder. @@ -234,6 +244,7 @@ protected AvroParser _createParser(ObjectReadContext readCtxt, IOContext ioCtxt, return new ApacheAvroParserImpl(readCtxt, ioCtxt, readCtxt.getStreamReadFeatures(_streamReadFeatures), readCtxt.getFormatReadFeatures(_formatReadFeatures), + _avroRecyclerPool.acquireAndLinkPooled(), (AvroSchema) readCtxt.getSchema(), in); } @@ -253,6 +264,7 @@ protected AvroParser _createParser(ObjectReadContext readCtxt, IOContext ioCtxt, return new ApacheAvroParserImpl(readCtxt, ioCtxt, readCtxt.getStreamReadFeatures(_streamReadFeatures), readCtxt.getFormatReadFeatures(_formatReadFeatures), + _avroRecyclerPool.acquireAndLinkPooled(), (AvroSchema) readCtxt.getSchema(), data, offset, len); } @@ -286,7 +298,8 @@ protected JsonGenerator _createGenerator(ObjectWriteContext writeCtxt, return new AvroGenerator(writeCtxt, ioCtxt, writeCtxt.getStreamWriteFeatures(_streamWriteFeatures), writeCtxt.getFormatWriteFeatures(_formatWriteFeatures), - out, - (AvroSchema) writeCtxt.getSchema()); + _avroRecyclerPool.acquireAndLinkPooled(), + (AvroSchema) writeCtxt.getSchema(), + out); } } diff --git a/avro/src/main/java/tools/jackson/dataformat/avro/AvroGenerator.java b/avro/src/main/java/tools/jackson/dataformat/avro/AvroGenerator.java index ce3d4c300..f2609be62 100644 --- a/avro/src/main/java/tools/jackson/dataformat/avro/AvroGenerator.java +++ b/avro/src/main/java/tools/jackson/dataformat/avro/AvroGenerator.java @@ -104,6 +104,11 @@ private Feature(boolean defaultState) { */ protected final static EncoderFactory ENCODER_FACTORY = EncoderFactory.get(); + /** + * @since 2.16 + */ + protected ApacheCodecRecycler _apacheCodecRecycler; + /** * Bit flag composed of bits that indicate which * {@link AvroGenerator.Feature}s @@ -150,16 +155,18 @@ private Feature(boolean defaultState) { public AvroGenerator(ObjectWriteContext writeCtxt, IOContext ioCtxt, int streamWriteFeatures, int avroFeatures, - OutputStream output, - AvroSchema schema) + ApacheCodecRecycler apacheCodecRecycler, + AvroSchema schema, + OutputStream output) throws JacksonException { super(writeCtxt, ioCtxt, streamWriteFeatures); _formatWriteFeatures = avroFeatures; _output = output; _streamWriteContext = AvroWriteContext.nullContext(); + _apacheCodecRecycler = apacheCodecRecycler; final boolean buffering = isEnabled(Feature.AVRO_BUFFERING); - BinaryEncoder encoderToReuse = ApacheCodecRecycler.acquireEncoder(); + BinaryEncoder encoderToReuse = apacheCodecRecycler.acquireEncoder(); _encoder = buffering ? ENCODER_FACTORY.binaryEncoder(output, encoderToReuse) : ENCODER_FACTORY.directBinaryEncoder(output, encoderToReuse); @@ -681,10 +688,15 @@ protected final void _verifyValueWrite(String typeMsg) throws JacksonException { @Override protected void _releaseBuffers() { // no super implementation to call - BinaryEncoder e = _encoder; - if (e != null) { - _encoder = null; - ApacheCodecRecycler.release(e); + ApacheCodecRecycler recycler = _apacheCodecRecycler; + if (recycler != null) { + _apacheCodecRecycler = null; + BinaryEncoder e = _encoder; + if (e != null) { + _encoder = null; + recycler.release(e); + } + recycler.releaseToPool(); } } diff --git a/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheAvroParserImpl.java b/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheAvroParserImpl.java index cdc24d44b..5b66f461c 100644 --- a/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheAvroParserImpl.java +++ b/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheAvroParserImpl.java @@ -18,11 +18,22 @@ */ public class ApacheAvroParserImpl extends AvroParserImpl { + /* + /********************************************************** + /* Configuration + /********************************************************** + */ + /** * @since 2.16 */ protected final static DecoderFactory DECODER_FACTORY = DecoderFactory.get(); + /** + * @since 2.16 + */ + protected ApacheCodecRecycler _apacheCodecRecycler; + /* /********************************************************************** /* Input source config @@ -69,9 +80,11 @@ public class ApacheAvroParserImpl extends AvroParserImpl /* Life-cycle /********************************************************************** */ - + public ApacheAvroParserImpl(ObjectReadContext readCtxt, IOContext ioCtxt, - int parserFeatures, int avroFeatures, AvroSchema schema, + int parserFeatures, int avroFeatures, + ApacheCodecRecycler apacheCodecRecycler, + AvroSchema schema, InputStream in) { super(readCtxt, ioCtxt, parserFeatures, avroFeatures, schema); @@ -80,20 +93,25 @@ public ApacheAvroParserImpl(ObjectReadContext readCtxt, IOContext ioCtxt, _inputPtr = 0; _inputEnd = 0; _bufferRecyclable = true; + _apacheCodecRecycler = apacheCodecRecycler; + final boolean buffering = Feature.AVRO_BUFFERING.enabledIn(avroFeatures); - BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder(); + BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder(); _decoder = buffering ? DECODER_FACTORY.binaryDecoder(in, decoderToReuse) : DECODER_FACTORY.directBinaryDecoder(in, decoderToReuse); } public ApacheAvroParserImpl(ObjectReadContext readCtxt, IOContext ioCtxt, - int parserFeatures, int avroFeatures, AvroSchema schema, + int parserFeatures, int avroFeatures, + ApacheCodecRecycler apacheCodecRecycler, + AvroSchema schema, byte[] buffer, int offset, int len) { super(readCtxt, ioCtxt, parserFeatures, avroFeatures, schema); _inputStream = null; - BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder(); + _apacheCodecRecycler = apacheCodecRecycler; + BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder(); _decoder = DECODER_FACTORY.binaryDecoder(buffer, offset, len, decoderToReuse); } @@ -107,14 +125,18 @@ protected void _releaseBuffers() { _ioContext.releaseReadIOBuffer(buf); } } - BinaryDecoder d = _decoder; - if (d != null) { - _decoder = null; - ApacheCodecRecycler.release(d); + ApacheCodecRecycler recycler = _apacheCodecRecycler; + if (recycler != null) { + _apacheCodecRecycler = null; + BinaryDecoder d = _decoder; + if (d != null) { + _decoder = null; + recycler.release(d); + } + recycler.releaseToPool(); } } - /* /********************************************************************** /* Abstract method impls, i/o access diff --git a/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java b/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java index 5026e3f59..5c59a0547 100644 --- a/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java +++ b/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java @@ -1,23 +1,26 @@ package tools.jackson.dataformat.avro.apacheimpl; -import java.lang.ref.SoftReference; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.apache.avro.io.*; +import tools.jackson.core.util.RecyclerPool; +import tools.jackson.core.util.RecyclerPool.WithPool; + /** * Simple helper class that contains extracted functionality for * simple encoder/decoder recycling. */ public final class ApacheCodecRecycler + implements WithPool { - protected final static ThreadLocal> _recycler - = new ThreadLocal>(); - private final AtomicReference decoderRef = new AtomicReference<>(); private final AtomicReference encoderRef = new AtomicReference<>(); - private ApacheCodecRecycler() { } + private RecyclerPool _pool; + + ApacheCodecRecycler() { } /* /********************************************************************** @@ -25,36 +28,59 @@ private ApacheCodecRecycler() { } /********************************************************************** */ - public static BinaryDecoder acquireDecoder() { - return _recycler().decoderRef.getAndSet(null); + public BinaryDecoder acquireDecoder() { + return decoderRef.getAndSet(null); } - public static BinaryEncoder acquireEncoder() { - return _recycler().encoderRef.getAndSet(null); + public BinaryEncoder acquireEncoder() { + return encoderRef.getAndSet(null); } - public static void release(BinaryDecoder dec) { - _recycler().decoderRef.set(dec); + public void release(BinaryDecoder dec) { + decoderRef.set(dec); } - public static void release(BinaryEncoder enc) { - _recycler().encoderRef.set(enc); + public void release(BinaryEncoder enc) { + encoderRef.set(enc); } /* - /********************************************************************** - /* Internal per-instance methods - /********************************************************************** + /********************************************************** + /* WithPool implementation + /********************************************************** */ - private static ApacheCodecRecycler _recycler() { - SoftReference ref = _recycler.get(); - ApacheCodecRecycler r = (ref == null) ? null : ref.get(); + /** + * Method called by owner of this recycler instance, to provide reference to + * {@link RecyclerPool} into which instance is to be released (if any) + * + * @since 2.16 + */ + @Override + public ApacheCodecRecycler withPool(RecyclerPool pool) { + if (this._pool != null) { + throw new IllegalStateException("ApacheCodecRecycler already linked to pool: "+pool); + } + // assign to pool to which this BufferRecycler belongs in order to release it + // to the same pool when the work will be completed + _pool = Objects.requireNonNull(pool); + return this; + } - if (r == null) { - r = new ApacheCodecRecycler(); - _recycler.set(new SoftReference<>(r)); + /** + * Method called when owner of this recycler no longer wishes use it; this should + * return it to pool passed via {@code withPool()} (if any). + * + * @since 2.16 + */ + @Override + public void releaseToPool() { + if (_pool != null) { + RecyclerPool tmpPool = _pool; + // nullify the reference to the pool in order to avoid the risk of releasing + // the same BufferRecycler more than once, thus compromising the pool integrity + _pool = null; + tmpPool.releasePooled(this); } - return r; } } diff --git a/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java b/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java new file mode 100644 index 000000000..fcc90d8b3 --- /dev/null +++ b/avro/src/main/java/tools/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java @@ -0,0 +1,70 @@ +package tools.jackson.dataformat.avro.apacheimpl; + +import java.lang.ref.SoftReference; + +import tools.jackson.core.util.BufferRecycler; +import tools.jackson.core.util.RecyclerPool; + +public final class AvroRecyclerPools +{ + /** + * @return the default {@link RecyclerPool} implementation + * which is the thread local based one: + * basically alias to {@link #threadLocalPool()}). + */ + public static RecyclerPool defaultPool() { + return threadLocalPool(); + } + + /** + * Accessor for getting the shared/global {@link ThreadLocalPool} instance + * (due to design only one instance ever needed) + * + * @return Globally shared instance of {@link ThreadLocalPool} + */ + public static RecyclerPool threadLocalPool() { + return ThreadLocalPool.GLOBAL; + } + + /* + /********************************************************************** + /* Concrete RecyclerPool implementations for recycling BufferRecyclers + /********************************************************************** + */ + + /** + * {@link ThreadLocal}-based {@link RecyclerPool} implementation used for + * recycling {@link BufferRecycler} instances: + * see {@link RecyclerPool.ThreadLocalPoolBase} for full explanation + * of functioning. + */ + public static class ThreadLocalPool + extends RecyclerPool.ThreadLocalPoolBase + { + private static final long serialVersionUID = 1L; + + protected static final ThreadLocalPool GLOBAL = new ThreadLocalPool(); + + protected final static ThreadLocal> _recycler + = new ThreadLocal>(); + + private ThreadLocalPool() { } + + @Override + public ApacheCodecRecycler acquirePooled() { + SoftReference ref = _recycler.get(); + ApacheCodecRecycler r = (ref == null) ? null : ref.get(); + + if (r == null) { + r = new ApacheCodecRecycler(); + _recycler.set(new SoftReference<>(r)); + } + return r; + } + + // // // JDK serialization support + + protected Object readResolve() { return GLOBAL; } + } + +} diff --git a/release-notes/VERSION-2.x b/release-notes/VERSION-2.x index c6424ac58..bfeaf651c 100644 --- a/release-notes/VERSION-2.x +++ b/release-notes/VERSION-2.x @@ -18,7 +18,7 @@ Active maintainers: #403: Remove Smile-specific buffer-recycling -2.15.3 (not yet released) +2.15.3 (12-Oct-2023) #384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency (reported by Simon D)