Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
johnkerl committed May 21, 2024
2 parents 9a69256 + 040d03b commit b88e1d4
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 39 deletions.
1 change: 0 additions & 1 deletion .readthedocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ build:
tools:
python: "3.11"
commands:
- git fetch --unshallow || true
- pip install --upgrade pip
- doc/build.sh -r -V # install deps, don't make a venv
2 changes: 1 addition & 1 deletion apis/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,5 +339,5 @@ def run(self):
},
python_requires=">=3.8",
cmdclass={"build_ext": build_ext, "bdist_wheel": bdist_wheel},
version=version.getVersion(),
version=version.get_version(),
)
11 changes: 5 additions & 6 deletions apis/python/src/tiledbsoma/soma_dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,16 @@ void load_soma_dataframe(py::module& m) {
uintptr_t schema_ptr = (uintptr_t)(&schema);
py_schema.attr("_export_to_c")(schema_ptr);

for (int64_t sch_idx = 0; sch_idx < schema.n_children;
++sch_idx) {
auto child = schema.children[sch_idx];
auto metadata = py_schema.attr("metadata");
if (py::hasattr(metadata, "get")) {
auto metadata = py_schema.attr("metadata");
if (py::hasattr(metadata, "get")) {
for (int64_t i = 0; i < schema.n_children; ++i) {
auto child = schema.children[i];
auto val = metadata.attr("get")(
py::str(child->name).attr("encode")("utf-8"));

if (val != py::none() &&
val.cast<std::string>() == "nullable") {
child->flags &= ARROW_FLAG_NULLABLE;
child->flags |= ARROW_FLAG_NULLABLE;
} else {
child->flags &= ~ARROW_FLAG_NULLABLE;
}
Expand Down
18 changes: 18 additions & 0 deletions apis/python/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1430,3 +1430,21 @@ def test_enum_schema_report(tmp_path):
f = sdf.schema.field("byte_cat")
assert f.type.index_type == pa.int8()
assert f.type.value_type == pa.binary()


def test_nullable(tmp_path):
    """Round-trip a dataframe whose 'foo' column is flagged nullable via
    Arrow schema metadata, verifying that nulls survive write and read."""
    uri = tmp_path.as_posix()

    # Column-level metadata {"foo": "nullable"} marks the attribute as
    # nullable on the TileDB side.
    asch = pa.schema(
        [pa.field("foo", pa.int32())], metadata={"foo": "nullable"}
    )

    data = pa.Table.from_pydict(
        {
            "soma_joinid": [0, 1, 2, 3, 4],
            "foo": [10, 20, 30, None, 50],
        }
    )

    with soma.DataFrame.create(uri, schema=asch) as sdf:
        sdf.write(data)

    with soma.DataFrame.open(uri, "r") as sdf:
        df = sdf.read().concat().to_pandas()
        assert df.compare(data.to_pandas()).empty
36 changes: 36 additions & 0 deletions libtiledbsoma/src/soma/column_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,42 @@ class ColumnBuffer {
}
}

/**
* @brief Set the ColumnBuffer's data for string and binary (as opposed to
* large string or large binary)
*
* @param data pointer to the beginning of the data to write
* @param num_elems the number of elements in the column
*/
void set_data(
uint64_t num_elems,
const void* data,
uint32_t* offsets,
uint8_t* validity = nullptr) {
num_cells_ = num_elems;

auto num_offsets = num_elems + 1;
std::vector<uint32_t> offset_holder;
offset_holder.resize(num_offsets);
offset_holder.assign(
(uint32_t*)offsets, (uint32_t*)offsets + num_offsets);
offsets_ = std::vector<uint64_t>(
offset_holder.begin(), offset_holder.end());

data_size_ = offsets_[num_offsets - 1];
data_.resize(data_size_);
data_.assign((std::byte*)data, (std::byte*)data + data_size_);

if (is_nullable_) {
if (validity != nullptr) {
validity_.assign(validity, validity + num_elems);
} else {
validity_.resize(num_elems);
std::fill(validity_.begin(), validity_.end(), 1);
}
}
}

/**
* @brief Size num_cells_ to match the read query results.
*
Expand Down
84 changes: 55 additions & 29 deletions libtiledbsoma/src/soma/soma_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,24 @@ void SOMAArray::set_column_data(
const void* data,
uint64_t* offsets,
uint8_t* validity) {
auto column = SOMAArray::_setup_column_data(name);
column->set_data(num_elems, data, offsets, validity);
mq_->set_column_data(column);
};

/**
 * @brief Set the write buffers for a string or binary column with 32-bit
 * offsets (as opposed to large string or large binary, which use 64-bit
 * offsets).
 *
 * @param name Name of the column
 * @param num_elems Number of elements to write
 * @param data Pointer to the beginning of the data buffer
 * @param offsets Pointer to the beginning of the 32-bit offsets buffer
 * @param validity Optional pointer to the beginning of the validity buffer
 */
void SOMAArray::set_column_data(
    std::string_view name,
    uint64_t num_elems,
    const void* data,
    uint32_t* offsets,
    uint8_t* validity) {
    // _setup_column_data checks the array is open for write and keeps the
    // ColumnBuffer alive by registering it in array_buffer_
    auto column = SOMAArray::_setup_column_data(name);
    column->set_data(num_elems, data, offsets, validity);
    mq_->set_column_data(column);
}

std::shared_ptr<ColumnBuffer> SOMAArray::_setup_column_data(
std::string_view name) {
if (mq_->query_type() != TILEDB_WRITE) {
throw TileDBSOMAError("[SOMAArray] array must be opened in write mode");
}
Expand All @@ -433,14 +451,13 @@ void SOMAArray::set_column_data(
// `set_column_data` because ColumnBuffer::create requires a TileDB Array
// argument which should remain a private member of SOMAArray
auto column = ColumnBuffer::create(arr_, name);
column->set_data(num_elems, data, offsets, validity);

// Keep the ColumnBuffer alive by attaching it to the ArrayBuffers class
// member. Otherwise, the data held by the ColumnBuffer will be garbage
// collected before it is submitted to the write query
array_buffer_->emplace(std::string(name), column);

mq_->set_column_data(column);
return column;
};

void SOMAArray::set_array_data(
Expand All @@ -451,49 +468,58 @@ void SOMAArray::set_array_data(
}

// Create the array_buffer_ as necessary
if (array_buffer_ == nullptr)
if (array_buffer_ == nullptr) {
array_buffer_ = std::make_shared<ArrayBuffers>();
}

for (auto i = 0; i < arrow_schema->n_children; ++i) {
auto arrow_sch_ = arrow_schema->children[i];
auto arrow_arr_ = arrow_array->children[i];

const void* data;
uint64_t* offsets = nullptr;
// Create a ColumnBuffer object instead of passing it in as an argument
// to `set_column_data` because ColumnBuffer::create requires a TileDB
// Array argument which should remain a private member of SOMAArray
auto column = ColumnBuffer::create(arr_, arrow_sch_->name);

const void* data;
uint8_t* validities = nullptr;
auto table_offset = arrow_arr_->offset;
auto data_size = tiledb::impl::type_size(
ArrowAdapter::to_tiledb_format(arrow_sch_->format));

if (arrow_arr_->null_count != 0) {
validities = (uint8_t*)arrow_arr_->buffers[0];
validities = (uint8_t*)arrow_arr_->buffers[0] + table_offset;
}

if (arrow_arr_->n_buffers == 3) {
offsets = (uint64_t*)arrow_arr_->buffers[1];
data = arrow_arr_->buffers[2];
if ((strcmp(arrow_sch_->format, "u") == 0) ||
(strcmp(arrow_sch_->format, "z") == 0)) {
uint32_t* offsets = (uint32_t*)arrow_arr_->buffers[1] +
table_offset;
column->set_data(
arrow_arr_->length,
(char*)data + table_offset * data_size,
offsets,
validities);
} else {
uint64_t* offsets = (uint64_t*)arrow_arr_->buffers[1] +
table_offset;
column->set_data(
arrow_arr_->length,
(char*)data + table_offset * data_size,
offsets,
validities);
}

} else {
data = arrow_arr_->buffers[1];
column->set_data(
arrow_arr_->length,
(char*)data + table_offset * data_size,
static_cast<uint64_t*>(nullptr),
validities);
}

auto table_offset = arrow_arr_->offset;
auto data_size = tiledb::impl::type_size(
ArrowAdapter::to_tiledb_format(arrow_sch_->format));

if (offsets) {
offsets += table_offset;
}
if (validities) {
validities += table_offset;
}

// Create a ColumnBuffer object instead of passing it in as an argument
// to `set_column_data` because ColumnBuffer::create requires a TileDB
// Array argument which should remain a private member of SOMAArray
auto column = ColumnBuffer::create(arr_, arrow_sch_->name);
column->set_data(
arrow_arr_->length,
(char*)data + table_offset * data_size,
offsets,
validities);

// Keep the ColumnBuffer alive by attaching it to the ArrayBuffers class
// member. Otherwise, the data held by the ColumnBuffer will be garbage
// collected before it is submitted to the write query
Expand Down
21 changes: 21 additions & 0 deletions libtiledbsoma/src/soma/soma_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,24 @@ class SOMAArray : public SOMAObject {
uint64_t* offsets = nullptr,
uint8_t* validity = nullptr);

/**
* @brief Set the write buffers for string or binary with 32-bit offsets
* (as opposed to large string or large binary with 64-bit offsets).
*
* @param name Name of the column
* @param num_elems Number of elements to write
* @param data Pointer to the beginning of the data buffer
* @param offsets Pointer to the beginning of the offsets buffer
* @param validity Optional pointer to the beginning of the validities
* buffer
*/
void set_column_data(
std::string_view name,
uint64_t num_elems,
const void* data,
uint32_t* offsets,
uint8_t* validity = nullptr);

/**
* @brief Set the write buffers for an Arrow Table or Batch as represented
* by an ArrowSchema and ArrowArray.
Expand Down Expand Up @@ -759,6 +777,9 @@ class SOMAArray : public SOMAObject {
return enmr;
}

// Helper function for set_column_data
std::shared_ptr<ColumnBuffer> _setup_column_data(std::string_view name);

// Fills the metadata cache upon opening the array.
void fill_metadata_cache();

Expand Down
4 changes: 2 additions & 2 deletions libtiledbsoma/src/utils/arrow_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,8 +635,8 @@ ArrowAdapter::to_arrow(std::shared_ptr<ColumnBuffer> column) {
schema->flags |= ARROW_FLAG_NULLABLE; // it is also set by default

// Count nulls
for (auto v : column->validity()) {
array->null_count += v == 0;
for (size_t i = 0; i < column->size(); ++i) {
array->null_count += column->validity()[i] == 0;
}

// Convert validity bytemap to a bitmap in place
Expand Down

0 comments on commit b88e1d4

Please sign in to comment.