Skip to content

Commit

Permalink
Merge branch '2.16'
Browse files Browse the repository at this point in the history
  • Loading branch information
cowtowncoder committed Oct 15, 2023
2 parents 47767cd + 5bffe8b commit 1354430
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 43 deletions.
17 changes: 15 additions & 2 deletions avro/src/main/java/tools/jackson/dataformat/avro/AvroFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import java.io.*;

import tools.jackson.dataformat.avro.apacheimpl.AvroRecyclerPools;

import tools.jackson.core.*;
import tools.jackson.core.base.BinaryTSFactory;
import tools.jackson.core.io.IOContext;
import tools.jackson.core.util.RecyclerPool;
import tools.jackson.dataformat.avro.apacheimpl.ApacheAvroParserImpl;
import tools.jackson.dataformat.avro.apacheimpl.ApacheCodecRecycler;
import tools.jackson.dataformat.avro.deser.*;

/**
Expand Down Expand Up @@ -38,6 +42,12 @@ public class AvroFactory
/**********************************************************
*/

/**
* @since 2.16
*/
protected RecyclerPool<ApacheCodecRecycler> _avroRecyclerPool
= AvroRecyclerPools.defaultPool();

/**
* Flag that is set if Apache Avro lib's decoder is to be used for decoding;
* `false` to use Jackson native Avro decoder.
Expand Down Expand Up @@ -234,6 +244,7 @@ protected AvroParser _createParser(ObjectReadContext readCtxt, IOContext ioCtxt,
return new ApacheAvroParserImpl(readCtxt, ioCtxt,
readCtxt.getStreamReadFeatures(_streamReadFeatures),
readCtxt.getFormatReadFeatures(_formatReadFeatures),
_avroRecyclerPool.acquireAndLinkPooled(),
(AvroSchema) readCtxt.getSchema(),
in);
}
Expand All @@ -253,6 +264,7 @@ protected AvroParser _createParser(ObjectReadContext readCtxt, IOContext ioCtxt,
return new ApacheAvroParserImpl(readCtxt, ioCtxt,
readCtxt.getStreamReadFeatures(_streamReadFeatures),
readCtxt.getFormatReadFeatures(_formatReadFeatures),
_avroRecyclerPool.acquireAndLinkPooled(),
(AvroSchema) readCtxt.getSchema(),
data, offset, len);
}
Expand Down Expand Up @@ -286,7 +298,8 @@ protected JsonGenerator _createGenerator(ObjectWriteContext writeCtxt,
return new AvroGenerator(writeCtxt, ioCtxt,
writeCtxt.getStreamWriteFeatures(_streamWriteFeatures),
writeCtxt.getFormatWriteFeatures(_formatWriteFeatures),
out,
(AvroSchema) writeCtxt.getSchema());
_avroRecyclerPool.acquireAndLinkPooled(),
(AvroSchema) writeCtxt.getSchema(),
out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ private Feature(boolean defaultState) {
*/
protected final static EncoderFactory ENCODER_FACTORY = EncoderFactory.get();

/**
* @since 2.16
*/
protected ApacheCodecRecycler _apacheCodecRecycler;

