From 60640d604808c9fef906e16c1cca1b3d4efb85e7 Mon Sep 17 00:00:00 2001 From: Tatu Saloranta Date: Sat, 14 Oct 2023 20:59:32 -0700 Subject: [PATCH] Fix #400 (#406) --- .../avro/apacheimpl/ApacheCodecRecycler.java | 5 + .../avro/apacheimpl/AvroRecyclerPools.java | 208 +++++++++++++++++- release-notes/VERSION-2.x | 6 +- 3 files changed, 214 insertions(+), 5 deletions(-) diff --git a/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java b/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java index dadc7a5e5..de14ba122 100644 --- a/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java +++ b/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/ApacheCodecRecycler.java @@ -17,7 +17,12 @@ public final class ApacheCodecRecycler implements WithPool { + // NOTE: AtomicReference only needed for ThreadLocal recycling where + // single-thread access is not (ironically enough) ensured private final AtomicReference decoderRef = new AtomicReference<>(); + + // NOTE: AtomicReference only needed for ThreadLocal recycling where + // single-thread access is not (ironically enough) ensured private final AtomicReference encoderRef = new AtomicReference<>(); private RecyclerPool _pool; diff --git a/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java b/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java index 2d1722e00..302345d2d 100644 --- a/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java +++ b/avro/src/main/java/com/fasterxml/jackson/dataformat/avro/apacheimpl/AvroRecyclerPools.java @@ -1,9 +1,13 @@ package com.fasterxml.jackson.dataformat.avro.apacheimpl; import java.lang.ref.SoftReference; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; -import com.fasterxml.jackson.core.util.BufferRecycler; import com.fasterxml.jackson.core.util.RecyclerPool; +import com.fasterxml.jackson.core.util.RecyclerPool.BoundedPoolBase; +import com.fasterxml.jackson.core.util.RecyclerPool.ConcurrentDequePoolBase; +import com.fasterxml.jackson.core.util.RecyclerPool.LockFreePoolBase; public final class AvroRecyclerPools { @@ -26,6 +30,72 @@ public static RecyclerPool threadLocalPool() { return ThreadLocalPool.GLOBAL; } + /** + * Accessor for getting the shared/global {@link NonRecyclingPool} instance + * (due to design only one instance ever needed) + * + * @return Globally shared instance of {@link NonRecyclingPool}. + */ + public static RecyclerPool nonRecyclingPool() { + return NonRecyclingPool.GLOBAL; + } + + /** + * Accessor for getting the shared/global {@link ConcurrentDequePool} instance. + * + * @return Globally shared instance of {@link NonRecyclingPool}. + */ + public static RecyclerPool sharedConcurrentDequePool() { + return ConcurrentDequePool.GLOBAL; + } + + /** + * Accessor for constructing a new, non-shared {@link ConcurrentDequePool} instance. + * + * @return Globally shared instance of {@link NonRecyclingPool}. + */ + public static RecyclerPool newConcurrentDequePool() { + return ConcurrentDequePool.construct(); + } + + /** + * Accessor for getting the shared/global {@link LockFreePool} instance. + * + * @return Globally shared instance of {@link LockFreePool}. + */ + public static RecyclerPool sharedLockFreePool() { + return LockFreePool.GLOBAL; + } + + /** + * Accessor for constructing a new, non-shared {@link LockFreePool} instance. + * + * @return Globally shared instance of {@link LockFreePool}. + */ + public static RecyclerPool newLockFreePool() { + return LockFreePool.construct(); + } + + /** + * Accessor for getting the shared/global {@link BoundedPool} instance. + * + * @return Globally shared instance of {@link BoundedPool}. + */ + public static RecyclerPool sharedBoundedPool() { + return BoundedPool.GLOBAL; + } + + /** + * Accessor for constructing a new, non-shared {@link BoundedPool} instance. + * + * @param size Maximum number of values to pool + * + * @return Globally shared instance of {@link BoundedPool}. + */ + public static RecyclerPool newBoundedPool(int size) { + return BoundedPool.construct(size); + } + /* /********************************************************************** /* Concrete RecyclerPool implementations for recycling BufferRecyclers @@ -34,7 +104,7 @@ public static RecyclerPool threadLocalPool() { /** * {@link ThreadLocal}-based {@link RecyclerPool} implementation used for - * recycling {@link BufferRecycler} instances: + * recycling {@link ApacheCodecRecycler} instances: * see {@link RecyclerPool.ThreadLocalPoolBase} for full explanation * of functioning. */ @@ -66,5 +136,137 @@ public ApacheCodecRecycler acquirePooled() { protected Object readResolve() { return GLOBAL; } } - + + /** + * Dummy {@link RecyclerPool} implementation that does not recycle + * anything but simply creates new instances when asked to acquire items. + */ + public static class NonRecyclingPool + extends RecyclerPool.NonRecyclingPoolBase + { + private static final long serialVersionUID = 1L; + + protected static final NonRecyclingPool GLOBAL = new NonRecyclingPool(); + + protected NonRecyclingPool() { } + + @Override + public ApacheCodecRecycler acquirePooled() { + return new ApacheCodecRecycler(); + } + + // // // JDK serialization support + + protected Object readResolve() { return GLOBAL; } + } + + /** + * {@link RecyclerPool} implementation that uses + * {@link ConcurrentLinkedDeque} for recycling instances. + *

