Skip to content

Commit

Permalink
chore(ci): Ensure integration workflow passes (#643)
Browse files Browse the repository at this point in the history
Closes #641.

Unfortunately we just have to skip checking Rust compatibility due to
apache/arrow-rs#5052 (e.g.,
apache/arrow-rs#6449 ).

This PR also ensures compatibility with big endian Arrow files and Arrow
files from before the continuation token. Support for those had already
been added in the decoder but hadn't made it to the stream reader yet.

Local check:

```bash
# Assumes arrow-testing, arrow-nanoarrow, and arrow are all checked out in the same dir
export gold_dir=../arrow-testing/data/arrow-ipc-stream/integration 
export ARROW_NANOARROW_PATH=$(pwd)/build 
pip install -e "../arrow/dev/archery/[all]"
archery integration --with-nanoarrow=true --run-ipc \
    --gold-dirs=$gold_dir/0.14.1 \
    --gold-dirs=$gold_dir/0.17.1 \
    --gold-dirs=$gold_dir/1.0.0-bigendian \
    --gold-dirs=$gold_dir/1.0.0-littleendian \
    --gold-dirs=$gold_dir/2.0.0-compression \
    --gold-dirs=$gold_dir/4.0.0-shareddict
```
  • Loading branch information
paleolimbot authored Oct 7, 2024
1 parent accc8e1 commit 19d35d7
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 14 deletions.
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ services:
- ${NANOARROW_DOCKER_SOURCE_DIR}:/arrow-integration/nanoarrow
environment:
ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS: "nanoarrow"
# Rust writes invalid flatbuffers:
# https://github.com/apache/arrow-rs/issues/5052
ARCHERY_INTEGRATION_WITH_RUST: "0"
command:
["echo '::group::Build nanoarrow' &&
conda run --no-capture-output /arrow-integration/ci/scripts/nanoarrow_build.sh /arrow-integration /build &&
Expand Down
16 changes: 9 additions & 7 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -1024,12 +1024,11 @@ static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decode

ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
int32_t* prefix_size_bytes,
struct ArrowError* error) {
ArrowIpcDecoderResetHeaderInfo(decoder);
int32_t prefix_size_bytes;
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, error));
NANOARROW_UNUSED(prefix_size_bytes);
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, prefix_size_bytes, error));
return NANOARROW_OK;
}

Expand Down Expand Up @@ -1187,15 +1186,15 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
decoder->body_size_bytes = ns(Message_bodyLength(message));

switch (decoder->metadata_version) {
case ns(MetadataVersion_V5):
case ns(MetadataVersion_V4):
case ns(MetadataVersion_V5):
break;
case ns(MetadataVersion_V1):
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s",
ns(MetadataVersion_name(ns(Message_version(message)))));
return EINVAL;
case ns(MetadataVersion_V1):
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
default:
ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)",
decoder->metadata_version);
Expand Down Expand Up @@ -1286,6 +1285,9 @@ ArrowErrorCode ArrowIpcDecoderDecodeFooter(struct ArrowIpcDecoder* decoder,
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size;
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data));

NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeSchemaHeader(decoder, ns(Footer_schema(footer)), error));

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchemaImpl(
ns(Footer_schema(footer)), &private_data->footer.schema, error));

Expand Down
5 changes: 4 additions & 1 deletion src/nanoarrow/ipc/decoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ TEST(NanoarrowIpcTest, NanoarrowIpcPeekSimpleSchema) {
data.size_bytes = sizeof(kSimpleSchema);

ArrowIpcDecoderInit(&decoder);
EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &error), NANOARROW_OK);
int32_t prefix_size_bytes = 0;
EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &prefix_size_bytes, &error),
NANOARROW_OK);
EXPECT_EQ(prefix_size_bytes, 8);
EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
EXPECT_EQ(decoder.body_size_bytes, 0);

Expand Down
83 changes: 77 additions & 6 deletions src/nanoarrow/ipc/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#define ENODATA 120
#endif

// Sentinel value to indicate that we haven't read a message yet
// and don't know the number of header prefix bytes to expect.
static const int32_t kExpectedHeaderPrefixSizeNotSet = -1;

