Skip to content

Commit

Permalink
Enable auto-flush in ion binary writer. (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
linlin-s committed Dec 5, 2023
1 parent cb5b43f commit d3760a5
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 22 deletions.
1 change: 0 additions & 1 deletion src/main/java/com/amazon/ion/IonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public interface IonWriter
*/
public void flush() throws IOException;


/**
* Indicates that writing is completed and all buffered data should be
* written and flushed as if this were the end of the Ion data stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ _Private_IonBinaryWriterBuilder withInitialSymbolTable(SymbolTable symtab)
return b;
}

@Override
public _Private_IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled) {
_Private_IonBinaryWriterBuilder b = mutable();
b.setAutoFlushEnabled(autoFlushEnabled);
return b;
}

public void setAutoFlushEnabled(boolean autoFlushEnabled) {
mutationCheck();
if (autoFlushEnabled) {
myBinaryWriterBuilder.withAutoFlushEnabled();
myBinaryWriterBuilder.withLocalSymbolTableAppendEnabled();
} else {
myBinaryWriterBuilder.withAutoFlushDisabled();
}
}
@Override
public void setLocalSymbolTableAppendEnabled(boolean enabled)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Wraps {@link IonRawBinaryWriter} with symbol table management. */
@SuppressWarnings("deprecation")
/*package*/ final class IonManagedBinaryWriter extends AbstractIonWriter implements _Private_IonManagedWriter
Expand Down Expand Up @@ -677,7 +676,9 @@ public void makeReadOnly()
StreamCloseMode.NO_CLOSE,
StreamFlushMode.NO_FLUSH,
builder.preallocationMode,
builder.isFloatBinary32Enabled
builder.isFloatBinary32Enabled,
false,
this::flush
);
this.user = new IonRawBinaryWriter(
builder.provider,
Expand All @@ -687,7 +688,9 @@ public void makeReadOnly()
StreamCloseMode.CLOSE,
StreamFlushMode.FLUSH,
builder.preallocationMode,
builder.isFloatBinary32Enabled
builder.isFloatBinary32Enabled,
builder.isAutoFlushEnabled,
this::flush
);

this.catalog = builder.catalog;
Expand Down Expand Up @@ -1101,11 +1104,10 @@ public void writeBytes(byte[] data, int off, int len) throws IOException

// Stream Terminators