+ * Pool is unbounded: see {@link RecyclerPool} what this means. + */ + public static class ConcurrentDequePool extends ConcurrentDequePoolBase + { + private static final long serialVersionUID = 1L; + + protected static final ConcurrentDequePool GLOBAL = new ConcurrentDequePool(SERIALIZATION_SHARED); + + // // // Life-cycle (constructors, factory methods) + + protected ConcurrentDequePool(int serialization) { + super(serialization); + } + + public static ConcurrentDequePool construct() { + return new ConcurrentDequePool(SERIALIZATION_NON_SHARED); + } + + @Override + public ApacheCodecRecycler createPooled() { + return new ApacheCodecRecycler(); + } + + // // // JDK serialization support + + // Make sure to re-link to global/shared or non-shared. + protected Object readResolve() { + return _resolveToShared(GLOBAL).orElseGet(() -> construct()); + } + } + + /** + * {@link RecyclerPool} implementation that uses + * a lock free linked list for recycling instances. + *

+ * Pool is unbounded: see {@link RecyclerPool} for + * details on what this means. + */ + public static class LockFreePool extends LockFreePoolBase + { + private static final long serialVersionUID = 1L; + + protected static final LockFreePool GLOBAL = new LockFreePool(SERIALIZATION_SHARED); + + // // // Life-cycle (constructors, factory methods) + + protected LockFreePool(int serialization) { + super(serialization); + } + + public static LockFreePool construct() { + return new LockFreePool(SERIALIZATION_NON_SHARED); + } + + @Override + public ApacheCodecRecycler createPooled() { + return new ApacheCodecRecycler(); + } + + // // // JDK serialization support + + // Make sure to re-link to global/shared or non-shared. + protected Object readResolve() { + return _resolveToShared(GLOBAL).orElseGet(() -> construct()); + } + } + + /** + * {@link RecyclerPool} implementation that uses + * a bounded queue ({@link ArrayBlockingQueue} for recycling instances. + * This is "bounded" pool since it will never hold on to more + * {@link ApacheCodecRecycler} instances than its size configuration: + * the default size is {@link BoundedPoolBase#DEFAULT_CAPACITY}. + */ + public static class BoundedPool extends BoundedPoolBase + { + private static final long serialVersionUID = 1L; + + protected static final BoundedPool GLOBAL = new BoundedPool(SERIALIZATION_SHARED); + + // // // Life-cycle (constructors, factory methods) + + protected BoundedPool(int capacityAsId) { + super(capacityAsId); + } + + public static BoundedPool construct(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException("capacity must be > 0, was: "+capacity); + } + return new BoundedPool(capacity); + } + + @Override + public ApacheCodecRecycler createPooled() { + return new ApacheCodecRecycler(); + } + + // // // JDK serialization support + + // Make sure to re-link to global/shared or non-shared. + protected Object readResolve() { + return _resolveToShared(GLOBAL).orElseGet(() -> construct(_serialization)); + } + } } diff --git a/release-notes/VERSION-2.x b/release-notes/VERSION-2.x index bfeaf651c..e2749f74d 100644 --- a/release-notes/VERSION-2.x +++ b/release-notes/VERSION-2.x @@ -16,11 +16,13 @@ Active maintainers: 2.16.0 (not yet released) -#403: Remove Smile-specific buffer-recycling +#400: (avro) Rewrite Avro buffer recycling (`ApacheCodecRecycler.java`) to + use new `RecyclerPool`, allow configuring use of non-ThreadLocal based pools +#403: (smile) Remove Smile-specific buffer-recycling 2.15.3 (12-Oct-2023) -#384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency +#384: (smile) `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency (reported by Simon D) 2.15.2 (30-May-2023)