Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable auto-flush in ion binary writer. #651

Merged
merged 5 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/com/amazon/ion/IonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public interface IonWriter
*/
public void flush() throws IOException;


/**
* Indicates that writing is completed and all buffered data should be
* written and flushed as if this were the end of the Ion data stream.
Expand Down
16 changes: 16 additions & 0 deletions src/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ _Private_IonBinaryWriterBuilder withInitialSymbolTable(SymbolTable symtab)
return b;
}

@Override
public _Private_IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled) {
_Private_IonBinaryWriterBuilder b = mutable();
b.setAutoFlushEnabled(autoFlushEnabled);
return b;
}

public void setAutoFlushEnabled(boolean autoFlushEnabled) {
mutationCheck();
if (autoFlushEnabled) {
myBinaryWriterBuilder.withAutoFlushEnabled();
myBinaryWriterBuilder.withLocalSymbolTableAppendEnabled();
} else {
myBinaryWriterBuilder.withAutoFlushDisabled();
}
}
@Override
public void setLocalSymbolTableAppendEnabled(boolean enabled)
{
Expand Down
14 changes: 8 additions & 6 deletions src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Wraps {@link IonRawBinaryWriter} with symbol table management. */
@SuppressWarnings("deprecation")
/*package*/ final class IonManagedBinaryWriter extends AbstractIonWriter implements _Private_IonManagedWriter
Expand Down Expand Up @@ -677,7 +676,9 @@ public void makeReadOnly()
StreamCloseMode.NO_CLOSE,
StreamFlushMode.NO_FLUSH,
builder.preallocationMode,
builder.isFloatBinary32Enabled
builder.isFloatBinary32Enabled,
false,
this::flush
);
this.user = new IonRawBinaryWriter(
builder.provider,
Expand All @@ -687,7 +688,9 @@ public void makeReadOnly()
StreamCloseMode.CLOSE,
StreamFlushMode.FLUSH,
builder.preallocationMode,
builder.isFloatBinary32Enabled
builder.isFloatBinary32Enabled,
builder.isAutoFlushEnabled,
this::flush
);

this.catalog = builder.catalog;
Expand Down Expand Up @@ -1101,11 +1104,10 @@ public void writeBytes(byte[] data, int off, int len) throws IOException

// Stream Terminators

