Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Aug 2, 2024
1 parent fdf7e45 commit 9667735
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 116 deletions.
14 changes: 7 additions & 7 deletions src/nanoarrow/ipc/decoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ struct ArrowArrayViewEqualTo {
}

field_path.push_back(0);
for (int64_t i = 0; i < actual->n_children; ++i) {
for (int64_t i = 0; i < actual->n_children; i++) {
field_path.back() = i;
if (!MatchAndExplain(field_path, actual->children[i], expected->children[i], os)) {
return false;
Expand All @@ -843,6 +843,10 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip) {
arrow::ExportSchema(arrow::Schema({arrow::field("", GetParam())}), schema.get())
.ok());

nanoarrow::UniqueArrayView array_view;
ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), &error),
NANOARROW_OK);

// now make one empty struct array with this schema and another with all zeroes
nanoarrow::UniqueArray empty_array, zero_array;
for (auto* array : {empty_array.get(), zero_array.get()}) {
Expand All @@ -852,19 +856,15 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip) {
ASSERT_EQ(ArrowArrayAppendEmpty(array, 5), NANOARROW_OK);
}
ASSERT_EQ(ArrowArrayFinishBuildingDefault(array, nullptr), NANOARROW_OK);

nanoarrow::UniqueArrayView array_view;
ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), &error),
NANOARROW_OK);
ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array, &error), NANOARROW_OK)
<< error.message;

nanoarrow::ipc::UniqueEncoder encoder;
EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);

nanoarrow::UniqueBuffer buffer, body_buffer;
ArrowIpcEncoderBuildContiguousBodyBuffer(encoder.get(), body_buffer.get());
EXPECT_EQ(ArrowIpcEncoderEncodeRecordBatch(encoder.get(), array_view.get(), &error),
EXPECT_EQ(ArrowIpcEncoderEncodeSimpleRecordBatch(encoder.get(), array_view.get(),
body_buffer.get(), &error),
NANOARROW_OK)
<< error.message;
EXPECT_EQ(
Expand Down
171 changes: 98 additions & 73 deletions src/nanoarrow/ipc/encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,19 @@

#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)

#define FLATCC_RETURN_UNLESS_0(x) \
if (ns(x) != 0) return ENOMEM;
#define FLATCC_RETURN_UNLESS_0_NO_NS(x) \
if ((x) != 0) { \
ArrowErrorSet(error, "%s:%d: %s failed", __FILE__, __LINE__, #x); \
return ENOMEM; \
}

#define FLATCC_RETURN_UNLESS_0(x) FLATCC_RETURN_UNLESS_0_NO_NS(ns(x))

#define FLATCC_RETURN_IF_NULL(x) \
if (!(x)) { \
ArrowErrorSet(error, "%s:%d: %s was null", __FILE__, __LINE__, #x); \
return ENOMEM; \
}

struct ArrowIpcEncoderPrivate {
flatcc_builder_t builder;
Expand All @@ -39,8 +50,6 @@ struct ArrowIpcEncoderPrivate {
ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
NANOARROW_DCHECK(encoder != NULL);
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
encoder->encode_buffer = NULL;
encoder->encode_buffer_state = NULL;
encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
struct ArrowIpcEncoderPrivate* private =
Expand Down Expand Up @@ -87,32 +96,28 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
return encapsulate ? ArrowBufferAppend(out, &header, sizeof(header)) : NANOARROW_OK;
}

const void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
if (data == NULL) {
return ENOMEM;
}

int64_t i = out->size_bytes;
if (encapsulate) {
int64_t encapsulated_size =
_ArrowRoundUpToMultipleOf8(sizeof(int32_t) + sizeof(int32_t) + size);
NANOARROW_RETURN_NOT_OK(
ArrowBufferResize(out, out->size_bytes + encapsulated_size, 0));
NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, encapsulated_size));
NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, &header, sizeof(header)));
} else {
NANOARROW_RETURN_NOT_OK(ArrowBufferResize(out, out->size_bytes + size, 0));
NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
}

void* data = flatcc_builder_copy_buffer(&private->builder, out->data + out->size_bytes,
(size_t)size);
NANOARROW_DCHECK(data != NULL);
out->size_bytes += size;

if (encapsulate) {
memcpy(out->data + i, &header, sizeof(header));
i += sizeof(header);
// zero padding bytes, if any
int64_t padded_size = _ArrowRoundUpToMultipleOf8(out->size_bytes);
memset(out->data + out->size_bytes, 0, padded_size - out->size_bytes);
out->size_bytes = padded_size;
}

memcpy(out->data + i, data, size);
i += size;

// zero padding bytes, if any
memset(out->data + i, 0, out->size_bytes - i);

// don't deallocate yet, just wipe the builder's current Message
flatcc_builder_reset(&private->builder);
return NANOARROW_OK;
Expand Down Expand Up @@ -273,9 +278,7 @@ static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
if (n != 0) {
FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder));
int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n));
if (!type_ids_32) {
return ENOMEM;
}
FLATCC_RETURN_IF_NULL(type_ids_32);