public void flush() throws IOException
{
public void flush() throws IOException {
if (getDepth() == 0 && !user.hasAnnotations() && (localsLocked || lstAppendEnabled))
{
unsafeFlush();
unsafeFlush();
}
}

Expand Down
35 changes: 30 additions & 5 deletions src/main/java/com/amazon/ion/impl/bin/IonRawBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ private static byte[] bytes(int... vals) {
NULLS[STRUCT.ordinal()] = (byte) 0xDF;
}
private static final byte NULL_NULL = NULLS[NULL.ordinal()];

private static final byte BOOL_FALSE = (byte) 0x10;
private static final byte BOOL_TRUE = (byte) 0x11;

Expand All @@ -124,6 +123,7 @@ private static byte[] bytes(int... vals) {

private static final byte VARINT_NEG_ZERO = (byte) 0xC0;


final Utf8StringEncoder utf8StringEncoder = Utf8StringEncoderPool
.getInstance()
.getOrCreate();
Expand Down Expand Up @@ -362,6 +362,18 @@ public PatchPoint clear() {
private boolean hasTopLevelSymbolTableAnnotation;

private boolean closed;
boolean autoFlushEnabled;
boolean flushAfterCurrentValue;
ThrowingRunnable autoFlush;

public void endOfBlockSizeReached() {
flushAfterCurrentValue = autoFlushEnabled;
}

@FunctionalInterface
interface ThrowingRunnable {
void run() throws IOException;
}

/*package*/ IonRawBinaryWriter(final BlockAllocatorProvider provider,
final int blockSize,
Expand All @@ -370,7 +382,9 @@ public PatchPoint clear() {
final StreamCloseMode streamCloseMode,
final StreamFlushMode streamFlushMode,
final PreallocationMode preallocationMode,
final boolean isFloatBinary32Enabled)
final boolean isFloatBinary32Enabled,
final boolean isAutoFlushEnabled,
ThrowingRunnable autoFlush)
throws IOException
{
super(optimization);
Expand All @@ -383,7 +397,7 @@ public PatchPoint clear() {
this.streamFlushMode = streamFlushMode;
this.preallocationMode = preallocationMode;
this.isFloatBinary32Enabled = isFloatBinary32Enabled;
this.buffer = new WriteBuffer(allocator);
this.buffer = new WriteBuffer(allocator, this::endOfBlockSizeReached);
this.patchPoints = new _Private_RecyclingQueue<>(512, PatchPoint::new);
this.containers = new _Private_RecyclingStack<ContainerInfo>(
10,
Expand All @@ -401,8 +415,12 @@ public ContainerInfo newElement() {
this.currentAnnotationSids = new IntList();
this.hasTopLevelSymbolTableAnnotation = false;
this.closed = false;
this.autoFlushEnabled = isAutoFlushEnabled;
this.autoFlush = autoFlush;
}



/** Always returns {@link Symbols#systemSymbolTable()}. */
public SymbolTable getSymbolTable()
{
Expand All @@ -421,6 +439,10 @@ public void setFieldNameSymbol(final SymbolToken name)
setFieldNameSymbol(name.getSid());
}

WriteBuffer getCurrentBuffer() {
return buffer;
}

public void setFieldNameSymbol(int sid)
{
if (!isInStruct())
Expand Down Expand Up @@ -722,7 +744,7 @@ private void prepareValue()
}

/** Closes out annotations. */
private void finishValue()
private void finishValue() throws IOException
{
if (!containers.isEmpty() && containers.peek().type == ContainerType.ANNOTATION)
{
Expand All @@ -731,6 +753,10 @@ private void finishValue()
}
hasWrittenValuesSinceFinished = true;
hasWrittenValuesSinceConstructed = true;
if (this.flushAfterCurrentValue && depth == 0) {
autoFlush.run();
this.flushAfterCurrentValue = false;
}
}

// Container Manipulation
Expand Down Expand Up @@ -1365,7 +1391,6 @@ public void finish() throws IOException
{
throw new IllegalStateException("Cannot finish within container: " + containers);
}

if (patchPoints.isEmpty())
{
// nothing to patch--write 'em out!
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/com/amazon/ion/impl/bin/WriteBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
private final List<Block> blocks;
private Block current;
private int index;
private Runnable endOfBlockCallBack;

public WriteBuffer(final BlockAllocator allocator)

public WriteBuffer(final BlockAllocator allocator, Runnable endOfBlockCallBack)
{
this.allocator = allocator;
this.blocks = new ArrayList<Block>();
Expand All @@ -42,6 +44,7 @@ public WriteBuffer(final BlockAllocator allocator)

this.index = 0;
this.current = blocks.get(0);
this.endOfBlockCallBack = endOfBlockCallBack;
}

private void allocateNewBlock()
Expand Down Expand Up @@ -138,12 +141,12 @@ private void writeBytesSlow(final byte[] bytes, int off, int len)
if (index == blocks.size() - 1)
{
allocateNewBlock();
endOfBlockCallBack.run();
}
index++;
current = blocks.get(index);
}
}

}

/** Writes an array of bytes to the buffer expanding if necessary. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public static IonWriter newIonWriter(ByteArrayOutputStream baos) throws IOExcept
IonRawBinaryWriter.StreamCloseMode.CLOSE,
IonRawBinaryWriter.StreamFlushMode.FLUSH,
IonRawBinaryWriter.PreallocationMode.PREALLOCATE_0,
false // force floats to be encoded as binary64
false, // force floats to be encoded as binary64
false,
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ BlockAllocatorProvider createAllocatorProvider()
/*package*/ volatile SymbolTable initialSymbolTable;
/*package*/ volatile boolean isLocalSymbolTableAppendEnabled;
/*package*/ volatile boolean isFloatBinary32Enabled;
volatile boolean isAutoFlushEnabled;

private _Private_IonManagedBinaryWriterBuilder(final BlockAllocatorProvider provider)
{
Expand All @@ -91,6 +92,7 @@ private _Private_IonManagedBinaryWriterBuilder(final BlockAllocatorProvider prov
this.optimization = WriteValueOptimization.NONE;
this.isLocalSymbolTableAppendEnabled = false;
this.isFloatBinary32Enabled = false;
this.isAutoFlushEnabled = false;
}

private _Private_IonManagedBinaryWriterBuilder(final _Private_IonManagedBinaryWriterBuilder other)
Expand All @@ -105,6 +107,7 @@ private _Private_IonManagedBinaryWriterBuilder(final _Private_IonManagedBinaryWr
this.initialSymbolTable = other.initialSymbolTable;
this.isLocalSymbolTableAppendEnabled = other.isLocalSymbolTableAppendEnabled;
this.isFloatBinary32Enabled = other.isFloatBinary32Enabled;
this.isAutoFlushEnabled = other.isAutoFlushEnabled;
}

public _Private_IonManagedBinaryWriterBuilder copy()
Expand Down Expand Up @@ -167,6 +170,18 @@ public _Private_IonManagedBinaryWriterBuilder withFlatImports(final List<SymbolT
return withImports(ImportedSymbolResolverMode.FLAT, tables);
}

public _Private_IonManagedBinaryWriterBuilder withAutoFlushEnabled()
{
this.isAutoFlushEnabled = true;
return this;
}

public _Private_IonManagedBinaryWriterBuilder withAutoFlushDisabled()
{
this.isAutoFlushEnabled = false;
return this;
}

/*package*/ _Private_IonManagedBinaryWriterBuilder withImports(final ImportedSymbolResolverMode mode, final List<SymbolTable> tables) {
imports = new ImportedSymbolContext(mode, tables);
return this;
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/amazon/ion/system/IonBinaryWriterBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,22 @@ public IvmMinimizing getIvmMinimizing()
*/
public abstract SymbolTable getInitialSymbolTable();

/**
* Auto-flush enables automatic execution of flush operations during the write process. When auto-flush is enabled and a new block allocation becomes necessary while writing the top-level value, a new block will be allocated.
* The data writing process will then continue uninterrupted until the top-level value is complete. After completing the top-level value that crosses the block boundary, the flush operation will be executed.
* This feature optimizes performance of the writing of long data streams by reducing block allocations.
* Additionally, setting a larger block size can further tune performance when auto-flush is enabled.
* A larger block size leads to fewer block allocations and reduces the frequency of flush operations when auto-flush is enabled. {@link #withBlockSize(int) Here} is where you can set up the
* block size of write buffer.
* Auto-flush is disabled by default and the default block size is 32K.
* @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not.
*/
public abstract IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled);

/**
* Declares the symbol table to use for encoded data.
* To avoid conflicts between different data streams, if the given instance
* is mutable, it will be copied when {@code build()} is called.
*
* @param symtab must be a local or system symbol table.
* May be null, in which case the initial symbol table is that of
* {@code $ion_1_0}.
Expand Down
Loading

0 comments on commit d3760a5

Please sign in to comment.