-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add ipc RecordBatch encoding #555
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you're still working on this, just a few preliminary things to think about!
In general, I do think we are going to need an ArrowIpcEncoder
to avoid painful refactors or excessive numbers of arguments when we add additional features. It may also help collapse status = ArrowXXXX()
into NANOARROW_RETURN_NOT_OK()
(by making the current flatcc builder something that's cleaned up on ArrowIpcEncoderReset()
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more thoughts! Feel free to leave the RecordBatch
encoding to a follow-up PR (it would be easier to review, although I'm also happy to review together if that's easier for you!).
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #555 +/- ##
==========================================
+ Coverage 88.92% 89.10% +0.17%
==========================================
Files 89 95 +6
Lines 16339 15421 -918
==========================================
- Hits 14530 13741 -789
+ Misses 1809 1680 -129 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
The structure of this is great. I know I said earlier that one big PR was OK, but the scope has enlarged to the point where I would prefer to review as multiple PRs. In particular, it makes it easier to ensure full test coverage for each component (also vaguely better for the changelog). Perhaps:
- Add array view null counter
- Add encoder.c with the encoder infrastructure and test for create/destroy
- Add schema writing
- Add batch writing + unit tests
- Add encoder to the ipc_files test (which is our current infrastructure for sending random batches of various types through our IPC library)
I know that there are some testing patterns and style differences with Arrow C++ that take some getting used to here. I'm absolutely open to changing those; however, if they do change they should change in the rest of the code base too (in dedicated PRs). I think the smaller PRs will help with that (to slowly get used to the existing conventions without having to rewrite things).
Adds `ArrowArrayViewComputeNullCount()` and tests. Extracted from #555
Add `ArrowIpcEncoder`, init/reset, and tests. Extracted from #555 (review)
5f36f64
to
0068806
Compare
Rebased on #568 |
- added ArrowIpcEncoderEncodeSchema - added a parameter to ArrowIpcEncoderFinalizeBuffer which controls whether encapsulated/padded message buffers will be produced instead of raw - tests reuse the decoder tests, replacing arrow C++'s encoder with ArrowIpcEncoder Extracted from #555 (review)
95ea4cf
to
2e3469b
Compare
src/nanoarrow/ipc/encoder.c
Outdated
int compressed_buffer_header = | ||
encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t) : 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to have the codec
member of ArrowIpcEncoder
? It seems like that is a very simple way of handling compression that will have to change at some point (either to handle more complex ways of choosing which buffers to compress or adding options unrelated to compression).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it suffices for now. Even if we later extend with more advanced heuristics for determining if a buffer should be compressed, the IPC format only allows a single codec to be provided per schema/stream (which means we'll still have only one codec per encoder).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had in mind something more like how it is specified in Arrow C++ (as a member of an Options struct that includes some other options about IPC writing). I don't see any advantages to premptively adding references to a feature that doesn't exist yet (and might never exist, although I hope that it does).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove compression support for now. I'm not sure how an options struct would be advantageous here, but we can hash that out later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed (it might not be useful, or might not be useful here!)
src/nanoarrow/ipc/encoder.c
Outdated
ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* encoder, | ||
const struct ArrowArrayView* array_view, | ||
struct ArrowError* error) { | ||
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && schema != NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does schema
exist here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, that's a copy paste error. I found this in another PR. It seems that NANOARROW_DEBUG
isn't getting defined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that would be:
arrow-nanoarrow/CMakeLists.txt
Line 148 in f74d57c
target_compile_definitions(nanoarrow PUBLIC "$<$<CONFIG:Debug>:NANOARROW_DEBUG>") |
...that doesn't exist for nanoarrow_ipc
. (No need to fix that here since it's not your fault 😬 )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be fixed by #573
src/nanoarrow/ipc/encoder.c
Outdated
FLATCC_RETURN_UNLESS_0(RecordBatch_nodes_create( // | ||
builder, (struct ns(FieldNode)*)private->nodes.data, | ||
private->nodes.size_bytes / sizeof(struct ns(FieldNode)))); | ||
FLATCC_RETURN_UNLESS_0(RecordBatch_buffers_create( // | ||
builder, (struct ns(Buffer)*)private->buffers.data, | ||
private->buffers.size_bytes / sizeof(struct ns(Buffer)))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth just caching the builder
for the RecordBatch message and the int64_t*
or ns(FieldNode)*
to the start of each (to avoid allocating a builder for every batch?). The decoder requires initializing with a schema to make some allocations that only need to happen once (here an ArrowArrayView
would do).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what you mean; the builder
is allocated in Init and only deallocated in Reset. The buffers used to store ns(FieldNode)
or ns(Buffer)
are also reused for each batch until Reset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering if we could avoid creating the new root/fields and copying our cached buffer into it (updating in place instead), but flatcc is probably be very good at anticipating repeated message building and avoiding allocations (and the copy is probably not expensive).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what state flatcc maintains inside its builders. Some of the example code suggests we could have it maintain independent vectors which are not specifically part of a field which is currently being built. I've tried to use that a few times and gotten odd errors when I strayed from strict state machine style, so I'd prefer to experiment independent of other changes in a follow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed!
src/nanoarrow/ipc/decoder_test.cc
Outdated
@@ -763,6 +764,132 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) { | |||
ArrowIpcDecoderReset(&decoder); | |||
} | |||
|
|||
struct ArrowArrayViewEqualTo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is awesome! The negative matches and messages here are also not tested and I am not sure anybody looking to see if we had a utility to help with array equality would look in decoder_test.cc
to find it. Something like:
void AssertArrayViewIdentical(actual, expected) {
NANOARROW_DCHECK(actual->dictionary != nullptr);
NANOARROW_DCHECK(expected->dictionary != nullptr);
ASSERT_EQ(actual->storage_type, expected->storage_type);
ASSERT_EQ(actual->offset, expected->offset);
ASSERT_EQ(actual->length, expected->length);
for (int i = 0; i < 3; i++) {
auto a_buf = actual->buffer_views[i];
auto e_buf = expected->buffer_views[i];
ASSERT_EQ(a_buf.size_bytes, e_buf->size_bytes);
if (a_buf.size_bytes != 0) {
ASSERT_EQ(memcmp(a_buf.data.data, e_buf.data.data, a_buf.size_bytes), 0);
}
}
ASSERT_EQ(actual->n_children, expected->n_children);
for (int i = 0; i < actual->n_children; i++) {
AssertArrayViewIdentical(actual->children[i], expected->children[i]);
}
}
...will give terrible error messages but can be followed up with a tested version of this helper (or a version that partly lives in the C library since this comes up in C situations as well).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we leave extraction of an array equality helper for a follow up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes! But since you still need something to test the roundtrip, this suggestion was to use something more compact than the equality helper that is currently here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! In the meantime I bet we can pin the sha of the action before the change or something (I'll try Monday) (meant for another thread! 😬 )
…572) First noticed at #555 (comment) , the R check action is failing because an update to r-lib actions resulted in some quarto actions being invoked, and these have not yet been whitelisted for use in Apache repositories. It also may be that we don't need the quarto actions (we probably don't) but some brief experimentation to attempt circumventing the use of the quarto action did not result in a successful workflow. Hence, a pin to unblock PR checks until either the v2 branch is updated or it is clear how to avoid the failure.
In previous PRs we consolidated the extensions into the main CMakeLists.txt; however, there were some things happening for certain targets (like installing them or setting NANOARROW_DEBUG) but not others. Noticed in #555 (comment) where there was a DCHECK referencing a variable that didn't exist that made it through CI 😬
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
ArrowIpcEncoderEncodeRecordBatch()
ArrowIpcEncoderBuildContiguousBodyBuffer()
to provide a default buffer encoder implementation which simply concatenates a batch's buffers into one contiguous (properly aligned and padded) body buffer