for (int i = 0; i < n; ++i) {
type_ids_32[i] = type_ids[i];
Expand Down Expand Up @@ -319,15 +322,11 @@ static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder,
while (metadata.remaining_keys > 0) {
struct ArrowStringView key, value;
NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderRead(&metadata, &key, &value));
if (push_start(builder) != 0) {
return ENOMEM;
}
FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder));
FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, key.size_bytes));
FLATCC_RETURN_UNLESS_0(
KeyValue_value_create_strn(builder, value.data, value.size_bytes));
if (!push_end(builder)) {
return ENOMEM;
}
FLATCC_RETURN_IF_NULL(push_end(builder));
}
return NANOARROW_OK;
}
Expand All @@ -339,13 +338,9 @@ static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder,
(*push_end)(flatcc_builder_t*),
struct ArrowError* error) {
for (int i = 0; i < schema->n_children; ++i) {
if (push_start(builder) != 0) {
return ENOMEM;
}
FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder));
NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i], error));
if (!push_end(builder)) {
return ENOMEM;
}
FLATCC_RETURN_IF_NULL(push_end(builder));
}
return NANOARROW_OK;
}
Expand Down Expand Up @@ -412,22 +407,46 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder,

FLATCC_RETURN_UNLESS_0(Schema_features_start(builder));
ns(Feature_enum_t)* features = ns(Schema_features_extend(builder, 1));
if (!features) {
return ENOMEM;
}
FLATCC_RETURN_IF_NULL(features);
features[0] = ns(Feature_COMPRESSED_BODY);
FLATCC_RETURN_UNLESS_0(Schema_features_end(builder));

FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder));

FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)));
return NANOARROW_OK;
}

struct ArrowIpcBufferEncoder {
/// \brief Callback invoked against each buffer to be encoded
///
/// Encoding of buffers is left as a callback to accommodate dissociated data storage.
/// One implementation of this callback might copy all buffers into a contiguous body
/// for use in an arrow IPC stream, another implementation might store offsets and
/// lengths relative to a known arena.
ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
struct ArrowIpcEncoder* encoder,
struct ArrowIpcBufferEncoder* buffer_encoder,
int64_t* offset, int64_t* length,
struct ArrowError* error);

/// \brief Pointer to arbitrary data used by encode_buffer()
void* encode_buffer_state;

/// \brief Finalized body length of the most recently encoded RecordBatch message
///
/// encode_buffer() is expected to update this while encoding each buffer. After all
/// buffers are encoded, this will be written to the RecordBatch's .bodyLength
int64_t body_length;
};

static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, int64_t* offset,
int64_t* length, struct ArrowError* error) {
struct ArrowBuffer* body_buffer = (struct ArrowBuffer*)encoder->encode_buffer_state;
struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder,
struct ArrowIpcBufferEncoder* buffer_encoder, int64_t* offset, int64_t* length,
struct ArrowError* error) {
struct ArrowBuffer* body_buffer =
(struct ArrowBuffer*)buffer_encoder->encode_buffer_state;

int compressed_buffer_header =
encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t) : 0;
Expand Down Expand Up @@ -458,21 +477,14 @@ static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
NANOARROW_DCHECK(body_buffer->size_bytes == buffer_end);
NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, new_size - buffer_end));

encoder->body_length = body_buffer->size_bytes;
buffer_encoder->body_length = body_buffer->size_bytes;
return NANOARROW_OK;
}