void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
struct ArrowIpcInputStream* dst) {
memcpy(dst, src, sizeof(struct ArrowIpcInputStream));
Expand Down Expand Up @@ -186,6 +190,7 @@ struct ArrowIpcArrayStreamReaderPrivate {
int64_t field_index;
struct ArrowBuffer header;
struct ArrowBuffer body;
int32_t expected_header_prefix_size;
struct ArrowError error;
};

Expand Down Expand Up @@ -230,6 +235,21 @@ static int ArrowIpcArrayStreamReaderNextHeader(
// propagated higher (e.g., if the stream is empty and there's no schema message)
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else if (bytes_read == 4 && private_data->expected_header_prefix_size == 4) {
// Special case very, very old IPC streams that used 0x00000000 as the
// end-of-stream indicator. We may want to remove this case at some point:
// https://github.com/apache/arrow-nanoarrow/issues/648
uint32_t last_four_bytes = 0;
memcpy(&last_four_bytes, private_data->header.data, sizeof(uint32_t));
if (last_four_bytes == 0) {
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else {
ArrowErrorSet(&private_data->error,
"Expected 0x00000000 if exactly four bytes are available at the end "
"of a stream");
return EINVAL;
}
} else if (bytes_read != 8) {
ArrowErrorSet(&private_data->error,
"Expected at least 8 bytes in remainder of stream");
Expand All @@ -241,17 +261,58 @@ static int ArrowIpcArrayStreamReaderNextHeader(
input_view.size_bytes = private_data->header.size_bytes;

// Use PeekHeader to fill in decoder.header_size_bytes
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view,
&private_data->error));
int32_t prefix_size_bytes = 0;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(
&private_data->decoder, input_view, &prefix_size_bytes, &private_data->error));

// Check for a consistent header prefix size
if (private_data->expected_header_prefix_size != kExpectedHeaderPrefixSizeNotSet &&
prefix_size_bytes != private_data->expected_header_prefix_size) {
ArrowErrorSet(&private_data->error,
"Expected prefix %d prefix header bytes but found %d",
(int)private_data->expected_header_prefix_size, (int)prefix_size_bytes);
return EINVAL;
} else {
private_data->expected_header_prefix_size = prefix_size_bytes;
}

// Legacy streams are missing the 0xFFFFFFFF at the start of the message. The
// decoder can handle this; however, verification will fail because flatbuffers
// must be 8-byte aligned. To handle this case, we prepend the continuation
// token to the start of the stream and ensure that we read four fewer bytes
// the next time we issue a read. We may be able to remove this case in the future:
// https://github.com/apache/arrow-nanoarrow/issues/648
int64_t extra_bytes_already_read = 0;
if (prefix_size_bytes == 4) {
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(&private_data->header, 4),
&private_data->error);
memmove(private_data->header.data + 4, private_data->header.data,
private_data->header.size_bytes);
uint32_t continuation = 0xFFFFFFFFU;
memcpy(private_data->header.data, &continuation, sizeof(uint32_t));
private_data->header.size_bytes += 4;
extra_bytes_already_read = 4;

input_view.data.data = private_data->header.data;
input_view.size_bytes = private_data->header.size_bytes;

int32_t new_prefix_size_bytes;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view,
&new_prefix_size_bytes,
&private_data->error));
NANOARROW_DCHECK(new_prefix_size_bytes == 8);
}

// Read the header bytes
int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowBufferReserve(&private_data->header, expected_header_bytes),
ArrowBufferReserve(&private_data->header,
expected_header_bytes - extra_bytes_already_read),
&private_data->error);
NANOARROW_RETURN_NOT_OK(
private_data->input.read(&private_data->input, private_data->header.data + 8,
expected_header_bytes, &bytes_read, &private_data->error));
NANOARROW_RETURN_NOT_OK(private_data->input.read(
&private_data->input, private_data->header.data + private_data->header.size_bytes,
expected_header_bytes - extra_bytes_already_read, &bytes_read,
&private_data->error));
private_data->header.size_bytes += bytes_read;

// Verify + decode the header
Expand All @@ -260,6 +321,15 @@ static int ArrowIpcArrayStreamReaderNextHeader(
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
&private_data->error));

// If we have a 4-byte header prefix, make sure the metadata version is V4
// (Note that some V4 IPC files have an 8 byte header prefix).
if (prefix_size_bytes == 4 &&
private_data->decoder.metadata_version != NANOARROW_IPC_METADATA_VERSION_V4) {
ArrowErrorSet(&private_data->error,
"Header prefix size of four bytes is only allowed for V4 metadata");
return EINVAL;
}

// Don't decode the message if it's of the wrong type (because the error message
// is better communicated by the caller)
if (private_data->decoder.message_type != message_type) {
Expand Down Expand Up @@ -445,6 +515,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
ArrowBufferInit(&private_data->body);
private_data->out_schema.release = NULL;
ArrowIpcInputStreamMove(input_stream, &private_data->input);
private_data->expected_header_prefix_size = kExpectedHeaderPrefixSizeNotSet;

if (options != NULL) {
private_data->field_index = options->field_index;
Expand Down
5 changes: 5 additions & 0 deletions src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,13 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
/// these bytes and returns ESPIPE if there are not enough remaining bytes in data to read
/// the entire header message, EINVAL if the first 8 bytes are not valid, ENODATA if the
/// Arrow end-of-stream indicator has been reached, or NANOARROW_OK otherwise.
///
/// Pre-1.0 messages were not prefixed with 0xFFFFFFFF. For these messages, a value
/// of 4 will be placed into prefix_size_bytes; otherwise a value of 8 will be placed
/// into prefix_size_bytes.
ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
int32_t* prefix_size_bytes,
struct ArrowError* error);

/// \brief Verify a message header
Expand Down

0 comments on commit 19d35d7

Please sign in to comment.