diff --git a/src/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java b/src/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java index 9e50b4f1e8..2b008b9349 100644 --- a/src/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java +++ b/src/com/amazon/ion/impl/_Private_IonBinaryWriterBuilder.java @@ -194,7 +194,7 @@ _Private_IonBinaryWriterBuilder withInitialSymbolTable(SymbolTable symtab) } @Override - public _Private_IonBinaryWriterBuilder withAutoFlushEnbaled(boolean autoFlushEnabled) { + public _Private_IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled) { _Private_IonBinaryWriterBuilder b = mutable(); b.setAutoFlushEnabled(autoFlushEnabled); return b; diff --git a/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java b/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java index e2ad8302a5..9c374d7a47 100644 --- a/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java +++ b/src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java @@ -678,7 +678,7 @@ public void makeReadOnly() builder.preallocationMode, builder.isFloatBinary32Enabled, false, - this + this::flush ); this.user = new IonRawBinaryWriter( builder.provider, @@ -690,7 +690,7 @@ public void makeReadOnly() builder.preallocationMode, builder.isFloatBinary32Enabled, builder.isAutoFlushEnabled, - this + this::flush ); this.catalog = builder.catalog; @@ -1104,11 +1104,15 @@ public void writeBytes(byte[] data, int off, int len) throws IOException // Stream Terminators - public void flush() throws IOException + public void flush() { if (getDepth() == 0 && !user.hasAnnotations() && (localsLocked || lstAppendEnabled)) { - unsafeFlush(); + try { + unsafeFlush(); + } catch (IOException e) { + throw new RuntimeException(e); + } } } diff --git a/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java b/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java index 89c25738b5..b4622d028a 100644 --- a/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java +++ b/src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java @@ -122,7 +122,6 @@ private static byte[] bytes(int... vals) { private static final byte VARINT_NEG_ZERO = (byte) 0xC0; - private IonManagedBinaryWriter managedBinaryWriter; final Utf8StringEncoder utf8StringEncoder = Utf8StringEncoderPool .getInstance() @@ -362,9 +361,13 @@ public PatchPoint clear() { private boolean hasTopLevelSymbolTableAnnotation; private boolean closed; - public boolean autoFlushEnabled; - public boolean flushAfterCurrentValue = false; + boolean autoFlushEnabled; + boolean flushAfterCurrentValue; + Runnable action; + public void endOfBlockSizeReached() { + flushAfterCurrentValue = autoFlushEnabled; + } /*package*/ IonRawBinaryWriter(final BlockAllocatorProvider provider, final int blockSize, @@ -374,7 +377,8 @@ public PatchPoint clear() { final StreamFlushMode streamFlushMode, final PreallocationMode preallocationMode, final boolean isFloatBinary32Enabled, - final boolean isAutoFlushEnabled, final IonManagedBinaryWriter managedBinaryWriter) + final boolean isAutoFlushEnabled, + Runnable action) throws IOException { super(optimization); @@ -387,7 +391,7 @@ public PatchPoint clear() { this.streamFlushMode = streamFlushMode; this.preallocationMode = preallocationMode; this.isFloatBinary32Enabled = isFloatBinary32Enabled; - this.buffer = new WriteBuffer(allocator, this); + this.buffer = new WriteBuffer(allocator, this::endOfBlockSizeReached); this.patchPoints = new _Private_RecyclingQueue<>(512, PatchPoint::new); this.containers = new _Private_RecyclingStack( 10, @@ -406,9 +410,10 @@ public ContainerInfo newElement() { this.hasTopLevelSymbolTableAnnotation = false; this.closed = false; this.autoFlushEnabled = isAutoFlushEnabled; - this.managedBinaryWriter = managedBinaryWriter; + this.action = action; } + /** Always returns {@link Symbols#systemSymbolTable()}. */ public SymbolTable getSymbolTable() { @@ -427,7 +432,7 @@ public void setFieldNameSymbol(final SymbolToken name) setFieldNameSymbol(name.getSid()); } - public WriteBuffer getCurrentBuffer() { + WriteBuffer getCurrentBuffer() { return buffer; } @@ -742,7 +747,7 @@ private void finishValue() throws IOException hasWrittenValuesSinceFinished = true; hasWrittenValuesSinceConstructed = true; if (this.flushAfterCurrentValue && depth == 0) { - managedBinaryWriter.flush(); + action.run(); this.flushAfterCurrentValue = false; } } diff --git a/src/com/amazon/ion/impl/bin/WriteBuffer.java b/src/com/amazon/ion/impl/bin/WriteBuffer.java index fd3a8592ee..32bed7a212 100644 --- a/src/com/amazon/ion/impl/bin/WriteBuffer.java +++ b/src/com/amazon/ion/impl/bin/WriteBuffer.java @@ -30,20 +30,20 @@ private final List blocks; private Block current; private int index; - private IonRawBinaryWriter rawBinaryWriter; + private Runnable action; - public WriteBuffer(final BlockAllocator allocator, IonRawBinaryWriter rawBinaryWriter) + public WriteBuffer(final BlockAllocator allocator, Runnable action) { this.allocator = allocator; this.blocks = new ArrayList(); - this.rawBinaryWriter = rawBinaryWriter; // initial seed of the first block allocateNewBlock(); this.index = 0; this.current = blocks.get(0); + this.action = action; } private void allocateNewBlock() @@ -140,9 +140,7 @@ private void writeBytesSlow(final byte[] bytes, int off, int len) if (index == blocks.size() - 1) { allocateNewBlock(); - if (rawBinaryWriter.autoFlushEnabled){ - rawBinaryWriter.flushAfterCurrentValue = true; - } + action.run(); } index++; current = blocks.get(index); diff --git a/src/com/amazon/ion/system/IonBinaryWriterBuilder.java b/src/com/amazon/ion/system/IonBinaryWriterBuilder.java index 3e319aac60..683a7c40d7 100644 --- a/src/com/amazon/ion/system/IonBinaryWriterBuilder.java +++ b/src/com/amazon/ion/system/IonBinaryWriterBuilder.java @@ -139,13 +139,16 @@ public IvmMinimizing getIvmMinimizing() */ public abstract SymbolTable getInitialSymbolTable(); - public abstract _Private_IonBinaryWriterBuilder withAutoFlushEnbaled(boolean autoFlushEnbaled); + /** + * Enables the automatic execution of flush operations. This functionality disabled by default. + * @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not. + */ + public abstract IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled); /** * Declares the symbol table to use for encoded data. * To avoid conflicts between different data streams, if the given instance * is mutable, it will be copied when {@code build()} is called. - * * @param symtab must be a local or system symbol table. * May be null, in which case the initial symbol table is that of * {@code $ion_1_0}. diff --git a/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java b/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java index f5af3c7d0a..81a8a3cb69 100644 --- a/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java +++ b/test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java @@ -35,7 +35,10 @@ public class IonManagedBinaryWriterTest extends IonManagedBinaryWriterTestCase { private ByteArrayOutputStream source = new ByteArrayOutputStream(); private ByteArrayOutputStream expectedOut = new ByteArrayOutputStream(); - private int flushTimes = 0; + private ByteArrayOutputStream source_32K = new ByteArrayOutputStream(); + private ByteArrayOutputStream expectedOut_32K = new ByteArrayOutputStream(); + private ByteArrayOutputStream source_67K = new ByteArrayOutputStream(); + private ByteArrayOutputStream expectedOut_67K = new ByteArrayOutputStream(); @Before public void generateTestData() throws IOException { // Writing test data with continuous extending symbol table. After completing writing 3300th value, the local symbol table will stop growing. @@ -55,7 +58,7 @@ public void generateTestData() throws IOException { defaultWriter.stepOut(); i++; } - defaultWriter.finish(); + defaultWriter.close(); IonReader reader = system().newReader(source.toByteArray()); // While writing the 2990th data structure, the cumulative size of the written data will exceed the current block size. // If auto-flush enabled, the flush() operation will be executed after completing the 2990th struct. The output should be the same as manually flush after writing the 2990th data. @@ -74,6 +77,65 @@ public void generateTestData() throws IOException { expectedWriter.finish(); } + @Before + public void generateTestData32K() throws IOException { + // Writing test data with continuous extending symbol table. The total data written in user's block is 32K. + IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_32K); + int i = 0; + while (i < 2990) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + i); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + defaultWriter.close(); + IonReader reader = system().newReader(source_32K.toByteArray()); + // No flush should be expected since the total data size written into user's buffer can fit into one block. + IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_32K); + while (reader.next() != null) { + expectedWriter.writeValue(reader); + } + expectedWriter.close(); + } + @Before + public void generateTestData67K() throws IOException { + // Writing test data with continuous extending symbol table. After completing writing 3200th value, the local symbol table will stop growing. + IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_67K); + int i = 0; + while (i < 3200) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + i); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + while (i >= 3200 && i < 6400) { + defaultWriter.stepIn(IonType.STRUCT); + defaultWriter.setFieldName("taco" + 3); + defaultWriter.writeString("burrito"); + defaultWriter.stepOut(); + i++; + } + defaultWriter.close(); + IonReader reader = system().newReader(source_67K.toByteArray()); + // After writing the 2990th data, the cumulative size of the written data in user's buffer will exceed the current block size. + // After writing the 6246th data, the cumulative size of the written data user's buffer will exceed the current block size. + // If auto-flush enabled, the flush() operation will be executed after completing the 2990th data and 6246th data. The output should be the same as manually flush after writing the 2990th and 6246th data. + int first_flush = 2990; + int second_flush = 6246; + int index = 0; + IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_67K); + while (reader.next() != null) { + expectedWriter.writeValue(reader); + index++; + if (index == first_flush || index == second_flush) { + expectedWriter.flush(); + } + } + expectedWriter.close(); + } + @Test public void testSetStringAnnotations() throws Exception { @@ -149,17 +211,48 @@ public void testFlushImmediatelyAfterIVM() throws Exception @Test public void testAutoFlush() throws Exception{ - IonReader reader = system().newReader(source.toByteArray()); ByteArrayOutputStream actual = new ByteArrayOutputStream(); - IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnbaled(true).build(actual); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual); while (reader.next() != null) { actualWriter.writeValue(reader); } - actualWriter.finish(); + actualWriter.close(); if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray()); } + assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray()); + } + + @Test + public void testAutoFlush_32K() throws Exception{ + + IonReader reader = system().newReader(source_32K.toByteArray()); + ByteArrayOutputStream actual = new ByteArrayOutputStream(); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual); + while (reader.next() != null) { + actualWriter.writeValue(reader); + } + actualWriter.close(); + if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { + assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray()); + } + assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray()); + } + + @Test + public void testAutoFlush_67K() throws Exception{ + IonReader reader = system().newReader(source_67K.toByteArray()); + ByteArrayOutputStream actual = new ByteArrayOutputStream(); + IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual); + while (reader.next() != null) { + actualWriter.writeValue(reader); + } + actualWriter.close(); + if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) { + assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray()); + } + assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray()); } @Test diff --git a/test/com/amazon/ion/impl/bin/WriteBufferTest.java b/test/com/amazon/ion/impl/bin/WriteBufferTest.java index f799463140..719223a826 100644 --- a/test/com/amazon/ion/impl/bin/WriteBufferTest.java +++ b/test/com/amazon/ion/impl/bin/WriteBufferTest.java @@ -20,12 +20,16 @@ import static com.amazon.ion.impl.bin.WriteBuffer.writeVarUIntTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -37,36 +41,14 @@ public class WriteBufferTest { // XXX make this a prime to make it more likely that we collide on the edges of the buffer private static BlockAllocator ALLOCATOR = BlockAllocatorProviders.basicProvider().vendAllocator(11); - private static BlockAllocatorProvider provider = BlockAllocatorProviders.basicProvider(); private ByteArrayOutputStream out; - private OutputStream outputStream = new ByteArrayOutputStream(); - private IonRawBinaryWriter rawBinaryWriter; - - { - try { - rawBinaryWriter = new IonRawBinaryWriter( - provider, - 5, - outputStream, - AbstractIonWriter.WriteValueOptimization.NONE, // optimization is not relevant for the nested raw writer - IonRawBinaryWriter.StreamCloseMode.NO_CLOSE, - IonRawBinaryWriter.StreamFlushMode.NO_FLUSH, - IonRawBinaryWriter.PreallocationMode.PREALLOCATE_1, - false, - true, - null - ); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - private WriteBuffer buf; + private AtomicBoolean endOfBufferReached = new AtomicBoolean(false); @BeforeEach public void setup() throws IOException { - buf = rawBinaryWriter.getCurrentBuffer(); + buf = new WriteBuffer(ALLOCATOR, () -> endOfBufferReached.set(true)); out = new ByteArrayOutputStream(); } @@ -281,6 +263,20 @@ public void testInt56Negative() assertBuffer(bytes); } + @Test + public void testEndOfBufferReachedInvoked() throws UnsupportedEncodingException { + buf.writeBytes("taco".getBytes("UTF-8")); + buf.writeBytes("_burrito".getBytes("UTF-8")); + assertTrue(endOfBufferReached.get()); + } + + @Test + public void testEndOfBufferReachedNotInvoked() throws UnsupportedEncodingException { + buf.writeBytes("taco".getBytes("UTF-8")); + buf.writeBytes("burrito".getBytes("UTF-8")); + assertFalse(endOfBufferReached.get()); + } + @Test public void testInt64Negative() {