Skip to content

Commit

Permalink
fix(extensions/nanoarrow_ipc): Fix crash and mixleading error message…
Browse files Browse the repository at this point in the history
…s resulting from corrupted streams (#289)

Closes #287.

The reported crash occurred because the internal
`ArrowIpcDecoderVerifyHeader()` incorrectly interpreted the return value
of `ArrowIpcDecoderCheckHeader()`. Because the header checker returned
an error code sometimes even if it succeeded, we had been ignoring the
error in `ArrowIpcDecoderVerifyHeader()` which resulted in issuing
commands like `memcpy(dst, src, -8)`. I believe that's undefined
behaviour, leading to the intermittent nature of the crash.

In adding tests for this kind of error, I also made some improvements to
error messages along the way.

```bash
# docker run --rm -it ghcr.io/apache/arrow-nanoarrow:ubuntu
# git clone https://github.com/apache/arrow-nanoarrow.git /arrow-nanoarrow
# or
# docker run --rm -it -v$(pwd):/arrow-nanoarrow ghcr.io/apache/arrow-nanoarrow:ubuntu

cd /arrow-nanoarrow/extensions/nanoarrow_ipc
mkdir build && cd build
cmake .. -DNANOARROW_IPC_BUILD_APPS=ON

curl https://gist.githubusercontent.com/amoeba/b64fc94ba5224bafcb3734bd261181d5/raw/af4c93da7ce6affba74a80e1ba94ed9573e91be8/test_arrow_data | \
    base64 -d > test_binary

with_byte_removed() {
    BYTE_PLUS_ONE=$(($2 + 2))
    cat $1 | head -c $2
    cat $1 | tail -c "+$BYTE_PLUS_ONE"
}

cmake --build .

echo "Errors:" > out.txt
for i in {1..32951}; do
  echo "$i/32951"
  echo "$i/32951" >> out.txt
  with_byte_removed test_binary $i | ./dump_stream - 2>> out.txt
done
```
  • Loading branch information
paleolimbot authored Aug 26, 2023
1 parent 4ace1a2 commit a3f0929
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 43 deletions.
66 changes: 43 additions & 23 deletions extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
#include "nanoarrow_ipc.h"
#include "nanoarrow_ipc_flatcc_generated.h"

// A more readable expression way to refer to the fact that there are 8 bytes
// at the beginning of every message header.
const static int64_t kMessageHeaderPrefixSize = 8;

// Internal representation of a parsed "Field" from flatbuffers. This
// represents a field in a depth-first walk of column arrays and their
// children.
Expand Down Expand Up @@ -929,16 +933,17 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decode
private_data->last_message = NULL;
}

// Returns NANOARROW_OK if data is large enough to read the message header,
// ESPIPE if reading more data might help, or EINVAL if the content is not valid
static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView* data_mut,
int32_t* message_size_bytes,
struct ArrowError* error) {
// Returns NANOARROW_OK if data is large enough to read the first 8 bytes
// of the message header, ESPIPE if reading more data might help, or EINVAL if the content
// is not valid. Advances the input ArrowBufferView by 8 bytes.
static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView* data_mut,
int32_t* message_size_bytes,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

if (data_mut->size_bytes < 8) {
if (data_mut->size_bytes < kMessageHeaderPrefixSize) {
ArrowErrorSet(error, "Expected data of at least 8 bytes but only %ld bytes remain",
(long)data_mut->size_bytes);
return ESPIPE;
Expand All @@ -953,18 +958,12 @@ static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder,

int swap_endian = private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG;
int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
*message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t));
*message_size_bytes = header_body_size_bytes + kMessageHeaderPrefixSize;
if (header_body_size_bytes < 0) {
ArrowErrorSet(
error, "Expected message body size > 0 but found message body size of %ld bytes",
(long)header_body_size_bytes);
return EINVAL;
} else if (header_body_size_bytes > data_mut->size_bytes) {
ArrowErrorSet(error,
"Expected 0 <= message body size <= %ld bytes but found message "
"body size of %ld bytes",
(long)data_mut->size_bytes, (long)header_body_size_bytes);
return ESPIPE;
}

if (header_body_size_bytes == 0) {
Expand All @@ -982,8 +981,8 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
decoder, &data, &decoder->header_size_bytes, error));
return NANOARROW_OK;
}

Expand All @@ -994,13 +993,23 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
decoder, &data, &decoder->header_size_bytes, error));

