Skip to content

Commit

Permalink
Updates based on comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
linlin-s committed Dec 2, 2023
1 parent 4d37ef0 commit f953f5c
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 41 deletions.
9 changes: 2 additions & 7 deletions src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -1104,15 +1104,10 @@ public void writeBytes(byte[] data, int off, int len) throws IOException

// Stream Terminators

public void flush()
{
public void flush() throws IOException {
if (getDepth() == 0 && !user.hasAnnotations() && (localsLocked || lstAppendEnabled))
{
try {
unsafeFlush();
} catch (IOException e) {
throw new RuntimeException(e);
}
unsafeFlush();
}
}

Expand Down
14 changes: 10 additions & 4 deletions src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,17 @@ public PatchPoint clear() {
private boolean closed;
boolean autoFlushEnabled;
boolean flushAfterCurrentValue;
Runnable action;
ThrowingRunnable autoFlush;

/**
 * Records that a flush should happen once the current value is finished, but only when
 * auto-flush is enabled: {@code flushAfterCurrentValue} takes the value of {@code autoFlushEnabled}.
 * The flag is consumed in {@code finishValue()}, which runs the auto-flush action at depth 0 and then clears it.
 * NOTE(review): presumably registered as the write buffer's end-of-block callback — confirm against the constructor.
 */
public void endOfBlockSizeReached() {
flushAfterCurrentValue = autoFlushEnabled;
}

/**
 * A no-argument, no-result action, similar to {@link Runnable}, whose {@code run} method is
 * allowed to throw {@link IOException}. Used as the type of the writer's auto-flush action.
 */
@FunctionalInterface
interface ThrowingRunnable {
/**
 * Executes the action.
 * @throws IOException if the action fails with an I/O error.
 */
void run() throws IOException;
}

/*package*/ IonRawBinaryWriter(final BlockAllocatorProvider provider,
final int blockSize,
final OutputStream out,
Expand All @@ -378,7 +383,7 @@ public void endOfBlockSizeReached() {
final PreallocationMode preallocationMode,
final boolean isFloatBinary32Enabled,
final boolean isAutoFlushEnabled,
Runnable action)
ThrowingRunnable autoFlush)
throws IOException
{
super(optimization);
Expand Down Expand Up @@ -410,10 +415,11 @@ public ContainerInfo newElement() {
this.hasTopLevelSymbolTableAnnotation = false;
this.closed = false;
this.autoFlushEnabled = isAutoFlushEnabled;
this.action = action;
this.autoFlush = autoFlush;
}



/** Always returns {@link Symbols#systemSymbolTable()}. */
public SymbolTable getSymbolTable()
{
Expand Down Expand Up @@ -747,7 +753,7 @@ private void finishValue() throws IOException
hasWrittenValuesSinceFinished = true;
hasWrittenValuesSinceConstructed = true;
if (this.flushAfterCurrentValue && depth == 0) {
action.run();
autoFlush.run();
this.flushAfterCurrentValue = false;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/com/amazon/ion/impl/bin/WriteBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
private final List<Block> blocks;
private Block current;
private int index;
private Runnable action;
private Runnable endOfBlockCallBack;


public WriteBuffer(final BlockAllocator allocator, Runnable action)
public WriteBuffer(final BlockAllocator allocator, Runnable endOfBlockCallBack)
{
this.allocator = allocator;
this.blocks = new ArrayList<Block>();
Expand All @@ -43,7 +43,7 @@ public WriteBuffer(final BlockAllocator allocator, Runnable action)

this.index = 0;
this.current = blocks.get(0);
this.action = action;
this.endOfBlockCallBack = endOfBlockCallBack;
}

private void allocateNewBlock()
Expand Down Expand Up @@ -140,7 +140,7 @@ private void writeBytesSlow(final byte[] bytes, int off, int len)
if (index == blocks.size() - 1)
{
allocateNewBlock();
action.run();
endOfBlockCallBack.run();
}
index++;
current = blocks.get(index);
Expand Down
8 changes: 7 additions & 1 deletion src/com/amazon/ion/system/IonBinaryWriterBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,13 @@ public IvmMinimizing getIvmMinimizing()
public abstract SymbolTable getInitialSymbolTable();

/**
* Enables the automatic execution of flush operations. This functionality is disabled by default.
* Auto-flush enables automatic execution of flush operations during the write process. When auto-flush is enabled and a new block allocation becomes necessary while writing the top-level value, a new block will be allocated.
* The data writing process will then continue uninterrupted until the top-level value is complete. After completing the top-level value that crosses the block boundary, the flush operation will be executed.
* This feature optimizes performance of the writing of long data streams by reducing block allocations.
* Additionally, setting a larger block size can further tune performance when auto-flush is enabled.
* A larger block size leads to fewer block allocations and reduces the frequency of flush operations when auto-flush is enabled. {@link #withBlockSize(int) Here} is where you can set up the
* block size of the write buffer.
* Auto-flush is disabled by default and the default block size is 32K.
* @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not.
*/
public abstract IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled);
Expand Down
73 changes: 55 additions & 18 deletions test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package com.amazon.ion.impl.bin;


import com.amazon.ion.IonDatagram;
import com.amazon.ion.IonInt;
import com.amazon.ion.IonLoader;
import com.amazon.ion.IonReader;
import com.amazon.ion.IonStruct;
import com.amazon.ion.IonSymbol;
Expand All @@ -27,6 +29,8 @@
import java.io.IOException;

import com.amazon.ion.system.IonBinaryWriterBuilder;
import com.amazon.ion.system.IonSystemBuilder;
import com.amazon.ion.util.Equivalence;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -39,6 +43,19 @@ public class IonManagedBinaryWriterTest extends IonManagedBinaryWriterTestCase
private ByteArrayOutputStream expectedOut_32K = new ByteArrayOutputStream();
private ByteArrayOutputStream source_67K = new ByteArrayOutputStream();
private ByteArrayOutputStream expectedOut_67K = new ByteArrayOutputStream();

/**
* <p>
* In test data, the field names are generated by appending a continuously increasing integer to the string "taco", resulting in names like "taco0", "taco1", and so on.
* These field names are paired with symbol IDs and then stored in the symbol table during the writing process.
* The actual bytes written into the user buffer are these symbol IDs. The symbol IDs start from $10, representing "taco0", and occupy one byte in the buffer. This single-byte representation continues up to "taco117", which corresponds to symbol ID $127.
* Beyond "taco117", encoding a symbol ID requires two bytes.
* Therefore, each of the first 118 ion structs takes 10 bytes, and each ion struct from "taco118" to "taco2988" occupies 11 bytes.
* Consequently, the total buffer size is calculated as 118 × 10 + (2988 − 118 + 1) × 11 = 32761 bytes. We then keep writing the 2990th ion struct, which is 11 bytes and will overflow the block limit of 32768 bytes.
* After completing the 2990th ion struct, the flush operation will be executed if auto-flush is enabled.
* </p>
*/
private int FLUSH_PERIOD = 2990;
@Before
public void generateTestData() throws IOException {
// Writing test data with continuous extending symbol table. After completing writing 3300th value, the local symbol table will stop growing.
Expand All @@ -62,14 +79,13 @@ public void generateTestData() throws IOException {
IonReader reader = system().newReader(source.toByteArray());
// While writing the 2990th data structure, the cumulative size of the written data will exceed the current block size.
// If auto-flush enabled, the flush() operation will be executed after completing the 2990th struct. The output should be the same as manually flush after writing the 2990th data.
int flushPeriod = 2990;
int index = 0;

IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut);
while (reader.next() != null) {
expectedWriter.writeValue(reader);
index++;
if (index == flushPeriod) {
if (index == FLUSH_PERIOD) {
expectedWriter.flush();
index = 0;
}
Expand All @@ -82,13 +98,16 @@ public void generateTestData32K() throws IOException {
// Writing test data with continuous extending symbol table. The total data written in user's block is 32K.
IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_32K);
int i = 0;
while (i < 2990) {
// The total size of the first 2989 ion structs will be 118 x 10 + (2988 - 118 + 1) x 11 = 32761 bytes.
while (i < 2989) {
defaultWriter.stepIn(IonType.STRUCT);
defaultWriter.setFieldName("taco" + i);
defaultWriter.writeString("burrito");
defaultWriter.stepOut();
i++;
}
//Keep writing 7 bytes to user's buffer until the total data size in the current block is 32768 bytes.
defaultWriter.writeString("taco__");
defaultWriter.close();
IonReader reader = system().newReader(source_32K.toByteArray());
// No flush should be expected since the total data size written into user's buffer can fit into one block.
Expand All @@ -103,13 +122,16 @@ public void generateTestData67K() throws IOException {
// Writing test data with continuous extending symbol table. After completing writing 3200th value, the local symbol table will stop growing.
IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_67K);
int i = 0;
// The total size of first 3200 ion structs is 118 x 10 + (3199 - 118 + 1) x 11 = 35082 bytes.
while (i < 3200) {
defaultWriter.stepIn(IonType.STRUCT);
defaultWriter.setFieldName("taco" + i);
defaultWriter.writeString("burrito");
defaultWriter.stepOut();
i++;
}
// Since a symbol token for the field name "taco3" has already been appended to the symbol table, every ion struct from number 3200 to 6399 has the same size, which is 10 bytes.
// The total size of this part is (6399 - 3200 + 1) x 10 = 32000 bytes. The size of the test data would be 35082 + 32000 = 67082 bytes.
while (i >= 3200 && i < 6400) {
defaultWriter.stepIn(IonType.STRUCT);
defaultWriter.setFieldName("taco" + 3);
Expand All @@ -120,16 +142,14 @@ public void generateTestData67K() throws IOException {
defaultWriter.close();
IonReader reader = system().newReader(source_67K.toByteArray());
// After writing the 2990th data, the cumulative size of the written data in user's buffer will exceed the current block size.
// After writing the 6246th data, the cumulative size of the written data user's buffer will exceed the current block size.
// If auto-flush enabled, the flush() operation will be executed after completing the 2990th data and 6246th data. The output should be the same as manually flush after writing the 2990th and 6246th data.
int first_flush = 2990;
int second_flush = 6246;
// If auto-flush enabled, the flush() operation will be executed after completing the 2990th data. The output should be the same as manually flush after writing the 2990th data.
int index = 0;
IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_67K);
while (reader.next() != null) {
expectedWriter.writeValue(reader);
index++;
if (index == first_flush || index == second_flush) {
// Only one flush operation is expected. After the first flush operation, there will be two blocks allocated in the write buffer, which is big enough to fit the rest of the data stream.
if (index == FLUSH_PERIOD) {
expectedWriter.flush();
}
}
Expand Down Expand Up @@ -213,46 +233,48 @@ public void testFlushImmediatelyAfterIVM() throws Exception
public void testAutoFlush() throws Exception{
IonReader reader = system().newReader(source.toByteArray());
ByteArrayOutputStream actual = new ByteArrayOutputStream();
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual);
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual);
while (reader.next() != null) {
actualWriter.writeValue(reader);
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray());
assertEquivalentDataModel(actual, expectedOut);
}
assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray());
// When auto-flush mode is disabled, the serialized data should be equivalent to the source data without flush operation.
assertEquivalentDataModel(actual, source);
}

@Test
public void testAutoFlush_32K() throws Exception{

IonReader reader = system().newReader(source_32K.toByteArray());
ByteArrayOutputStream actual = new ByteArrayOutputStream();
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual);
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual);
while (reader.next() != null) {
actualWriter.writeValue(reader);
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray());
assertEquivalentDataModel(actual, expectedOut_32K);
}
assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray());
// When auto-flush mode is disabled, the serialized data should be equivalent to the source data without flush operation.
assertEquivalentDataModel(actual, source_32K);
}


@Test
public void testAutoFlush_67K() throws Exception{
IonReader reader = system().newReader(source_67K.toByteArray());
ByteArrayOutputStream actual = new ByteArrayOutputStream();
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual);
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(autoFlushMode.isEnabled()).build(actual);
while (reader.next() != null) {
actualWriter.writeValue(reader);
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray());
assertEquivalentDataModel(actual, expectedOut_67K);
}
assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray());
assertEquivalentDataModel(actual, source_67K);
}

