Skip to content

Commit

Permalink
Updates based on the comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
linlin-s committed Nov 30, 2023
1 parent fa0d461 commit 0432863
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ _Private_IonBinaryWriterBuilder withInitialSymbolTable(SymbolTable symtab)
}

@Override
public _Private_IonBinaryWriterBuilder withAutoFlushEnbaled(boolean autoFlushEnabled) {
public _Private_IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled) {
_Private_IonBinaryWriterBuilder b = mutable();
b.setAutoFlushEnabled(autoFlushEnabled);
return b;
Expand Down
12 changes: 8 additions & 4 deletions src/com/amazon/ion/impl/bin/IonManagedBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ public void makeReadOnly()
builder.preallocationMode,
builder.isFloatBinary32Enabled,
false,
this
this::flush
);
this.user = new IonRawBinaryWriter(
builder.provider,
Expand All @@ -690,7 +690,7 @@ public void makeReadOnly()
builder.preallocationMode,
builder.isFloatBinary32Enabled,
builder.isAutoFlushEnabled,
this
this::flush
);

this.catalog = builder.catalog;
Expand Down Expand Up @@ -1104,11 +1104,15 @@ public void writeBytes(byte[] data, int off, int len) throws IOException

// Stream Terminators

public void flush() throws IOException
public void flush()
{
if (getDepth() == 0 && !user.hasAnnotations() && (localsLocked || lstAppendEnabled))
{
unsafeFlush();
try {
unsafeFlush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

Expand Down
21 changes: 13 additions & 8 deletions src/com/amazon/ion/impl/bin/IonRawBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ private static byte[] bytes(int... vals) {

private static final byte VARINT_NEG_ZERO = (byte) 0xC0;

private IonManagedBinaryWriter managedBinaryWriter;

final Utf8StringEncoder utf8StringEncoder = Utf8StringEncoderPool
.getInstance()
Expand Down Expand Up @@ -362,9 +361,13 @@ public PatchPoint clear() {
private boolean hasTopLevelSymbolTableAnnotation;

private boolean closed;
public boolean autoFlushEnabled;
public boolean flushAfterCurrentValue = false;
boolean autoFlushEnabled;
boolean flushAfterCurrentValue;
Runnable action;

public void endOfBlockSizeReached() {
flushAfterCurrentValue = autoFlushEnabled;
}

/*package*/ IonRawBinaryWriter(final BlockAllocatorProvider provider,
final int blockSize,
Expand All @@ -374,7 +377,8 @@ public PatchPoint clear() {
final StreamFlushMode streamFlushMode,
final PreallocationMode preallocationMode,
final boolean isFloatBinary32Enabled,
final boolean isAutoFlushEnabled, final IonManagedBinaryWriter managedBinaryWriter)
final boolean isAutoFlushEnabled,
Runnable action)
throws IOException
{
super(optimization);
Expand All @@ -387,7 +391,7 @@ public PatchPoint clear() {
this.streamFlushMode = streamFlushMode;
this.preallocationMode = preallocationMode;
this.isFloatBinary32Enabled = isFloatBinary32Enabled;
this.buffer = new WriteBuffer(allocator, this);
this.buffer = new WriteBuffer(allocator, this::endOfBlockSizeReached);
this.patchPoints = new _Private_RecyclingQueue<>(512, PatchPoint::new);
this.containers = new _Private_RecyclingStack<ContainerInfo>(
10,
Expand All @@ -406,9 +410,10 @@ public ContainerInfo newElement() {
this.hasTopLevelSymbolTableAnnotation = false;
this.closed = false;
this.autoFlushEnabled = isAutoFlushEnabled;
this.managedBinaryWriter = managedBinaryWriter;
this.action = action;
}


/** Always returns {@link Symbols#systemSymbolTable()}. */
public SymbolTable getSymbolTable()
{
Expand All @@ -427,7 +432,7 @@ public void setFieldNameSymbol(final SymbolToken name)
setFieldNameSymbol(name.getSid());
}

public WriteBuffer getCurrentBuffer() {
WriteBuffer getCurrentBuffer() {
return buffer;
}

Expand Down Expand Up @@ -742,7 +747,7 @@ private void finishValue() throws IOException
hasWrittenValuesSinceFinished = true;
hasWrittenValuesSinceConstructed = true;
if (this.flushAfterCurrentValue && depth == 0) {
managedBinaryWriter.flush();
action.run();
this.flushAfterCurrentValue = false;
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/com/amazon/ion/impl/bin/WriteBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@
private final List<Block> blocks;
private Block current;
private int index;
private IonRawBinaryWriter rawBinaryWriter;
private Runnable action;


public WriteBuffer(final BlockAllocator allocator, IonRawBinaryWriter rawBinaryWriter)
public WriteBuffer(final BlockAllocator allocator, Runnable action)
{
this.allocator = allocator;
this.blocks = new ArrayList<Block>();
this.rawBinaryWriter = rawBinaryWriter;

// initial seed of the first block
allocateNewBlock();

this.index = 0;
this.current = blocks.get(0);
this.action = action;
}

private void allocateNewBlock()
Expand Down Expand Up @@ -140,9 +140,7 @@ private void writeBytesSlow(final byte[] bytes, int off, int len)
if (index == blocks.size() - 1)
{
allocateNewBlock();
if (rawBinaryWriter.autoFlushEnabled){
rawBinaryWriter.flushAfterCurrentValue = true;
}
action.run();
}
index++;
current = blocks.get(index);
Expand Down
7 changes: 5 additions & 2 deletions src/com/amazon/ion/system/IonBinaryWriterBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,16 @@ public IvmMinimizing getIvmMinimizing()
*/
public abstract SymbolTable getInitialSymbolTable();

public abstract _Private_IonBinaryWriterBuilder withAutoFlushEnbaled(boolean autoFlushEnbaled);
/**
* Enables the automatic execution of flush operations. This functionality disabled by default.
* @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not.
*/
public abstract IonBinaryWriterBuilder withAutoFlushEnabled(boolean autoFlushEnabled);

/**
* Declares the symbol table to use for encoded data.
* To avoid conflicts between different data streams, if the given instance
* is mutable, it will be copied when {@code build()} is called.
*
* @param symtab must be a local or system symbol table.
* May be null, in which case the initial symbol table is that of
* {@code $ion_1_0}.
Expand Down
103 changes: 98 additions & 5 deletions test/com/amazon/ion/impl/bin/IonManagedBinaryWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ public class IonManagedBinaryWriterTest extends IonManagedBinaryWriterTestCase
{
private ByteArrayOutputStream source = new ByteArrayOutputStream();
private ByteArrayOutputStream expectedOut = new ByteArrayOutputStream();
private int flushTimes = 0;
private ByteArrayOutputStream source_32K = new ByteArrayOutputStream();
private ByteArrayOutputStream expectedOut_32K = new ByteArrayOutputStream();
private ByteArrayOutputStream source_67K = new ByteArrayOutputStream();
private ByteArrayOutputStream expectedOut_67K = new ByteArrayOutputStream();
@Before
public void generateTestData() throws IOException {
// Writing test data with continuous extending symbol table. After completing writing 3300th value, the local symbol table will stop growing.
Expand All @@ -55,7 +58,7 @@ public void generateTestData() throws IOException {
defaultWriter.stepOut();
i++;
}
defaultWriter.finish();
defaultWriter.close();
IonReader reader = system().newReader(source.toByteArray());
// While writing the 2990th data structure, the cumulative size of the written data will exceed the current block size.
// If auto-flush enabled, the flush() operation will be executed after completing the 2990th struct. The output should be the same as manually flush after writing the 2990th data.
Expand All @@ -74,6 +77,65 @@ public void generateTestData() throws IOException {
expectedWriter.finish();
}

@Before
public void generateTestData32K() throws IOException {
// Writing test data with continuous extending symbol table. The total data written in user's block is 32K.
IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_32K);
int i = 0;
while (i < 2990) {
defaultWriter.stepIn(IonType.STRUCT);
defaultWriter.setFieldName("taco" + i);
defaultWriter.writeString("burrito");
defaultWriter.stepOut();
i++;
}
defaultWriter.close();
IonReader reader = system().newReader(source_32K.toByteArray());
// No flush should be expected since the total data size written into user's buffer can fit into one block.
IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_32K);
while (reader.next() != null) {
expectedWriter.writeValue(reader);
}
expectedWriter.close();
}
@Before
public void generateTestData67K() throws IOException {
// Writing test data with continuous extending symbol table. After completing writing 3200th value, the local symbol table will stop growing.
IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_67K);
int i = 0;
while (i < 3200) {
defaultWriter.stepIn(IonType.STRUCT);
defaultWriter.setFieldName("taco" + i);
defaultWriter.writeString("burrito");
defaultWriter.stepOut();
i++;
}
while (i >= 3200 && i < 6400) {
defaultWriter.stepIn(IonType.STRUCT);
defaultWriter.setFieldName("taco" + 3);
defaultWriter.writeString("burrito");
defaultWriter.stepOut();
i++;
}
defaultWriter.close();
IonReader reader = system().newReader(source_67K.toByteArray());
// After writing the 2990th data, the cumulative size of the written data in user's buffer will exceed the current block size.
// After writing the 6246th data, the cumulative size of the written data user's buffer will exceed the current block size.
// If auto-flush enabled, the flush() operation will be executed after completing the 2990th data and 6246th data. The output should be the same as manually flush after writing the 2990th and 6246th data.
int first_flush = 2990;
int second_flush = 6246;
int index = 0;
IonWriter expectedWriter = IonBinaryWriterBuilder.standard().withLocalSymbolTableAppendEnabled().build(expectedOut_67K);
while (reader.next() != null) {
expectedWriter.writeValue(reader);
index++;
if (index == first_flush || index == second_flush) {
expectedWriter.flush();
}
}
expectedWriter.close();
}

@Test
public void testSetStringAnnotations() throws Exception
{
Expand Down Expand Up @@ -149,17 +211,48 @@ public void testFlushImmediatelyAfterIVM() throws Exception

@Test
public void testAutoFlush() throws Exception{

IonReader reader = system().newReader(source.toByteArray());
ByteArrayOutputStream actual = new ByteArrayOutputStream();
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnbaled(true).build(actual);
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual);
while (reader.next() != null) {
actualWriter.writeValue(reader);
}
actualWriter.finish();
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray());
}
assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray());
}

@Test
public void testAutoFlush_32K() throws Exception{

IonReader reader = system().newReader(source_32K.toByteArray());
ByteArrayOutputStream actual = new ByteArrayOutputStream();
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual);
while (reader.next() != null) {
actualWriter.writeValue(reader);
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray());
}
assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray());
}

@Test
public void testAutoFlush_67K() throws Exception{
IonReader reader = system().newReader(source_67K.toByteArray());
ByteArrayOutputStream actual = new ByteArrayOutputStream();
IonWriter actualWriter = IonBinaryWriterBuilder.standard().withAutoFlushEnabled(true).build(actual);
while (reader.next() != null) {
actualWriter.writeValue(reader);
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray());
}
assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray());
}

@Test
Expand Down
46 changes: 21 additions & 25 deletions test/com/amazon/ion/impl/bin/WriteBufferTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import static com.amazon.ion.impl.bin.WriteBuffer.writeVarUIntTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -37,36 +41,14 @@ public class WriteBufferTest
{
// XXX make this a prime to make it more likely that we collide on the edges of the buffer
private static BlockAllocator ALLOCATOR = BlockAllocatorProviders.basicProvider().vendAllocator(11);
private static BlockAllocatorProvider provider = BlockAllocatorProviders.basicProvider();
private ByteArrayOutputStream out;
private OutputStream outputStream = new ByteArrayOutputStream();
private IonRawBinaryWriter rawBinaryWriter;

{
try {
rawBinaryWriter = new IonRawBinaryWriter(
provider,
5,
outputStream,
AbstractIonWriter.WriteValueOptimization.NONE, // optimization is not relevant for the nested raw writer
IonRawBinaryWriter.StreamCloseMode.NO_CLOSE,
IonRawBinaryWriter.StreamFlushMode.NO_FLUSH,
IonRawBinaryWriter.PreallocationMode.PREALLOCATE_1,
false,
true,
null
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private WriteBuffer buf;
private AtomicBoolean endOfBufferReached = new AtomicBoolean(false);

@BeforeEach
public void setup() throws IOException
{
buf = rawBinaryWriter.getCurrentBuffer();
buf = new WriteBuffer(ALLOCATOR, () -> endOfBufferReached.set(true));
out = new ByteArrayOutputStream();
}

Expand Down Expand Up @@ -281,6 +263,20 @@ public void testInt56Negative()
assertBuffer(bytes);
}

@Test
public void testEndOfBufferReachedInvoked() throws UnsupportedEncodingException {
buf.writeBytes("taco".getBytes("UTF-8"));
buf.writeBytes("_burrito".getBytes("UTF-8"));
assertTrue(endOfBufferReached.get());
}

@Test
public void testEndOfBufferReachedNotInvoked() throws UnsupportedEncodingException {
buf.writeBytes("taco".getBytes("UTF-8"));
buf.writeBytes("burrito".getBytes("UTF-8"));
assertFalse(endOfBufferReached.get());
}

@Test
public void testInt64Negative()
{
Expand Down

0 comments on commit 0432863

Please sign in to comment.