// Check that data contains at least the entire header (return ESPIPE to signal
// that reading more data may help).
int64_t message_body_size = decoder->header_size_bytes - kMessageHeaderPrefixSize;
if (data.size_bytes < message_body_size) {
ArrowErrorSet(error,
"Expected >= %ld bytes of remaining data but found %ld bytes in buffer",
(long)message_body_size + kMessageHeaderPrefixSize,
(long)data.size_bytes + kMessageHeaderPrefixSize);
return ESPIPE;
}

// Run flatbuffers verification
if (ns(Message_verify_as_root(data.data.as_uint8,
decoder->header_size_bytes - (2 * sizeof(int32_t)))) !=
flatcc_verify_ok) {
if (ns(Message_verify_as_root(data.data.as_uint8, message_body_size) !=
flatcc_verify_ok)) {
ArrowErrorSet(error, "Message flatbuffer verification failed");
return EINVAL;
}
Expand All @@ -1022,8 +1031,19 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
decoder, &data, &decoder->header_size_bytes, error));

// Check that data contains at least the entire header (return ESPIPE to signal
// that reading more data may help).
int64_t message_body_size = decoder->header_size_bytes - kMessageHeaderPrefixSize;
if (data.size_bytes < message_body_size) {
ArrowErrorSet(error,
"Expected >= %ld bytes of remaining data but found %ld bytes in buffer",
(long)message_body_size + kMessageHeaderPrefixSize,
(long)data.size_bytes + kMessageHeaderPrefixSize);
return ESPIPE;
}

ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
if (!message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,36 +127,58 @@ TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
data.data.as_uint8 = kSimpleSchema;
data.size_bytes = 1;

// For each error, check both Verify and Decode

ArrowIpcDecoderInit(&decoder);

EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
EXPECT_STREQ(error.message,
"Expected data of at least 8 bytes but only 1 bytes remain");

ArrowErrorInit(&error);
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ESPIPE);
EXPECT_STREQ(error.message,
"Expected data of at least 8 bytes but only 1 bytes remain");

uint32_t eight_bad_bytes[] = {0, 0};
data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes);
data.size_bytes = 8;
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
"Expected 0xFFFFFFFF at start of message but found 0x00000000");

ArrowErrorInit(&error);
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
"Expected 0xFFFFFFFF at start of message but found 0x00000000");

eight_bad_bytes[0] = 0xFFFFFFFF;
eight_bad_bytes[1] = negative_one_le;
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
"Expected message body size > 0 but found message body size of -1 bytes");

ArrowErrorInit(&error);
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
"Expected message body size > 0 but found message body size of -1 bytes");

eight_bad_bytes[1] = one_le;
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);

EXPECT_STREQ(error.message,
"Expected 0 <= message body size <= 0 bytes but found message body size "
"of 1 bytes");
"Expected >= 9 bytes of remaining data but found 8 bytes in buffer");
ArrowErrorInit(&error);
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ESPIPE);
EXPECT_STREQ(error.message,
"Expected >= 9 bytes of remaining data but found 8 bytes in buffer");

eight_bad_bytes[0] = 0xFFFFFFFF;
eight_bad_bytes[1] = 0;
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA);
EXPECT_STREQ(error.message, "End of Arrow stream");
ArrowErrorInit(&error);
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), ENODATA);
EXPECT_STREQ(error.message, "End of Arrow stream");

ArrowIpcDecoderReset(&decoder);
}
Expand Down Expand Up @@ -191,15 +213,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleSchema) {
EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
EXPECT_EQ(decoder.body_size_bytes, 0);

uint8_t simple_schema_invalid[280];
memcpy(simple_schema_invalid, kSimpleSchema, sizeof(simple_schema_invalid));
memset(simple_schema_invalid + 8, 0xFF, sizeof(simple_schema_invalid) - 8);

data.data.as_uint8 = simple_schema_invalid;
data.size_bytes = sizeof(kSimpleSchema);
EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message, "Message flatbuffer verification failed");

ArrowIpcDecoderReset(&decoder);
}

Expand All @@ -221,6 +234,35 @@ TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleRecordBatch) {
ArrowIpcDecoderReset(&decoder);
}

TEST(NanoarrowIpcTest, NanoarrowIpcVerifyInvalid) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;

uint8_t simple_schema_invalid[sizeof(kSimpleSchema)];
struct ArrowBufferView data;
data.data.as_uint8 = simple_schema_invalid;
data.size_bytes = sizeof(simple_schema_invalid);

ArrowIpcDecoderInit(&decoder);

// Create invalid data by removing bytes one at a time and ensuring an error code and
// a null-terminated error. After byte 265 this passes because the values being modified
// are parts of the flatbuffer that won't cause overrun.
for (int64_t i = 1; i < 265; i++) {
SCOPED_TRACE(i);

memcpy(simple_schema_invalid, kSimpleSchema, i);
memcpy(simple_schema_invalid + i, kSimpleSchema + (i + 1),
(sizeof(simple_schema_invalid) - i));

ArrowErrorInit(&error);
ASSERT_NE(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), NANOARROW_OK);
ASSERT_GT(strlen(error.message), 0);
}

