diff --git a/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java b/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java index 9c374d7a47..61f5cbc170 100644 --- a/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java +++ b/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java @@ -1104,15 +1104,10 @@ public void writeBytes(byte[] data, int off, int len) throws IOException // Stream Terminators - public void flush() - { + public void flush() throws IOException { if (getDepth() == 0 && !user.hasAnnotations() && (localsLocked || lstAppendEnabled)) { - try { - unsafeFlush(); - } catch (IOException e) { - throw new RuntimeException(e); - } + unsafeFlush(); } } diff --git a/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java b/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java index b4622d028a..a52940bcb8 100644 --- a/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java +++ b/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java @@ -363,12 +363,17 @@ public PatchPoint clear() { private boolean closed; boolean autoFlushEnabled; boolean flushAfterCurrentValue; - Runnable action; + ThrowingRunnable autoFlush; public void endOfBlockSizeReached() { flushAfterCurrentValue = autoFlushEnabled; } + @FunctionalInterface + interface ThrowingRunnable { + void run() throws IOException; + } + /*package*/ IonRawBinaryWriter(final BlockAllocatorProvider provider, final int blockSize, final OutputStream out, @@ -378,7 +383,7 @@ public void endOfBlockSizeReached() { final PreallocationMode preallocationMode, final boolean isFloatBinary32Enabled, final boolean isAutoFlushEnabled, - Runnable action) + ThrowingRunnable autoFlush) throws IOException { super(optimization); @@ -410,10 +415,11 @@ public ContainerInfo newElement() { this.hasTopLevelSymbolTableAnnotation = false; this.closed = false; this.autoFlushEnabled = isAutoFlushEnabled; - this.action = action; + this.autoFlush = autoFlush; } + /** Always returns {@link Symbols#systemSymbolTable()}. 
*/ public SymbolTable getSymbolTable() { @@ -747,7 +753,7 @@ private void finishValue() throws IOException hasWrittenValuesSinceFinished = true; hasWrittenValuesSinceConstructed = true; if (this.flushAfterCurrentValue && depth == 0) { - action.run(); + autoFlush.run(); this.flushAfterCurrentValue = false; } } diff --git a/src/com/amazon/ion/impl/bin/WriteBuffer.java b/src/com/amazon/ion/impl/bin/WriteBuffer.java index 32bed7a212..6909198e6d 100644 --- a/src/com/amazon/ion/impl/bin/WriteBuffer.java +++ b/src/com/amazon/ion/impl/bin/WriteBuffer.java @@ -30,10 +30,10 @@ private final List blocks; private Block current; private int index; - private Runnable action; + private Runnable endOfBlockCallBack; - public WriteBuffer(final BlockAllocator allocator, Runnable action) + public WriteBuffer(final BlockAllocator allocator, Runnable endOfBlockCallBack) { this.allocator = allocator; this.blocks = new ArrayList(); @@ -43,7 +43,7 @@ public WriteBuffer(final BlockAllocator allocator, Runnable action) this.index = 0; this.current = blocks.get(0); - this.action = action; + this.endOfBlockCallBack = endOfBlockCallBack; } private void allocateNewBlock() @@ -140,7 +140,7 @@ private void writeBytesSlow(final byte[] bytes, int off, int len) if (index == blocks.size() - 1) { allocateNewBlock(); - action.run(); + endOfBlockCallBack.run(); } index++; current = blocks.get(index); diff --git a/src/com/amazon/ion/system/IonBinaryWriterBuilder.java b/src/com/amazon/ion/system/IonBinaryWriterBuilder.java index 683a7c40d7..6c5344d9d9 100644 --- a/src/com/amazon/ion/system/IonBinaryWriterBuilder.java +++ b/src/com/amazon/ion/system/IonBinaryWriterBuilder.java @@ -140,7 +140,13 @@ public IvmMinimizing getIvmMinimizing() public abstract SymbolTable getInitialSymbolTable(); /** - * Enables the automatic execution of flush operations. This functionality disabled by default. + * Auto-flush enables automatic execution of flush operations during the write process. 
When auto-flush is enabled and a new block allocation becomes necessary while writing the top-level value, a new block will be allocated. + * The data writing process will then continue uninterrupted until the top-level value is complete. After completing the top-level value that crosses the block boundary, the flush operation will be executed. + * This feature optimizes performance of the writing of long data streams by reducing block allocations. + * Additionally, setting a larger block size can further tune performance when auto-flush is enabled. + * A larger block size leads to fewer block allocations and reduces the frequency of flush operations when auto-flush is enabled. {@link #withBlockSize(int) Here} is where you can set up the + * block size of write buffer. + * Auto-flush disabled by default and the default block size is 32K. * @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not. */ public abstract IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled); diff --git a/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java b/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java index 81a8a3cb69..e6577af285 100644 --- a/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java +++ b/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java @@ -15,8 +15,10 @@ package com.amazon.ion.impl.bin; + import com.amazon.ion.IonDatagram; import com.amazon.ion.IonInt; +import com.amazon.ion.IonLoader; import com.amazon.ion.IonReader; import com.amazon.ion.IonStruct; import com.amazon.ion.IonSymbol; @@ -27,6 +29,8 @@ import java.io.IOException; import com.amazon.ion.system.IonBinaryWriterBuilder; +import com.amazon.ion.system.IonSystemBuilder; +import com.amazon.ion.util.Equivalence; import org.junit.Before; import org.junit.Test; @@ -39,6 +43,19 @@ public class IonManagedBinaryWriterTest extends IonManagedBinaryWriterTestCase private ByteArrayOutputStream expectedOut_32K = new 
ByteArrayOutputStream(); private ByteArrayOutputStream source_67K = new ByteArrayOutputStream(); private ByteArrayOutputStream expectedOut_67K = new ByteArrayOutputStream(); + + /** + *

+ * In the test data, the field names are generated by appending a continuously increasing integer to the string "taco", resulting in names like "taco0", "taco1", and so on. + * These field names are paired with symbol IDs and then stored in the symbol table during the writing process. + * The actual bytes written into the user buffer are these symbol IDs. The symbol IDs start from $10, representing "taco0", and occupy one byte in the buffer. This single-byte representation continues up to "taco117", which corresponds to symbol ID $127. + * Beyond "taco117", encoding a symbol ID requires two bytes. + * Therefore, each of the first 118 Ion structs takes 10 bytes, and each Ion struct from "taco118" to "taco2988" occupies 11 bytes. + * Consequently, the total buffer size after these structs is 118 × 10 + (2988 − 118 + 1) × 11 = 32761 bytes. Writing the 2990th Ion struct, which is 11 bytes, then overflows the block limit of 32768 bytes. + * After the 2990th Ion struct is completed, the flush operation will be executed if auto-flush is enabled. + *

+ */ + private int FLUSH_PERIOD = 2990; @Before public void generateTestData() throws IOException { // Writing test data with continuous extending symbol table. After completing writing 3300th value, the local symbol table will stop growing. @@ -62,14 +79,13 @@ public void generateTestData() throws IOException { IonReader reader = system().newReader(source.toByteArray()); // While writing the 2990th data structure, the cumulative size of the written data will exceed the current block size. // If auto-flush enabled, the flush() operation will be executed after completing the 2990th struct. The output should be the same as manually flush after writing the 2990th data. - int flushPeriod = 2990; int index = 0; IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut); while (reader.next() != null) { expectedWriter.writeValue(reader); index++; - if (index == flushPeriod) { + if (index == FLUSH_PERIOD) { expectedWriter.flush(); index = 0; } @@ -82,13 +98,16 @@ public void generateTestData32K() throws IOException { // Writing test data with continuous extending symbol table. The total data written in user's block is 32K. IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_32K); int i = 0; - while (i < 2990) { + // The total size of first 2989 ion struct will be 118 x 10 + (2988 - 118 + 1) x 11 = 32761 bytes. + while (i < 2989) { defaultWriter.stepIn(IonType.STRUCT); defaultWriter.setFieldName("taco" + i); defaultWriter.writeString("burrito"); defaultWriter.stepOut(); i++; } + //Keep writing 7 bytes to user's buffer until the total data size in the current block is 32768 bytes. + defaultWriter.writeString("taco__"); defaultWriter.close(); IonReader reader = system().newReader(source_32K.toByteArray()); // No flush should be expected since the total data size written into user's buffer can fit into one block. 
@@ -103,6 +122,7 @@ public void generateTestData67K() throws IOException { // Writing test data with continuous extending symbol table. After completing writing 3200th value, the local symbol table will stop growing. IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_67K); int i = 0; + // The total size of first 3200 ion structs is 118 x 10 + (3199 - 118 + 1) x 11 = 35082 bytes. while (i < 3200) { defaultWriter.stepIn(IonType.STRUCT); defaultWriter.setFieldName("taco" + i); @@ -110,6 +130,8 @@ public void generateTestData67K() throws IOException { defaultWriter.stepOut(); i++; } + // Since we already have symbol token appended in the symbol table for field name "taco3", the size of ion struct after number 3200 to 6399 will be the same, which is 10 bytes. + // The total size of this part is (6399 - 3200 + 1) x 10 = 31990 bytes. The size of test data would be 35082 + 31990 = 67062 bytes. while (i >= 3200 && i < 6400) { defaultWriter.stepIn(IonType.STRUCT); defaultWriter.setFieldName("taco" + 3); @@ -120,16 +142,14 @@ public void generateTestData67K() throws IOException { defaultWriter.close(); IonReader reader = system().newReader(source_67K.toByteArray()); // After writing the 2990th data, the cumulative size of the written data in user's buffer will exceed the current block size. - // After writing the 6246th data, the cumulative size of the written data user's buffer will exceed the current block size. - // If auto-flush enabled, the flush() operation will be executed after completing the 2990th data and 6246th data. The output should be the same as manually flush after writing the 2990th and 6246th data. - int first_flush = 2990; - int second_flush = 6246; + // If auto-flush enabled, the flush() operation will be executed after completing the 2990th data. The output should be the same as manually flush after writing the 2990th data. 
int index = 0; IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_67K); while (reader.next() != null) { expectedWriter.writeValue(reader); index++; - if (index == first_flush || index == second_flush) { + // There is only one flush operation should be expected. After the first flush operation, there will be two blocks allocated in the write buffer which is big enough to fit the rest of data streams. + if (index == FLUSH_PERIOD) { expectedWriter.flush(); } } @@ -213,46 +233,48 @@ public void testFlushImmediatelyAfterIVM() throws Exception public void testAutoFlush() throws Exception{ IonReader reader = system().newReader(source.toByteArray()); ByteArrayOutputStream actual = new ByteArrayOutputStream(); - IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual); while (reader.next() != null) { actualWriter.writeValue(reader); } actualWriter.close(); if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { - assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray()); + assertEquivalentDataModel(actual, expectedOut); } - assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray()); + // When auto-flush mode is disabled, the serialized data should be equivalent to the source data without flush operation. 
+ assertEquivalentDataModel(actual, source); } @Test public void testAutoFlush_32K() throws Exception{ - IonReader reader = system().newReader(source_32K.toByteArray()); ByteArrayOutputStream actual = new ByteArrayOutputStream(); - IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual); while (reader.next() != null) { actualWriter.writeValue(reader); } actualWriter.close(); if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { - assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray()); + assertEquivalentDataModel(actual, expectedOut_32K); } - assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray()); + // When auto-flush mode is disabled, the serialized data should be equivalent to the source data without flush operation. + assertEquivalentDataModel(actual, source_32K); } + @Test public void testAutoFlush_67K() throws Exception{ IonReader reader = system().newReader(source_67K.toByteArray()); ByteArrayOutputStream actual = new ByteArrayOutputStream(); - IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual); while (reader.next() != null) { actualWriter.writeValue(reader); } actualWriter.close(); if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { - assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray()); + assertEquivalentDataModel(actual, expectedOut_67K); } - assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray()); + assertEquivalentDataModel(actual, source_67K); } @Test @@ -404,4 +426,19 @@ public void testNestedEmptyAnnotatedContainer() throws Exception writer.stepOut(); assertValue("{bar: foo::[]}"); } + + /** + * Asserts equivalence of ion data model 
between two provided data streams. + * @param actual represents the serialized data streams when auto-flush is enabled. + * @param expected represents the expected data streams. + */ + private void assertEquivalentDataModel(ByteArrayOutputStream actual, ByteArrayOutputStream expected) { + IonLoader loader = IonSystemBuilder.standard().build().newLoader(); + IonDatagram actualDatagram = loader.load(actual.toByteArray()); + IonDatagram expectedDatagram = loader.load(expected.toByteArray()); + assertEquals(actualDatagram.size(), expectedDatagram.size()); + for (int i = 0; i < actualDatagram.size(); i++) { + assertTrue(Equivalence.ionEquals(actualDatagram.get(i), expectedDatagram.get(i))); + } + } } diff --git a/test/com/amazon/ion/impl/bin/WriteBufferTest.java b/test/com/amazon/ion/impl/bin/WriteBufferTest.java index 719223a826..df7575fd14 100644 --- a/test/com/amazon/ion/impl/bin/WriteBufferTest.java +++ b/test/com/amazon/ion/impl/bin/WriteBufferTest.java @@ -263,18 +263,17 @@ public void testInt56Negative() assertBuffer(bytes); } + /** + * Test if the method endOfBufferReached is invoked appropriately, the size of bytes written into the buffer overflow the current block size. + * @throws UnsupportedEncodingException + */ @Test public void testEndOfBufferReachedInvoked() throws UnsupportedEncodingException { - buf.writeBytes("taco".getBytes("UTF-8")); - buf.writeBytes("_burrito".getBytes("UTF-8")); - assertTrue(endOfBufferReached.get()); - } - - @Test - public void testEndOfBufferReachedNotInvoked() throws UnsupportedEncodingException { buf.writeBytes("taco".getBytes("UTF-8")); buf.writeBytes("burrito".getBytes("UTF-8")); assertFalse(endOfBufferReached.get()); + buf.writeBytes("_".getBytes("UTF-8")); + assertTrue(endOfBufferReached.get()); } @Test