void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder,
struct ArrowBuffer* body_buffer) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
body_buffer != NULL);
encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback;
encoder->encode_buffer_state = body_buffer;
}

static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl(
struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
struct ArrowBuffer* buffers, struct ArrowBuffer* nodes, struct ArrowError* error) {
struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* buffer_encoder,
const struct ArrowArrayView* array_view, struct ArrowBuffer* buffers,
struct ArrowBuffer* nodes, struct ArrowError* error) {
if (array_view->offset != 0) {
ArrowErrorSet(error, "Cannot encode arrays with nonzero offset");
return ENOTSUP;
Expand All @@ -482,25 +494,29 @@ static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl(
const struct ArrowArrayView* child = array_view->children[c];

struct ns(FieldNode) node = {child->length, child->null_count};
NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(nodes, &node, sizeof(node)));
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferAppend(nodes, &node, sizeof(node)),
error);

for (int64_t b = 0; b < child->array->n_buffers; ++b) {
struct ns(Buffer) buffer;
NANOARROW_RETURN_NOT_OK(encoder->encode_buffer(
child->buffer_views[b], encoder, &buffer.offset, &buffer.length, error));
NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(buffers, &buffer, sizeof(buffer)));
NANOARROW_RETURN_NOT_OK(
buffer_encoder->encode_buffer(child->buffer_views[b], encoder, buffer_encoder,
&buffer.offset, &buffer.length, error));
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowBufferAppend(buffers, &buffer, sizeof(buffer)), error);
}

NANOARROW_RETURN_NOT_OK(
ArrowIpcEncoderEncodeRecordBatchImpl(encoder, child, buffers, nodes, error));
NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
encoder, buffer_encoder, child, buffers, nodes, error));
}
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* encoder,
const struct ArrowArrayView* array_view,
struct ArrowError* error) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && schema != NULL);
static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(
struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* buffer_encoder,
const struct ArrowArrayView* array_view, struct ArrowError* error) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
buffer_encoder != NULL && buffer_encoder->encode_buffer != NULL);

if (array_view->null_count != 0 && ArrowArrayViewComputeNullCount(array_view) != 0) {
ArrowErrorSet(error,
Expand All @@ -515,11 +531,6 @@ ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* encoder,
return EINVAL;
}

if (!encoder->encode_buffer) {
ArrowErrorSet(error, "No encode_buffer behavior provided when encoding RecordBatch");
return EINVAL;
}

struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;

Expand All @@ -528,8 +539,6 @@ ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* encoder,
FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));

encoder->body_length = 0;

FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder));
if (encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
FLATCC_RETURN_UNLESS_0(RecordBatch_compression_start(builder));
Expand All @@ -541,7 +550,7 @@ ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* encoder,
ArrowBufferResize(&private->buffers, 0, 0);
ArrowBufferResize(&private->nodes, 0, 0);
NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
encoder, array_view, &private->buffers, &private->nodes, error));
encoder, buffer_encoder, array_view, &private->buffers, &private->nodes, error));

FLATCC_RETURN_UNLESS_0(RecordBatch_nodes_create( //
builder, (struct ns(FieldNode)*)private->nodes.data,
Expand All @@ -552,6 +561,22 @@ ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* encoder,

FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_end(builder));

FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, encoder->body_length));
return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, buffer_encoder->body_length));
FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)));
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcEncoderEncodeSimpleRecordBatch(
struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
struct ArrowBuffer* body_buffer, struct ArrowError* error) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
body_buffer != NULL);

struct ArrowIpcBufferEncoder buffer_encoder = {
.encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback,
.encode_buffer_state = body_buffer,
.body_length = 0,
};

return ArrowIpcEncoderEncodeRecordBatch(encoder, &buffer_encoder, array_view, error);
}
3 changes: 0 additions & 3 deletions src/nanoarrow/ipc/encoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);

EXPECT_EQ(encoder->codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
EXPECT_EQ(encoder->body_length, 0);
EXPECT_EQ(encoder->encode_buffer, nullptr);
EXPECT_EQ(encoder->encode_buffer_state, nullptr);

auto* p = static_cast<struct ArrowIpcEncoderPrivate*>(encoder->private_data);
ASSERT_NE(p, nullptr);
Expand Down
Loading

0 comments on commit 9667735

Please sign in to comment.