/**
* Bit flag composed of bits that indicate which
* {@link AvroGenerator.Feature}s
Expand Down Expand Up @@ -150,16 +155,18 @@ private Feature(boolean defaultState) {

public AvroGenerator(ObjectWriteContext writeCtxt, IOContext ioCtxt,
int streamWriteFeatures, int avroFeatures,
OutputStream output,
AvroSchema schema)
ApacheCodecRecycler apacheCodecRecycler,
AvroSchema schema,
OutputStream output)
throws JacksonException
{
super(writeCtxt, ioCtxt, streamWriteFeatures);
_formatWriteFeatures = avroFeatures;
_output = output;
_streamWriteContext = AvroWriteContext.nullContext();
_apacheCodecRecycler = apacheCodecRecycler;
final boolean buffering = isEnabled(Feature.AVRO_BUFFERING);
BinaryEncoder encoderToReuse = ApacheCodecRecycler.acquireEncoder();
BinaryEncoder encoderToReuse = apacheCodecRecycler.acquireEncoder();
_encoder = buffering
? ENCODER_FACTORY.binaryEncoder(output, encoderToReuse)
: ENCODER_FACTORY.directBinaryEncoder(output, encoderToReuse);
Expand Down Expand Up @@ -681,10 +688,15 @@ protected final void _verifyValueWrite(String typeMsg) throws JacksonException {
@Override
protected void _releaseBuffers() {
// no super implementation to call
BinaryEncoder e = _encoder;
if (e != null) {
_encoder = null;
ApacheCodecRecycler.release(e);
ApacheCodecRecycler recycler = _apacheCodecRecycler;
if (recycler != null) {
_apacheCodecRecycler = null;
BinaryEncoder e = _encoder;
if (e != null) {
_encoder = null;
recycler.release(e);
}
recycler.releaseToPool();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,22 @@
*/
public class ApacheAvroParserImpl extends AvroParserImpl
{
/*
/**********************************************************
/* Configuration
/**********************************************************
*/

/**
* @since 2.16
*/
protected final static DecoderFactory DECODER_FACTORY = DecoderFactory.get();

/**
* @since 2.16
*/
protected ApacheCodecRecycler _apacheCodecRecycler;

/*
/**********************************************************************
/* Input source config
Expand Down Expand Up @@ -69,9 +80,11 @@ public class ApacheAvroParserImpl extends AvroParserImpl
/* Life-cycle
/**********************************************************************
*/

public ApacheAvroParserImpl(ObjectReadContext readCtxt, IOContext ioCtxt,
int parserFeatures, int avroFeatures, AvroSchema schema,
int parserFeatures, int avroFeatures,
ApacheCodecRecycler apacheCodecRecycler,
AvroSchema schema,
InputStream in)
{
super(readCtxt, ioCtxt, parserFeatures, avroFeatures, schema);
Expand All @@ -80,20 +93,25 @@ public ApacheAvroParserImpl(ObjectReadContext readCtxt, IOContext ioCtxt,
_inputPtr = 0;
_inputEnd = 0;
_bufferRecyclable = true;
_apacheCodecRecycler = apacheCodecRecycler;

final boolean buffering = Feature.AVRO_BUFFERING.enabledIn(avroFeatures);
BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder();
BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder();
_decoder = buffering
? DECODER_FACTORY.binaryDecoder(in, decoderToReuse)
: DECODER_FACTORY.directBinaryDecoder(in, decoderToReuse);
}

public ApacheAvroParserImpl(ObjectReadContext readCtxt, IOContext ioCtxt,
int parserFeatures, int avroFeatures, AvroSchema schema,
int parserFeatures, int avroFeatures,
ApacheCodecRecycler apacheCodecRecycler,
AvroSchema schema,
byte[] buffer, int offset, int len)
{
super(readCtxt, ioCtxt, parserFeatures, avroFeatures, schema);
_inputStream = null;
BinaryDecoder decoderToReuse = ApacheCodecRecycler.acquireDecoder();
_apacheCodecRecycler = apacheCodecRecycler;
BinaryDecoder decoderToReuse = apacheCodecRecycler.acquireDecoder();
_decoder = DECODER_FACTORY.binaryDecoder(buffer, offset, len, decoderToReuse);
}

Expand All @@ -107,14 +125,18 @@ protected void _releaseBuffers() {
_ioContext.releaseReadIOBuffer(buf);
}
}
BinaryDecoder d = _decoder;
if (d != null) {
_decoder = null;
ApacheCodecRecycler.release(d);
ApacheCodecRecycler recycler = _apacheCodecRecycler;
if (recycler != null) {
_apacheCodecRecycler = null;
BinaryDecoder d = _decoder;
if (d != null) {
_decoder = null;
recycler.release(d);
}
recycler.releaseToPool();
}
}


/*
/**********************************************************************
/* Abstract method impls, i/o access
Expand Down
Original file line number Diff line number Diff line change
@@ -1,60 +1,86 @@
package tools.jackson.dataformat.avro.apacheimpl;

import java.lang.ref.SoftReference;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.avro.io.*;

import tools.jackson.core.util.RecyclerPool;
import tools.jackson.core.util.RecyclerPool.WithPool;

/**
* Simple helper class that contains extracted functionality for
* simple encoder/decoder recycling.
*/
public final class ApacheCodecRecycler
implements WithPool<ApacheCodecRecycler>
{
protected final static ThreadLocal<SoftReference<ApacheCodecRecycler>> _recycler
= new ThreadLocal<SoftReference<ApacheCodecRecycler>>();

private final AtomicReference<BinaryDecoder> decoderRef = new AtomicReference<>();
private final AtomicReference<BinaryEncoder> encoderRef = new AtomicReference<>();

private ApacheCodecRecycler() { }
private RecyclerPool<ApacheCodecRecycler> _pool;

ApacheCodecRecycler() { }

/*
/**********************************************************************
/* Public API
/**********************************************************************
*/

public static BinaryDecoder acquireDecoder() {
return _recycler().decoderRef.getAndSet(null);
public BinaryDecoder acquireDecoder() {
return decoderRef.getAndSet(null);
}

public static BinaryEncoder acquireEncoder() {
return _recycler().encoderRef.getAndSet(null);
public BinaryEncoder acquireEncoder() {
return encoderRef.getAndSet(null);
}

public static void release(BinaryDecoder dec) {
_recycler().decoderRef.set(dec);
public void release(BinaryDecoder dec) {
decoderRef.set(dec);
}

public static void release(BinaryEncoder enc) {
_recycler().encoderRef.set(enc);
public void release(BinaryEncoder enc) {
encoderRef.set(enc);
}

/*
/**********************************************************************
/* Internal per-instance methods
/**********************************************************************
/**********************************************************
/* WithPool implementation
/**********************************************************
*/

private static ApacheCodecRecycler _recycler() {
SoftReference<ApacheCodecRecycler> ref = _recycler.get();
ApacheCodecRecycler r = (ref == null) ? null : ref.get();
/**
* Method called by owner of this recycler instance, to provide reference to
* {@link RecyclerPool} into which instance is to be released (if any)
*
* @since 2.16
*/
@Override
public ApacheCodecRecycler withPool(RecyclerPool<ApacheCodecRecycler> pool) {
if (this._pool != null) {
throw new IllegalStateException("ApacheCodecRecycler already linked to pool: "+pool);
}
// assign to pool to which this BufferRecycler belongs in order to release it
// to the same pool when the work will be completed
_pool = Objects.requireNonNull(pool);
return this;
}

if (r == null) {
r = new ApacheCodecRecycler();
_recycler.set(new SoftReference<>(r));
/**
* Method called when owner of this recycler no longer wishes use it; this should
* return it to pool passed via {@code withPool()} (if any).
*
* @since 2.16
*/
@Override
public void releaseToPool() {
if (_pool != null) {
RecyclerPool<ApacheCodecRecycler> tmpPool = _pool;
// nullify the reference to the pool in order to avoid the risk of releasing
// the same BufferRecycler more than once, thus compromising the pool integrity
_pool = null;
tmpPool.releasePooled(this);
}
return r;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package tools.jackson.dataformat.avro.apacheimpl;

import java.lang.ref.SoftReference;

import tools.jackson.core.util.BufferRecycler;
import tools.jackson.core.util.RecyclerPool;

public final class AvroRecyclerPools
{
/**
* @return the default {@link RecyclerPool} implementation
* which is the thread local based one:
* basically alias to {@link #threadLocalPool()}).
*/
public static RecyclerPool<ApacheCodecRecycler> defaultPool() {
return threadLocalPool();
}

/**
* Accessor for getting the shared/global {@link ThreadLocalPool} instance
* (due to design only one instance ever needed)
*
* @return Globally shared instance of {@link ThreadLocalPool}
*/
public static RecyclerPool<ApacheCodecRecycler> threadLocalPool() {
return ThreadLocalPool.GLOBAL;
}

/*
/**********************************************************************
/* Concrete RecyclerPool implementations for recycling BufferRecyclers
/**********************************************************************
*/

/**
* {@link ThreadLocal}-based {@link RecyclerPool} implementation used for
* recycling {@link BufferRecycler} instances:
* see {@link RecyclerPool.ThreadLocalPoolBase} for full explanation
* of functioning.
*/
public static class ThreadLocalPool
extends RecyclerPool.ThreadLocalPoolBase<ApacheCodecRecycler>
{
private static final long serialVersionUID = 1L;

protected static final ThreadLocalPool GLOBAL = new ThreadLocalPool();

protected final static ThreadLocal<SoftReference<ApacheCodecRecycler>> _recycler
= new ThreadLocal<SoftReference<ApacheCodecRecycler>>();

private ThreadLocalPool() { }

@Override
public ApacheCodecRecycler acquirePooled() {
SoftReference<ApacheCodecRecycler> ref = _recycler.get();
ApacheCodecRecycler r = (ref == null) ? null : ref.get();

if (r == null) {
r = new ApacheCodecRecycler();
_recycler.set(new SoftReference<>(r));
}
return r;
}

// // // JDK serialization support

protected Object readResolve() { return GLOBAL; }
}

}
2 changes: 1 addition & 1 deletion release-notes/VERSION-2.x
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Active maintainers:

#403: Remove Smile-specific buffer-recycling

2.15.3 (not yet released)
2.15.3 (12-Oct-2023)

#384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency
(reported by Simon D)
Expand Down

0 comments on commit 1354430

Please sign in to comment.