Enable auto-flush in ion binary writer. #651
This is a good feature. I've added some comments.
Please add tests that write values that are:
- Exactly the size of a block
- More than twice the size of a block
public boolean autoFlushEnabled;
public boolean flushAfterCurrentValue = false;
Do these need to be public? We should prefer default visibility, or private if possible.
@@ -369,7 +373,8 @@ public PatchPoint clear() {
final StreamCloseMode streamCloseMode,
final StreamFlushMode streamFlushMode,
final PreallocationMode preallocationMode,
final boolean isFloatBinary32Enabled)
final boolean isFloatBinary32Enabled,
final boolean isAutoFlushEnabled, final IonManagedBinaryWriter managedBinaryWriter)
Add a newline before the final parameter for consistency with the existing style.
@@ -139,6 +139,7 @@ public IvmMinimizing getIvmMinimizing()
 */
public abstract SymbolTable getInitialSymbolTable();

public abstract _Private_IonBinaryWriterBuilder withAutoFlushEnbaled(boolean autoFlushEnbaled);
This should return IonBinaryWriterBuilder, and it needs a JavaDoc comment like the others in this class.
if (rawBinaryWriter.autoFlushEnabled){
    rawBinaryWriter.flushAfterCurrentValue = true;
}
Rather than storing a reference to an IonRawBinaryWriter, we should keep the concerns of these two classes separate by storing a reference to a callback method in this class. The IonRawBinaryWriter would provide this callback to the WriteBuffer upon construction, using something like:
private boolean flushAfterCurrentValue;
private WriteBuffer buffer = new WriteBuffer(allocator, this::endOfBlockReached);
private void endOfBlockReached() {
    flushAfterCurrentValue = autoFlushEnabled;
}
And here you'd just invoke the callback method.
This will also help you clean up WriteBufferTest, because you can provide a test callback without having to instantiate a raw writer.
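A minimal sketch of the callback wiring suggested above. The classes are toy stand-ins with hypothetical names (BlockBufferSketch, RawWriterSketch), not ion-java's WriteBuffer or IonRawBinaryWriter. They track byte counts only, and the block size of 11 is an assumption chosen for illustration. The point is that the buffer knows nothing about the writer, so a test can hand it a lambda.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy stand-in for a block-based buffer; NOT ion-java's WriteBuffer. */
final class BlockBufferSketch {
    private final int blockSize;
    private final Runnable endOfBlockCallback;
    private int remainingInBlock;

    BlockBufferSketch(int blockSize, Runnable endOfBlockCallback) {
        this.blockSize = blockSize;
        this.endOfBlockCallback = endOfBlockCallback;
        this.remainingInBlock = blockSize;
    }

    /** Tracks byte counts only; fires the callback whenever a write spills past the current block. */
    void writeBytes(byte[] bytes) {
        int left = bytes.length;
        while (left > remainingInBlock) {
            left -= remainingInBlock;
            remainingInBlock = blockSize; // assumption: a fresh block is allocated here
            endOfBlockCallback.run();
        }
        remainingInBlock -= left;
    }
}

/** Toy stand-in for the raw writer: it owns the flush flag, the buffer only signals. */
final class RawWriterSketch {
    private final boolean autoFlushEnabled;
    private final BlockBufferSketch buffer;
    private boolean flushAfterCurrentValue;

    RawWriterSketch(int blockSize, boolean autoFlushEnabled) {
        this.autoFlushEnabled = autoFlushEnabled;
        this.buffer = new BlockBufferSketch(blockSize, this::endOfBlockReached);
    }

    private void endOfBlockReached() {
        flushAfterCurrentValue = autoFlushEnabled;
    }

    void writeBytes(byte[] bytes) { buffer.writeBytes(bytes); }

    boolean isFlushPending() { return flushAfterCurrentValue; }
}

final class EndOfBlockCallbackDemo {
    public static void main(String[] args) {
        // A buffer test needs only a lambda, no writer instance.
        AtomicBoolean fired = new AtomicBoolean(false);
        BlockBufferSketch buf = new BlockBufferSketch(11, () -> fired.set(true));
        buf.writeBytes("taco".getBytes(StandardCharsets.UTF_8));    // 4 bytes
        buf.writeBytes("burrito".getBytes(StandardCharsets.UTF_8)); // 11 bytes total: block exactly full
        System.out.println(fired.get()); // prints false
        buf.writeBytes("_".getBytes(StandardCharsets.UTF_8));       // 12th byte spills into a new block
        System.out.println(fired.get()); // prints true
    }
}
```

Whether the real buffer should signal on an exact fill or only on a spill is a design decision for the PR; the sketch picks "only on a spill" arbitrarily.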
@@ -730,6 +741,10 @@ private void finishValue()
}
hasWrittenValuesSinceFinished = true;
hasWrittenValuesSinceConstructed = true;
if (this.flushAfterCurrentValue && depth == 0) {
    managedBinaryWriter.flush();
Rather than storing a reference to an IonManagedBinaryWriter, we should keep the concerns of these two classes separate by storing a reference to a callback method in this class. The IonManagedBinaryWriter would provide this callback to the IonRawBinaryWriter upon construction, using something like:
this.user = new IonRawBinaryWriter(..., this::flush);
And here you'd invoke that callback instead of calling the managed writer's flush() method directly.
@@ -420,6 +427,10 @@ public void setFieldNameSymbol(final SymbolToken name)
setFieldNameSymbol(name.getSid());
}

public WriteBuffer getCurrentBuffer() {
It looks like this can have default visibility, instead of public.
Thanks for catching this, will set to default in the next commit.
while (reader.next() != null) {
    actualWriter.writeValue(reader);
}
actualWriter.finish();
Should this be actualWriter.close()?
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
    assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray());
}
Let's add an assertion after this branch to make sure that in all cases actual and expectedOut are data model equivalent.
@@ -30,20 +30,20 @@
private final List<Block> blocks;
private Block current;
private int index;
private IonRawBinaryWriter rawBinaryWriter;
private Runnable action;
This needs a more descriptive name. When is the action invoked? (When the end of a block is reached.) Consider something like onEndOfBlock or endOfBlockCallback.
Sounds good, will update in the next commit.
/**
 * Enables the automatic execution of flush operations. This functionality disabled by default.
 * @param autoFlushEnabled A boolean parameter indicating whether this functionality is enabled or not.
 */
This needs to describe when the auto-flush occurs, going into detail about the relationship to block size, and linking to the option that allows the user to configure that block size.
try {
    unsafeFlush();
} catch (IOException e) {
    throw new RuntimeException(e);
}
We should not change the exception throwing behavior here. We may need to define our own functional interface, like
@FunctionalInterface
interface ThrowingRunnable {
    void run() throws IOException;
}
this::flush should be able to conform to that without changing its signature.
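A small sketch of how such an interface could carry the checked exception through to the caller. ThrowingRunnable is the name proposed in the comment above; ManagedWriterSketch and its members are hypothetical stand-ins, not ion-java classes. The flush() method keeps its `throws IOException` clause, and the method reference still fits the callback type.

```java
import java.io.IOException;

/** Hypothetical callback type: like Runnable, but permitted to throw IOException. */
@FunctionalInterface
interface ThrowingRunnable {
    void run() throws IOException;
}

/** Toy stand-in for the managed writer; flush() keeps its checked exception. */
final class ManagedWriterSketch {
    private final boolean failOnFlush;
    private int flushCount;

    ManagedWriterSketch(boolean failOnFlush) {
        this.failOnFlush = failOnFlush;
    }

    void flush() throws IOException {
        if (failOnFlush) {
            throw new IOException("simulated sink failure");
        }
        flushCount++;
    }

    /** The method reference conforms to ThrowingRunnable with no wrapping in RuntimeException. */
    ThrowingRunnable autoFlushCallback() {
        return this::flush;
    }

    int flushCount() {
        return flushCount;
    }
}

final class ThrowingRunnableDemo {
    public static void main(String[] args) {
        ManagedWriterSketch ok = new ManagedWriterSketch(false);
        try {
            ok.autoFlushCallback().run();
            System.out.println("flushes: " + ok.flushCount());
            new ManagedWriterSketch(true).autoFlushCallback().run();
        } catch (IOException e) {
            // The caller still sees the original checked exception type.
            System.out.println("caller saw IOException: " + e.getMessage());
        }
    }
}
```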
public boolean flushAfterCurrentValue = false;
boolean autoFlushEnabled;
boolean flushAfterCurrentValue;
Runnable action;
This needs a more descriptive name, like onAutoFlush.
@Test
public void testEndOfBufferReachedInvoked() throws UnsupportedEncodingException {
    buf.writeBytes("taco".getBytes("UTF-8"));
    buf.writeBytes("_burrito".getBytes("UTF-8"));
    assertTrue(endOfBufferReached.get());
}

@Test
public void testEndOfBufferReachedNotInvoked() throws UnsupportedEncodingException {
    buf.writeBytes("taco".getBytes("UTF-8"));
    buf.writeBytes("burrito".getBytes("UTF-8"));
    assertFalse(endOfBufferReached.get());
}
Please add some comments to these tests. It's not immediately clear why adding the underscore changes the behavior. Consider noting the block size, and add assertions before and after the boundary to verify the transition happens exactly when expected.
Sure, I will add the comments in the next commit. If we add assertions before and after the boundary, we might not need two unit tests.
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
    assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray());
}
assertArrayEquals(actual.toByteArray(), expectedOut.toByteArray());
This is the same assertion from line 222. This should fail if auto-flush is disabled, but I see that we're not actually running this test with auto-flush disabled. Consider using @ParameterizedTest to pass in both true and false for the value of withAutoFlushEnabled, then change this to an assertion of data model equality (i.e. using Equivalence.ionEquals). That way we can assert different behavior based on whether auto-flush is enabled, but verify in both cases that the streams are data model equivalent, which is what ultimately matters to the user.
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
    assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray());
}
assertArrayEquals(actual.toByteArray(), expectedOut_32K.toByteArray());
Same comment here regarding test parameterization and data model equality.
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
    assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray());
}
assertArrayEquals(actual.toByteArray(), expectedOut_67K.toByteArray());
Same comment here regarding test parameterization and data model equality.
// Writing test data with continuous extending symbol table. The total data written in user's block is 32K.
IonWriter defaultWriter = IonBinaryWriterBuilder.standard().build(source_32K);
int i = 0;
while (i < 2990) {
How was 2990 chosen? A comment explaining the math behind it might help.
    defaultWriter.stepOut();
    i++;
}
while (i >= 3200 && i < 6400) {
How were 3200 and 6400 chosen?
 * Additionally, setting a larger block size can further tune performance when auto-flush is enabled.
 * A larger block size leads to fewer block allocations and reduces the frequency of flush operations when auto-flush is enabled. {@link #withBlockSize(int) Here} is where you can set up the
 * block size of write buffer.
 * Auto-flush disabled by default and the default block size is 32K.
Suggested change:
-  * Auto-flush disabled by default and the default block size is 32K.
+  * Auto-flush is disabled by default and the default block size is 32K.
/**
 * In test data, the field names are generated by appending a continuously increasing integer to the string "taco", resulting in names like "taco0", "taco1", and so on.
 * These filed names are paired with symbol IDs and then stored in then symbol table during the writing process.
Suggested change:
-  * These filed names are paired with symbol IDs and then stored in then symbol table during the writing process.
+  * These field names are paired with symbol IDs and then stored in then symbol table during the writing process.
assertEquals(actualDatagram.size(), expectedDatagram.size());
for (int i = 0; i < actualDatagram.size(); i++) {
    assertTrue(Equivalence.ionEquals(actualDatagram.get(i), expectedDatagram.get(i)));
}
Can you replace this with assertEquals(expectedDatagram, actualDatagram)?
Thanks for the suggestion. I will replace it in the next commit.
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
    assertEquivalentDataModel(actual, expectedOut_32K);
Here you actually do want to compare the bytes, right? To ensure the auto-flush occurred.
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
    assertEquivalentDataModel(actual, expectedOut_67K);
Here you actually do want to compare the bytes, right? To ensure the auto-flush occurred.
}
actualWriter.close();
if (lstAppendMode.isEnabled() && autoFlushMode.isEnabled()) {
    assertEquivalentDataModel(actual, expectedOut);
Here you actually do want to compare the bytes, right? To ensure the auto-flush occurred.
Yes, we should include both a byte comparison and a data model comparison in the test. I will add the byte comparison in the next commit.
Issue #, if available:
N/A
Description of changes:
Motivation:
This PR introduces an option for users to enable auto-flush while writing a long stream of binary data. Frequent flushing improves performance by relieving memory pressure, because it avoids the continual allocation of new blocks.
The ion-java library currently offers methods for flushing data from buffers, such as writer.flush() and writer.finish(). However, many users are either unaware of these features or do not know when to use them for optimal performance. Typical real-world usage involves writing the entire data stream and then calling close()/finish() without any intermediate flushing. This pattern misses out on the potential performance benefits of periodic flushing. To let users get these benefits automatically, we have added a configuration option to IonBinaryWriterBuilder that enables auto-flush in the binary writer.
Implementation details:
With auto-flush enabled, the flush operation is executed only between top-level values. The flush is triggered during the block boundary check while writing data into the buffer. When the incoming data exceeds the remaining block size and there are no reusable blocks available in the current buffer, an additional block will still be allocated. However, the flush operation will occur after completing the current top-level value. This approach enables the reuse of allocated blocks, thereby eliminating the need for continuous new block allocation.
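The deferral described above can be modeled roughly as follows. This is a toy with hypothetical names (AutoFlushWriterSketch, writeScalar), not the PR's code: byte counts stand in for encoded values, and a single counter stands in for the block list. It only shows that a block-boundary crossing inside a container is remembered and acted on once the depth returns to zero.

```java
/** Toy model of deferred auto-flush; byte counts stand in for encoded values. */
final class AutoFlushWriterSketch {
    private final int blockSize;
    private int bytesBuffered;
    private int depth;
    private boolean flushAfterCurrentValue;
    private int flushCount;

    AutoFlushWriterSketch(int blockSize) {
        this.blockSize = blockSize;
    }

    void stepIn() {
        depth++;
    }

    void stepOut() {
        depth--;
        finishValue();
    }

    void writeScalar(int encodedLength) {
        bytesBuffered += encodedLength;
        if (bytesBuffered > blockSize) {
            // Boundary crossed: remember it, but do not flush mid-value.
            flushAfterCurrentValue = true;
        }
        finishValue();
    }

    private void finishValue() {
        // Flush only between top-level values.
        if (flushAfterCurrentValue && depth == 0) {
            flush();
        }
    }

    void flush() {
        bytesBuffered = 0; // assumption: flushed blocks become reusable
        flushAfterCurrentValue = false;
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }
}

final class AutoFlushWriterDemo {
    public static void main(String[] args) {
        AutoFlushWriterSketch writer = new AutoFlushWriterSketch(8);
        writer.stepIn();
        writer.writeScalar(5);
        writer.writeScalar(5); // crosses the 8-byte boundary inside a container
        System.out.println(writer.flushCount()); // prints 0
        writer.stepOut();      // the top-level value is complete
        System.out.println(writer.flushCount()); // prints 1
    }
}
```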
Test:
For testing the auto-flush feature, we write predefined structures and calculate the number of values that will exceed the block size. We then compare the outputs between the writer with auto-flush enabled and the writer where flushing is done manually after writing the specific number of values. If the auto-flush is executed correctly, the outputs in both scenarios should be identical.
Benchmark results:
Benchmark: write data equivalent to a stream of 59155 nested values using IonWriter (binary). The output is written to an in-memory buffer. (3 forks, 2 warmups, 2 iterations, preallocation 1)
Benchmark: write data equivalent to a stream of 194627 nested values using IonWriter (binary). The output is written to an in-memory buffer. (3 forks, 2 warmups, 2 iterations, preallocation 1)
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.