From d3760a55ed724aa63c9255ebb9247c68bcb1e5da Mon Sep 17 00:00:00 2001 From: linlin-s Date: Tue, 5 Dec 2023 14:02:28 -0800 Subject: [PATCH] Enable auto-flush in ion binary writer. (#651) --- src/main/java/com/amazon/ion/IonWriter.java | 1 - .../impl/_Private_IonBinaryWriterBuilder.java | 16 ++ .../ion/impl/bin/IonManagedBinaryWriter.java | 14 +- .../ion/impl/bin/IonRawBinaryWriter.java | 35 ++- .../com/amazon/ion/impl/bin/WriteBuffer.java | 7 +- .../impl/bin/_PrivateIon_HashTrampoline.java | 4 +- ...Private_IonManagedBinaryWriterBuilder.java | 15 ++ .../ion/system/IonBinaryWriterBuilder.java | 12 +- .../impl/bin/IonManagedBinaryWriterTest.java | 224 ++++++++++++++++++ .../bin/IonManagedBinaryWriterTestCase.java | 21 ++ .../ion/impl/bin/IonRawBinaryWriterTest.java | 4 +- .../amazon/ion/impl/bin/WriteBufferTest.java | 27 ++- 12 files changed, 358 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/amazon/ion/IonWriter.java b/src/main/java/com/amazon/ion/IonWriter.java index 2f0f100247..4efc70e0c4 100644 --- a/src/main/java/com/amazon/ion/IonWriter.java +++ b/src/main/java/com/amazon/ion/IonWriter.java @@ -126,7 +126,6 @@ public interface IonWriter */ public void flush() throws IOException; - /** * Indicates that writing is completed and all buffered data should be * written and flushed as if this were the end of the Ion data stream. diff --git a/src/main/java/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java b/src/main/java/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java index 84e2f244b7..2b008b9349 100644 --- a/src/main/java/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java +++ b/src/main/java/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java @@ -193,6 +193,22 @@ _Private_IonBinaryWriterBuilder withInitialSymbolTable(SymbolTable symtab) return b; } + @Override + public _Private_IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled) { + _Private_IonBinaryWriterBuilder b = mutable(); + b.setAutoFlushEnabled(autoFlushEnabled); + return b; + } + + public void setAutoFlushEnabled(boolean autoFlushEnabled) { + mutationCheck(); + if (autoFlushEnabled) { + myBinaryWriterBuilder.withAutoFlushEnabled(); + myBinaryWriterBuilder.withLocalSymbolTableAppendEnabled(); + } else { + myBinaryWriterBuilder.withAutoFlushDisabled(); + } + } @Override public void setLocalSymbolTableAppendEnabled(boolean enabled) { diff --git a/src/main/java/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java b/src/main/java/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java index 75ca54c574..61f5cbc170 100644 --- a/src/main/java/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java +++ b/src/main/java/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java @@ -51,7 +51,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; - /** Wraps {@link IonRawBinaryWriter} with symbol table management. */ @SuppressWarnings("deprecation") /*package*/ final class IonManagedBinaryWriter extends AbstractIonWriter implements _Private_IonManagedWriter @@ -677,7 +676,9 @@ public void makeReadOnly() StreamCloseMode.NO_CLOSE, StreamFlushMode.NO_FLUSH, builder.preallocationMode, - builder.isFloatBinary32Enabled + builder.isFloatBinary32Enabled, + false, + this::flush ); this.user = new IonRawBinaryWriter( builder.provider, @@ -687,7 +688,9 @@ public void makeReadOnly() StreamCloseMode.CLOSE, StreamFlushMode.FLUSH, builder.preallocationMode, - builder.isFloatBinary32Enabled + builder.isFloatBinary32Enabled, + builder.isAutoFlushEnabled, + this::flush ); this.catalog = builder.catalog; @@ -1101,11 +1104,10 @@ public void writeBytes(byte[] data, int off, int len) throws IOException // Stream Terminators - public void flush() throws IOException - { + public void flush() throws IOException { if (getDepth() == 0 && !user.hasAnnotations() && (localsLocked || lstAppendEnabled)) { - unsafeFlush(); + unsafeFlush(); } } diff --git a/src/main/java/com/amazon/ion/impl/bin/IonRawBinaryWriter.java b/src/main/java/com/amazon/ion/impl/bin/IonRawBinaryWriter.java index 31f06b3b2c..315d4bf204 100644 --- a/src/main/java/com/amazon/ion/impl/bin/IonRawBinaryWriter.java +++ b/src/main/java/com/amazon/ion/impl/bin/IonRawBinaryWriter.java @@ -98,7 +98,6 @@ private static byte[] bytes(int... vals) { NULLS[STRUCT.ordinal()] = (byte) 0xDF; } private static final byte NULL_NULL = NULLS[NULL.ordinal()]; - private static final byte BOOL_FALSE = (byte) 0x10; private static final byte BOOL_TRUE = (byte) 0x11; @@ -124,6 +123,7 @@ private static byte[] bytes(int... vals) { private static final byte VARINT_NEG_ZERO = (byte) 0xC0; + final Utf8StringEncoder utf8StringEncoder = Utf8StringEncoderPool .getInstance() .getOrCreate(); @@ -362,6 +362,18 @@ public PatchPoint clear() { private boolean hasTopLevelSymbolTableAnnotation; private boolean closed; + boolean autoFlushEnabled; + boolean flushAfterCurrentValue; + ThrowingRunnable autoFlush; + + public void endOfBlockSizeReached() { + flushAfterCurrentValue = autoFlushEnabled; + } + + @FunctionalInterface + interface ThrowingRunnable { + void run() throws IOException; + } /*package*/ IonRawBinaryWriter(final BlockAllocatorProvider provider, final int blockSize, @@ -370,7 +382,9 @@ public PatchPoint clear() { final StreamCloseMode streamCloseMode, final StreamFlushMode streamFlushMode, final PreallocationMode preallocationMode, - final boolean isFloatBinary32Enabled) + final boolean isFloatBinary32Enabled, + final boolean isAutoFlushEnabled, + ThrowingRunnable autoFlush) throws IOException { super(optimization); @@ -383,7 +397,7 @@ public PatchPoint clear() { this.streamFlushMode = streamFlushMode; this.preallocationMode = preallocationMode; this.isFloatBinary32Enabled = isFloatBinary32Enabled; - this.buffer = new WriteBuffer(allocator); + this.buffer = new WriteBuffer(allocator, this::endOfBlockSizeReached); this.patchPoints = new _Private_RecyclingQueue<>(512, PatchPoint::new); this.containers = new _Private_RecyclingStack( 10, @@ -401,8 +415,12 @@ public ContainerInfo newElement() { this.currentAnnotationSids = new IntList(); this.hasTopLevelSymbolTableAnnotation = false; this.closed = false; + this.autoFlushEnabled = isAutoFlushEnabled; + this.autoFlush = autoFlush; } + + /** Always returns {@link Symbols#systemSymbolTable()}. */ public SymbolTable getSymbolTable() { @@ -421,6 +439,10 @@ public void setFieldNameSymbol(final SymbolToken name) setFieldNameSymbol(name.getSid()); } + WriteBuffer getCurrentBuffer() { + return buffer; + } + public void setFieldNameSymbol(int sid) { if (!isInStruct()) @@ -722,7 +744,7 @@ private void prepareValue() } /** Closes out annotations. */ - private void finishValue() + private void finishValue() throws IOException { if (!containers.isEmpty() && containers.peek().type == ContainerType.ANNOTATION) { @@ -731,6 +753,10 @@ private void finishValue() } hasWrittenValuesSinceFinished = true; hasWrittenValuesSinceConstructed = true; + if (this.flushAfterCurrentValue && depth == 0) { + autoFlush.run(); + this.flushAfterCurrentValue = false; + } } // Container Manipulation @@ -1365,7 +1391,6 @@ public void finish() throws IOException { throw new IllegalStateException("Cannot finish within container: " + containers); } - if (patchPoints.isEmpty()) { // nothing to patch--write 'em out! diff --git a/src/main/java/com/amazon/ion/impl/bin/WriteBuffer.java b/src/main/java/com/amazon/ion/impl/bin/WriteBuffer.java index d8bf4162fc..2955147be7 100644 --- a/src/main/java/com/amazon/ion/impl/bin/WriteBuffer.java +++ b/src/main/java/com/amazon/ion/impl/bin/WriteBuffer.java @@ -31,8 +31,10 @@ private final List blocks; private Block current; private int index; + private Runnable endOfBlockCallBack; - public WriteBuffer(final BlockAllocator allocator) + + public WriteBuffer(final BlockAllocator allocator, Runnable endOfBlockCallBack) { this.allocator = allocator; this.blocks = new ArrayList(); @@ -42,6 +44,7 @@ public WriteBuffer(final BlockAllocator allocator) this.index = 0; this.current = blocks.get(0); + this.endOfBlockCallBack = endOfBlockCallBack; } private void allocateNewBlock() @@ -138,12 +141,12 @@ private void writeBytesSlow(final byte[] bytes, int off, int len) if (index == blocks.size() - 1) { allocateNewBlock(); + endOfBlockCallBack.run(); } index++; current = blocks.get(index); } } - } /** Writes an array of bytes to the buffer expanding if necessary. */ diff --git a/src/main/java/com/amazon/ion/impl/bin/_PrivateIon_HashTrampoline.java b/src/main/java/com/amazon/ion/impl/bin/_PrivateIon_HashTrampoline.java index 6d3537c1f6..197160b35e 100644 --- a/src/main/java/com/amazon/ion/impl/bin/_PrivateIon_HashTrampoline.java +++ b/src/main/java/com/amazon/ion/impl/bin/_PrivateIon_HashTrampoline.java @@ -40,7 +40,9 @@ public static IonWriter newIonWriter(ByteArrayOutputStream baos) throws IOExcept IonRawBinaryWriter.StreamCloseMode.CLOSE, IonRawBinaryWriter.StreamFlushMode.FLUSH, IonRawBinaryWriter.PreallocationMode.PREALLOCATE_0, - false // force floats to be encoded as binary64 + false, // force floats to be encoded as binary64 + false, + null ); } } diff --git a/src/main/java/com/amazon/ion/impl/bin/_Private_IonManagedBinaryWriterBuilder.java b/src/main/java/com/amazon/ion/impl/bin/_Private_IonManagedBinaryWriterBuilder.java index 35d87e118a..12d4c2b460 100644 --- a/src/main/java/com/amazon/ion/impl/bin/_Private_IonManagedBinaryWriterBuilder.java +++ b/src/main/java/com/amazon/ion/impl/bin/_Private_IonManagedBinaryWriterBuilder.java @@ -79,6 +79,7 @@ BlockAllocatorProvider createAllocatorProvider() /*package*/ volatile SymbolTable initialSymbolTable; /*package*/ volatile boolean isLocalSymbolTableAppendEnabled; /*package*/ volatile boolean isFloatBinary32Enabled; + volatile boolean isAutoFlushEnabled; private _Private_IonManagedBinaryWriterBuilder(final BlockAllocatorProvider provider) { @@ -91,6 +92,7 @@ private _Private_IonManagedBinaryWriterBuilder(final BlockAllocatorProvider prov this.optimization = WriteValueOptimization.NONE; this.isLocalSymbolTableAppendEnabled = false; this.isFloatBinary32Enabled = false; + this.isAutoFlushEnabled = false; } private _Private_IonManagedBinaryWriterBuilder(final _Private_IonManagedBinaryWriterBuilder other) @@ -105,6 +107,7 @@ private _Private_IonManagedBinaryWriterBuilder(final _Private_IonManagedBinaryWr this.initialSymbolTable = other.initialSymbolTable; this.isLocalSymbolTableAppendEnabled = other.isLocalSymbolTableAppendEnabled; this.isFloatBinary32Enabled = other.isFloatBinary32Enabled; + this.isAutoFlushEnabled = other.isAutoFlushEnabled; } public _Private_IonManagedBinaryWriterBuilder copy() @@ -167,6 +170,18 @@ public _Private_IonManagedBinaryWriterBuilder withFlatImports(final List tables) { imports = new ImportedSymbolContext(mode, tables); return this; diff --git a/src/main/java/com/amazon/ion/system/IonBinaryWriterBuilder.java b/src/main/java/com/amazon/ion/system/IonBinaryWriterBuilder.java index 9c460c46ac..a4d9f85e0d 100644 --- a/src/main/java/com/amazon/ion/system/IonBinaryWriterBuilder.java +++ b/src/main/java/com/amazon/ion/system/IonBinaryWriterBuilder.java @@ -139,12 +139,22 @@ public IvmMinimizing getIvmMinimizing() */ public abstract SymbolTable getInitialSymbolTable(); + /** + * Auto-flush enables automatic execution of flush operations during the write process. When auto-flush is enabled and a new block allocation becomes necessary while writing the top-level value, a new block will be allocated. + * The data writing process will then continue uninterrupted until the top-level value is complete. After completing the top-level value that crosses the block boundary, the flush operation will be executed. + * This feature optimizes performance of the writing of long data streams by reducing block allocations. + * Additionally, setting a larger block size can further tune performance when auto-flush is enabled. + * A larger block size leads to fewer block allocations and reduces the frequency of flush operations when auto-flush is enabled. {@link #withBlockSize(int) Here} is where you can set up the + * block size of write buffer. + * Auto-flush is disabled by default and the default block size is 32K. + * @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not. + */ + public abstract IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled); /** * Declares the symbol table to use for encoded data. * To avoid conflicts between different data streams, if the given instance * is mutable, it will be copied when {@code build()} is called. - * * @param symtab must be a local or system symbol table. * May be null, in which case the initial symbol table is that of * {@code $ion_1_0}. diff --git a/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java b/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java index c808a8458a..bfa1e196d2 100644 --- a/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java +++ b/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java @@ -17,6 +17,7 @@ import com.amazon.ion.IonDatagram; import com.amazon.ion.IonInt; +import com.amazon.ion.IonLoader; import com.amazon.ion.IonReader; import com.amazon.ion.IonStruct; import com.amazon.ion.IonSymbol; @@ -24,12 +25,140 @@ import com.amazon.ion.IonWriter; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import com.amazon.ion.system.IonBinaryWriterBuilder; +import com.amazon.ion.system.IonSystemBuilder; +import org.junit.Before; import org.junit.Test; @SuppressWarnings("deprecation") public class IonManagedBinaryWriterTest extends IonManagedBinaryWriterTestCase { + private ByteArrayOutputStream source = new ByteArrayOutputStream(); + private ByteArrayOutputStream expectedOut = new ByteArrayOutputStream(); + private ByteArrayOutputStream source_32K = new ByteArrayOutputStream(); + private ByteArrayOutputStream expectedOut_32K = new ByteArrayOutputStream(); + private ByteArrayOutputStream source_67K = new ByteArrayOutputStream(); + private ByteArrayOutputStream expectedOut_67K = new ByteArrayOutputStream(); + private ByteArrayOutputStream singleTopLevelValue_13B = new ByteArrayOutputStream(); + + /** + * In test data, the field names are generated by appending a continuously increasing integer to the string "taco", resulting in names like "taco0", "taco1", and so on. + * These field names are paired with symbol IDs and then stored in then symbol table during the writing process. + * The actual bytes written into the user buffer are these symbol IDs. The symbol IDs start from $10, representing "taco0", and occupy one byte in the buffer. This single-byte representation continues up to "taco117", which corresponds to symbol ID $127. + * Beyond "taco117", encoding symbol IDs require two bytes. + * Therefore, for the first 118 ion structs, each takes 10 bytes, and for ion struct from "taco118" to "taco2988", each occupies 11 bytes. + * Consequently, the total buffer size is calculated as 118 × 10 + (2988 − 118 + 1) × 11 = 32761 bytes, then we keep writing 2990th ion struct which is 11 bytes, which will overflow the block limit of 32768 bytes. + * After completing the 2990th ion struct, the flush operation will be executed if auto-flush enabled. + */ + private int FLUSH_PERIOD = 2990; + @Before + public void generateTestData() throws IOException { + // Writing test data with continuous extending symbol table. After completing writing 3300th value, the local symbol table will stop growing. + IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source); + int i = 0; + while (i < 3300) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + i); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + while (i >= 3300 && i < 3400) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + 3); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + defaultWriter.close(); + IonReader reader = system().newReader(source.toByteArray()); + // While writing the 2990th data structure, the cumulative size of the written data will exceed the current block size. + // If auto-flush enabled, the flush() operation will be executed after completing the 2990th struct. The output should be the same as manually flush after writing the 2990th data. + int index = 0; + + IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut); + while (reader.next() != null) { + expectedWriter.writeValue(reader); + index++; + if (index == FLUSH_PERIOD) { + expectedWriter.flush(); + index = 0; + } + } + expectedWriter.finish(); + } + + @Before + public void generateTestData32K() throws IOException { + // Writing test data with continuous extending symbol table. The total data written in user's block is 32K. + IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_32K); + int i = 0; + // The total size of first 2989 ion struct will be 118 x 10 + (2988 - 118 + 1) x 11 = 32761 bytes. + while (i < 2989) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + i); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + // Keep writing 7 bytes to user's buffer until the total data size in the current block is 32768 bytes. + defaultWriter.writeString("taco__"); + defaultWriter.close(); + IonReader reader = system().newReader(source_32K.toByteArray()); + // No flush should be expected since the total data size written into user's buffer can fit into one block. + IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_32K); + while (reader.next() != null) { + expectedWriter.writeValue(reader); + } + expectedWriter.close(); + } + @Before + public void generateTestData67K() throws IOException { + // Writing test data with continuous extending symbol table. After completing writing 3200th value, the local symbol table will stop growing. + IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_67K); + int i = 0; + // The total size of first 3200 ion structs is 118 x 10 + (3199 - 118 + 1) x 11 = 35082 bytes. + while (i < 3200) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + i); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + // Since we already have symbol token appended in the symbol table for field name "taco3", the size of ion struct after number 3200 to 6399 will be the same, which is 10 bytes. + // The total size of this part is (6399 - 3200 + 1) x 10 = 31990 bytes. The size of test data would be 35082 + 31990 = 67062 bytes. + while (i >= 3200 && i < 6400) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + 3); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + defaultWriter.close(); + IonReader reader = system().newReader(source_67K.toByteArray()); + // After writing the 2990th data, the cumulative size of the written data in user's buffer will exceed the current block size. + // If auto-flush enabled, the flush() operation will be executed after completing the 2990th data. The output should be the same as manually flush after writing the 2990th data. + int index = 0; + IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_67K); + while (reader.next() != null) { + expectedWriter.writeValue(reader); + index++; + // There is only one flush operation should be expected. After the first flush operation, there will be two blocks allocated in the write buffer which is big enough to fit the rest of data streams. + if (index == FLUSH_PERIOD) { + expectedWriter.flush(); + } + } + expectedWriter.close(); + } + + @Before + public void generateSingleTopLevelValue() throws IOException { + IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(singleTopLevelValue_13B); + defaultWriter.writeString("taco_burrito"); // Write a 13-byte IonString. + defaultWriter.close(); + } @Test public void testSetStringAnnotations() throws Exception @@ -104,6 +233,89 @@ public void testFlushImmediatelyAfterIVM() throws Exception assertEquals("burrito", ((IonSymbol) dg.systemGet(2)).stringValue()); } + @Test + public void testAutoFlush() throws Exception{ + IonReader reader = system().newReader(source.toByteArray()); + ByteArrayOutputStream actual = new ByteArrayOutputStream(); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual); + while (reader.next() != null) { + actualWriter.writeValue(reader); + } + actualWriter.close(); + if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { + assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray()); + } + // When auto-flush mode is disabled, the serialized data should be equivalent to the source data without flush operation. + assertEquivalentDataModel(actual, source); + } + + @Test + public void testAutoFlush_32K() throws Exception{ + IonReader reader = system().newReader(source_32K.toByteArray()); + ByteArrayOutputStream actual = new ByteArrayOutputStream(); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual); + while (reader.next() != null) { + actualWriter.writeValue(reader); + } + actualWriter.close(); + if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { + assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray()); + } + // When auto-flush mode is disabled, the serialized data should be equivalent to the source data without flush operation. + assertEquivalentDataModel(actual, source_32K); + } + + @Test + public void testAutoFlush_67K() throws Exception{ + IonReader reader = system().newReader(source_67K.toByteArray()); + ByteArrayOutputStream actual = new ByteArrayOutputStream(); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual); + while (reader.next() != null) { + actualWriter.writeValue(reader); + } + actualWriter.close(); + if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { + assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray()); + } + assertEquivalentDataModel(actual, source_67K); + } + + @Test + public void testAutoFlush_twiceBlockSize() throws IOException { + IonReader reader = system().newReader(singleTopLevelValue_13B.toByteArray()); + ByteArrayOutputStream actual = new ByteArrayOutputStream(); + // Set the actual writer block size as 5 bytes. The test data is a 13-byte IonString "taco_burrito". + IonBinaryWriterBuilder builder = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).withBlockSize(5); + IonWriter actualWriter = builder.build(actual); + while (reader.next() != null) { + actualWriter.writeValue(reader); + } + actualWriter.close(); + if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { + // When auto-flush is enabled, no flush is expected since this is a single top-level value and should continue encoding until this value is completed. + assertArrayEquals(actual.toByteArray(), singleTopLevelValue_13B.toByteArray()); + } + assertEquivalentDataModel(actual, singleTopLevelValue_13B); + } + + @Test + public void testAutoFlush_oneBlockSize() throws IOException { + IonReader reader = system().newReader(singleTopLevelValue_13B.toByteArray()); + ByteArrayOutputStream actual = new ByteArrayOutputStream(); + // Set the actual writer block size as 13 bytes. The test data is a 13-byte IonString "taco_burrito". + IonBinaryWriterBuilder builder = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).withBlockSize(13); + IonWriter actualWriter = builder.build(actual); + while (reader.next() != null) { + actualWriter.writeValue(reader); + } + actualWriter.close(); + if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { + // When auto-flush is enabled, no flush is expected since this is a single top-level value and should continue encoding until this value is completed. + assertArrayEquals(actual.toByteArray(), singleTopLevelValue_13B.toByteArray()); + } + assertEquivalentDataModel(actual, singleTopLevelValue_13B); + } + @Test public void testNoNewSymbolsAfterFlush() throws Exception { @@ -253,4 +465,16 @@ public void testNestedEmptyAnnotatedContainer() throws Exception writer.stepOut(); assertValue("{bar: foo::[]}"); } + + /** + * Asserts equivalence of ion data model between two provided data streams. + * @param actual represents the serialized data streams when auto-flush is enabled. + * @param expected represents the expected data streams. + */ + private void assertEquivalentDataModel(ByteArrayOutputStream actual, ByteArrayOutputStream expected) { + IonLoader loader = IonSystemBuilder.standard().build().newLoader(); + IonDatagram actualDatagram = loader.load(actual.toByteArray()); + IonDatagram expectedDatagram = loader.load(expected.toByteArray()); + assertEquals(expectedDatagram, actualDatagram); + } } diff --git a/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTestCase.java b/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTestCase.java index fcc28498a8..78ab82ca74 100644 --- a/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTestCase.java +++ b/src/test/java/com/amazon/ion/impl/bin/IonManagedBinaryWriterTestCase.java @@ -64,14 +64,29 @@ protected enum LSTAppendMode public boolean isEnabled() { return this == LST_APPEND_ENABLED; } } + protected enum AutoFlushMode + { + AUTO_FLUSH_ENABLED, + AUTO_FLUSH_DISABLED; + public boolean isEnabled() { return this == AUTO_FLUSH_ENABLED; } + } + @Injected.Inject("lstAppendMode") public static final LSTAppendMode[] LST_APPEND_ENABLED_DIMENSIONS = LSTAppendMode.values(); protected LSTAppendMode lstAppendMode; + @Injected.Inject("autoFlushMode") + public static final AutoFlushMode[] AUTO_FLUSH_MODES = AutoFlushMode.values(); + protected AutoFlushMode autoFlushMode; public void setLstAppendMode(final LSTAppendMode mode) { this.lstAppendMode = mode; } + public void setAutoFlushMode(final AutoFlushMode mode) + { + this.autoFlushMode = mode; + } + private void checkSymbolTokenAgainstImport(final SymbolToken token) { final Integer sid = SHARED_SYMBOL_LOCAL_SIDS.get(token.getText()); @@ -142,6 +157,12 @@ protected IonWriter createWriter(final OutputStream out) throws IOException builder.withLocalSymbolTableAppendDisabled(); } + if (autoFlushMode.isEnabled()) { + builder.withAutoFlushEnabled(); + } else { + builder.withAutoFlushDisabled(); + } + final IonWriter writer = builder.newWriter(out); final SymbolTable locals = writer.getSymbolTable(); diff --git a/src/test/java/com/amazon/ion/impl/bin/IonRawBinaryWriterTest.java b/src/test/java/com/amazon/ion/impl/bin/IonRawBinaryWriterTest.java index ddf49e9969..f13a61d520 100644 --- a/src/test/java/com/amazon/ion/impl/bin/IonRawBinaryWriterTest.java +++ b/src/test/java/com/amazon/ion/impl/bin/IonRawBinaryWriterTest.java @@ -123,7 +123,9 @@ protected IonWriter createWriter(final OutputStream out) throws IOException StreamCloseMode.NO_CLOSE, StreamFlushMode.NO_FLUSH, preallocationMode, - true + true, + false, + null ); } diff --git a/src/test/java/com/amazon/ion/impl/bin/WriteBufferTest.java b/src/test/java/com/amazon/ion/impl/bin/WriteBufferTest.java index 1a8e83b80f..e71c4fb7c4 100644 --- a/src/test/java/com/amazon/ion/impl/bin/WriteBufferTest.java +++ b/src/test/java/com/amazon/ion/impl/bin/WriteBufferTest.java @@ -20,12 +20,17 @@ import static com.amazon.ion.impl.bin.WriteBuffer.writeVarUIntTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.math.BigInteger; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -37,15 +42,14 @@ public class WriteBufferTest { // XXX make this a prime to make it more likely that we collide on the edges of the buffer private static BlockAllocator ALLOCATOR = BlockAllocatorProviders.basicProvider().vendAllocator(11); - - private WriteBuffer buf; - private ByteArrayOutputStream out; + private WriteBuffer buf; + private AtomicBoolean endOfBufferReached = new AtomicBoolean(false); @BeforeEach - public void setup() + public void setup() throws IOException { - buf = new WriteBuffer(ALLOCATOR); + buf = new WriteBuffer(ALLOCATOR, () -> endOfBufferReached.set(true)); out = new ByteArrayOutputStream(); } @@ -260,6 +264,19 @@ public void testInt56Negative() assertBuffer(bytes); } + /** + * Test if the method endOfBufferReached is invoked appropriately, the size of bytes written into the buffer overflow the current block size. + * @throws UnsupportedEncodingException + */ + @Test + public void testEndOfBufferReachedInvoked() throws UnsupportedEncodingException { + buf.writeBytes("taco".getBytes("UTF-8")); + buf.writeBytes("burrito".getBytes("UTF-8")); + assertFalse(endOfBufferReached.get()); + buf.writeBytes("_".getBytes("UTF-8")); + assertTrue(endOfBufferReached.get()); + } + @Test public void testInt64Negative() {