ArrowIpcDecoderReset(&decoder);
}

TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;
Expand Down
17 changes: 9 additions & 8 deletions extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,8 @@ static int ArrowIpcArrayStreamReaderNextHeader(
input_view.size_bytes = private_data->header.size_bytes;

// Use PeekHeader to fill in decoder.header_size_bytes
int result =
ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error);
if (result == ENODATA) {
return result;
}
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view,
&private_data->error));

// Read the header bytes
int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
Expand Down Expand Up @@ -348,7 +345,7 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
struct ArrowArray* out) {
struct ArrowIpcArrayStreamReaderPrivate* private_data =
(struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
private_data->error.message[0] = '\0';
ArrowErrorInit(&private_data->error);
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));

// Read + decode the next header
Expand All @@ -359,6 +356,9 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
// end of stream bytes were read.
out->release = NULL;
return NANOARROW_OK;
} else if (result != NANOARROW_OK) {
// Other error
return result;
}

// Make sure we have a RecordBatch message
Expand All @@ -376,10 +376,11 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
struct ArrowIpcSharedBuffer shared;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcSharedBufferInit(&shared, &private_data->body), &private_data->error);
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared(
result = ArrowIpcDecoderDecodeArrayFromShared(
&private_data->decoder, &shared, private_data->field_index, &tmp,
NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error));
NANOARROW_VALIDATION_LEVEL_FULL, &private_data->error);
ArrowIpcSharedBufferReset(&shared);
NANOARROW_RETURN_NOT_OK(result);
} else {
struct ArrowBufferView body_view;
body_view.data.data = private_data->body.data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,64 @@ TEST(NanoarrowIpcReader, StreamReaderExpectedSchema) {
stream.release(&stream);
}

TEST(NanoarrowIpcTest, StreamReaderInvalidBuffer) {
struct ArrowBuffer input_buffer;
struct ArrowIpcInputStream input;
struct ArrowArrayStream stream;
struct ArrowSchema schema;
struct ArrowArray array;

uint8_t simple_stream_invalid[sizeof(kSimpleSchema) + sizeof(kSimpleRecordBatch)];
struct ArrowBufferView data;
data.data.as_uint8 = simple_stream_invalid;

// Create invalid data by removing bytes one at a time and ensuring an error code and
// a null-terminated error. After byte 273/280 this passes because the bytes are just
// padding.
data.size_bytes = sizeof(kSimpleSchema);
for (int64_t i = 1; i < 273; i++) {
SCOPED_TRACE(i);

memcpy(simple_stream_invalid, kSimpleSchema, i);
memcpy(simple_stream_invalid + i, kSimpleSchema + (i + 1),
(sizeof(kSimpleSchema) - i));

ArrowBufferInit(&input_buffer);
ASSERT_EQ(ArrowBufferAppendBufferView(&input_buffer, data), NANOARROW_OK);
ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK);
ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);

ASSERT_NE(stream.get_schema(&stream, &schema), NANOARROW_OK);
ASSERT_GT(strlen(stream.get_last_error(&stream)), 0);

stream.release(&stream);
}

// Do the same exercise removing bytes of the record batch message.
// Similarly, this succeeds if the byte removed is part of the padding at the end.
memcpy(simple_stream_invalid, kSimpleSchema, sizeof(kSimpleSchema));
data.size_bytes = sizeof(simple_stream_invalid);
for (int64_t i = 1; i < 144; i++) {
SCOPED_TRACE(i);

memcpy(simple_stream_invalid + sizeof(kSimpleSchema), kSimpleRecordBatch, i);
memcpy(simple_stream_invalid + sizeof(kSimpleSchema) + i,
kSimpleRecordBatch + (i + 1), (sizeof(kSimpleRecordBatch) - i));

ArrowBufferInit(&input_buffer);
ASSERT_EQ(ArrowBufferAppendBufferView(&input_buffer, data), NANOARROW_OK);
ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK);
ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);

ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK);
schema.release(&schema);
ASSERT_NE(stream.get_next(&stream, &array), NANOARROW_OK);
ASSERT_GT(strlen(stream.get_last_error(&stream)), 0);

stream.release(&stream);
}
}

TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex) {
struct ArrowBuffer input_buffer;
ArrowBufferInit(&input_buffer);
Expand Down

0 comments on commit a3f0929

Please sign in to comment.