@Test
Expand Down Expand Up @@ -404,4 +426,19 @@ public void testNestedEmptyAnnotatedContainer() throws Exception
writer.stepOut();
assertValue("{bar: foo::[]}");
}

/**
 * Asserts that two binary Ion streams hold equivalent top-level values. The streams are loaded into
 * datagrams and compared value by value with {@link Equivalence#ionEquals}, so they may differ
 * byte-for-byte and still pass.
 * @param actual the serialized stream produced by the writer under test.
 * @param expected the stream holding the expected data.
 */
private void assertEquivalentDataModel(ByteArrayOutputStream actual, ByteArrayOutputStream expected) {
    IonLoader loader = IonSystemBuilder.standard().build().newLoader();
    IonDatagram actualDatagram = loader.load(actual.toByteArray());
    IonDatagram expectedDatagram = loader.load(expected.toByteArray());
    // JUnit's contract is (expected, actual); passing them in that order keeps failure messages truthful.
    assertEquals("Number of top-level values differs", expectedDatagram.size(), actualDatagram.size());
    for (int i = 0; i < expectedDatagram.size(); i++) {
        // Include the index so a failure points at the first mismatching value.
        assertTrue(
            "Top-level value at index " + i + " is not equivalent",
            Equivalence.ionEquals(actualDatagram.get(i), expectedDatagram.get(i)));
    }
}
}
13 changes: 6 additions & 7 deletions test/com/amazon/ion/impl/bin/WriteBufferTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,17 @@ public void testInt56Negative()
assertBuffer(bytes);
}

/**
* Tests whether the method endOfBufferReached is invoked appropriately when the size of the bytes written into the buffer overflows the current block size.
* @throws UnsupportedEncodingException
*/
@Test
public void testEndOfBufferReachedInvoked() throws UnsupportedEncodingException {
buf.writeBytes("taco".getBytes("UTF-8"));
buf.writeBytes("_burrito".getBytes("UTF-8"));
assertTrue(endOfBufferReached.get());
}

@Test
public void testEndOfBufferReachedNotInvoked() throws UnsupportedEncodingException {
buf.writeBytes("taco".getBytes("UTF-8"));
buf.writeBytes("burrito".getBytes("UTF-8"));
assertFalse(endOfBufferReached.get());
buf.writeBytes("_".getBytes("UTF-8"));
assertTrue(endOfBufferReached.get());
}

@Test
Expand Down

0 comments on commit f953f5c

Please sign in to comment.