public void flush() throws IOException
{
public void flush() throws IOException {
if (getDepth() == 0 && !user.hasAnnotations() && (localsLocked || lstAppendEnabled))
{
unsafeFlush();
unsafeFlush();
}
}

Expand Down
35 changes: 30 additions & 5 deletions src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ private static byte[] bytes(int... vals) {
NULLS[STRUCT.ordinal()] = (byte) 0xDF;
}
private static final byte NULL_NULL = NULLS[NULL.ordinal()];

private static final byte BOOL_FALSE = (byte) 0x10;
private static final byte BOOL_TRUE = (byte) 0x11;

Expand All @@ -123,6 +122,7 @@ private static byte[] bytes(int... vals) {

private static final byte VARINT_NEG_ZERO = (byte) 0xC0;


final Utf8StringEncoder utf8StringEncoder = Utf8StringEncoderPool
.getInstance()
.getOrCreate();
Expand Down Expand Up @@ -361,6 +361,18 @@ public PatchPoint clear() {
private boolean hasTopLevelSymbolTableAnnotation;

private boolean closed;
boolean autoFlushEnabled;
boolean flushAfterCurrentValue;
ThrowingRunnable autoFlush;

public void endOfBlockSizeReached() {
flushAfterCurrentValue = autoFlushEnabled;
}

@FunctionalInterface
interface ThrowingRunnable {
void run() throws IOException;
}

/*package*/ IonRawBinaryWriter(final BlockAllocatorProvider provider,
final int blockSize,
Expand All @@ -369,7 +381,9 @@ public PatchPoint clear() {
final StreamCloseMode streamCloseMode,
final StreamFlushMode streamFlushMode,
final PreallocationMode preallocationMode,
final boolean isFloatBinary32Enabled)
final boolean isFloatBinary32Enabled,
final boolean isAutoFlushEnabled,
ThrowingRunnable autoFlush)
throws IOException
{
super(optimization);
Expand All @@ -382,7 +396,7 @@ public PatchPoint clear() {
this.streamFlushMode = streamFlushMode;
this.preallocationMode = preallocationMode;
this.isFloatBinary32Enabled = isFloatBinary32Enabled;
this.buffer = new WriteBuffer(allocator);
this.buffer = new WriteBuffer(allocator, this::endOfBlockSizeReached);
this.patchPoints = new _Private_RecyclingQueue<>(512, PatchPoint::new);
this.containers = new _Private_RecyclingStack<ContainerInfo>(
10,
Expand All @@ -400,8 +414,12 @@ public ContainerInfo newElement() {
this.currentAnnotationSids = new IntList();
this.hasTopLevelSymbolTableAnnotation = false;
this.closed = false;
this.autoFlushEnabled = isAutoFlushEnabled;
this.autoFlush = autoFlush;
}



/** Always returns {@link Symbols#systemSymbolTable()}. */
public SymbolTable getSymbolTable()
{
Expand All @@ -420,6 +438,10 @@ public void setFieldNameSymbol(final SymbolToken name)
setFieldNameSymbol(name.getSid());
}

WriteBuffer getCurrentBuffer() {
return buffer;
}

public void setFieldNameSymbol(int sid)
{
if (!isInStruct())
Expand Down Expand Up @@ -721,7 +743,7 @@ private void prepareValue()
}

/** Closes out annotations. */
private void finishValue()
private void finishValue() throws IOException
{
if (!containers.isEmpty() && containers.peek().type == ContainerType.ANNOTATION)
{
Expand All @@ -730,6 +752,10 @@ private void finishValue()
}
hasWrittenValuesSinceFinished = true;
hasWrittenValuesSinceConstructed = true;
if (this.flushAfterCurrentValue && depth == 0) {
autoFlush.run();
this.flushAfterCurrentValue = false;
}
}

// Container Manipulation
Expand Down Expand Up @@ -1364,7 +1390,6 @@ public void finish() throws IOException
{
throw new IllegalStateException("Cannot finish within container: " + containers);
}

if (patchPoints.isEmpty())
{
// nothing to patch--write 'em out!
Expand Down
7 changes: 5 additions & 2 deletions src/com/amazon/ion/impl/bin/WriteBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
private final List<Block> blocks;
private Block current;
private int index;
private Runnable endOfBlockCallBack;

public WriteBuffer(final BlockAllocator allocator)

public WriteBuffer(final BlockAllocator allocator, Runnable endOfBlockCallBack)
{
this.allocator = allocator;
this.blocks = new ArrayList<Block>();
Expand All @@ -41,6 +43,7 @@ public WriteBuffer(final BlockAllocator allocator)

this.index = 0;
this.current = blocks.get(0);
this.endOfBlockCallBack = endOfBlockCallBack;
}

private void allocateNewBlock()
Expand Down Expand Up @@ -137,12 +140,12 @@ private void writeBytesSlow(final byte[] bytes, int off, int len)
if (index == blocks.size() - 1)
{
allocateNewBlock();
endOfBlockCallBack.run();
}
index++;
current = blocks.get(index);
}
}

}

/** Writes an array of bytes to the buffer expanding if necessary. */
Expand Down
4 changes: 3 additions & 1 deletion src/com/amazon/ion/impl/bin/_PrivateIon_HashTrampoline.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public static IonWriter newIonWriter(ByteArrayOutputStream baos) throws IOExcept
IonRawBinaryWriter.StreamCloseMode.CLOSE,
IonRawBinaryWriter.StreamFlushMode.FLUSH,
IonRawBinaryWriter.PreallocationMode.PREALLOCATE_0,
false // force floats to be encoded as binary64
false, // force floats to be encoded as binary64
false,
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ BlockAllocatorProvider createAllocatorProvider()
/*package*/ volatile SymbolTable initialSymbolTable;
/*package*/ volatile boolean isLocalSymbolTableAppendEnabled;
/*package*/ volatile boolean isFloatBinary32Enabled;
volatile boolean isAutoFlushEnabled;

private _Private_IonManagedBinaryWriterBuilder(final BlockAllocatorProvider provider)
{
Expand All @@ -91,6 +92,7 @@ private _Private_IonManagedBinaryWriterBuilder(final BlockAllocatorProvider prov
this.optimization = WriteValueOptimization.NONE;
this.isLocalSymbolTableAppendEnabled = false;
this.isFloatBinary32Enabled = false;
this.isAutoFlushEnabled = false;
}

private _Private_IonManagedBinaryWriterBuilder(final _Private_IonManagedBinaryWriterBuilder other)
Expand All @@ -105,6 +107,7 @@ private _Private_IonManagedBinaryWriterBuilder(final _Private_IonManagedBinaryWr
this.initialSymbolTable = other.initialSymbolTable;
this.isLocalSymbolTableAppendEnabled = other.isLocalSymbolTableAppendEnabled;
this.isFloatBinary32Enabled = other.isFloatBinary32Enabled;
this.isAutoFlushEnabled = other.isAutoFlushEnabled;
}

public _Private_IonManagedBinaryWriterBuilder copy()
Expand Down Expand Up @@ -167,6 +170,18 @@ public _Private_IonManagedBinaryWriterBuilder withFlatImports(final List<SymbolT
return withImports(ImportedSymbolResolverMode.FLAT, tables);
}

public _Private_IonManagedBinaryWriterBuilder withAutoFlushEnabled()
{
this.isAutoFlushEnabled = true;
return this;
}

public _Private_IonManagedBinaryWriterBuilder withAutoFlushDisabled()
{
this.isAutoFlushEnabled = false;
return this;
}

/*package*/ _Private_IonManagedBinaryWriterBuilder withImports(final ImportedSymbolResolverMode mode, final List<SymbolTable> tables) {
imports = new ImportedSymbolContext(mode, tables);
return this;
Expand Down
12 changes: 11 additions & 1 deletion src/com/amazon/ion/system/IonBinaryWriterBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,22 @@ public IvmMinimizing getIvmMinimizing()
*/
public abstract SymbolTable getInitialSymbolTable();

/**
* Auto-flush enables automatic execution of flush operations during the write process. When auto-flush is enabled and a new block allocation becomes necessary while writing the top-level value, a new block will be allocated.
* The data writing process will then continue uninterrupted until the top-level value is complete. After completing the top-level value that crosses the block boundary, the flush operation will be executed.
* This feature optimizes performance of the writing of long data streams by reducing block allocations.
* Additionally, setting a larger block size can further tune performance when auto-flush is enabled.
* A larger block size leads to fewer block allocations and reduces the frequency of flush operations when auto-flush is enabled. {@link #withBlockSize(int) Here} is where you can set up the
* block size of write buffer.
* Auto-flush disabled by default and the default block size is 32K.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Auto-flush disabled by default and the default block size is 32K.
* Auto-flush is disabled by default and the default block size is 32K.

* @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not.
*/
public abstract IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled);

/**
* Declares the symbol table to use for encoded data.
* To avoid conflicts between different data streams, if the given instance
* is mutable, it will be copied when {@code build()} is called.
*
* @param symtab must be a local or system symbol table.
* May be null, in which case the initial symbol table is that of
* {@code $ion_1_0}.
Expand Down
Loading
Loading