Skip to content

Commit

Permalink
[ntuple] add bulk reading with adopted buffer
Browse files Browse the repository at this point in the history
Includes a fix in RRVecField::BulkRead for adopted buffer handling.
  • Loading branch information
jblomer committed Feb 13, 2024
1 parent 31a7e29 commit c30c403
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 3 deletions.
5 changes: 5 additions & 0 deletions tree/ntuple/v7/inc/ROOT/RField.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public:
std::size_t fValueSize = 0; ///< Cached copy of fField->GetValueSize()
std::size_t fCapacity = 0; ///< The size of the array memory block in number of values
std::size_t fSize = 0; ///< The number of available values in the array (provided their mask is set)
bool fIsAdopted = false; ///< True if the user provides the memory buffer for fValues
std::unique_ptr<bool[]> fMaskAvail; ///< Masks invalid values in the array
std::size_t fNValidValues = 0; ///< The sum of non-zero elements in the fMask
RClusterIndex fFirstIndex; ///< Index of the first value of the array
Expand Down Expand Up @@ -271,6 +272,10 @@ public:
RBulk(RBulk &&other);
RBulk &operator=(RBulk &&other);

// Sets fValues and fSize/fCapacity to the given values. The capacity is specified in number of values.
// Once a buffer is adopted, an attempt to read more values then available throws an exception.
void AdoptBuffer(void *buf, std::size_t capacity);

/// Reads 'size' values from the associated field, starting from 'firstIndex'. Note that the index is given
/// relative to a certain cluster. The return value points to the array of read objects.
/// The 'maskReq' parameter is a bool array of at least 'size' elements. Only objects for which the mask is
Expand Down
30 changes: 27 additions & 3 deletions tree/ntuple/v7/src/RField.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ ROOT::Experimental::RFieldBase::RBulk::RBulk(RBulk &&other)
fValueSize(other.fValueSize),
fCapacity(other.fCapacity),
fSize(other.fSize),
fIsAdopted(other.fIsAdopted),
fNValidValues(other.fNValidValues),
fFirstIndex(other.fFirstIndex)
{
Expand All @@ -359,6 +360,7 @@ ROOT::Experimental::RFieldBase::RBulk &ROOT::Experimental::RFieldBase::RBulk::op
std::swap(fValueSize, other.fValueSize);
std::swap(fCapacity, other.fCapacity);
std::swap(fSize, other.fSize);
std::swap(fIsAdopted, other.fIsAdopted);
std::swap(fMaskAvail, other.fMaskAvail);
std::swap(fNValidValues, other.fNValidValues);
std::swap(fFirstIndex, other.fFirstIndex);
Expand All @@ -373,6 +375,9 @@ ROOT::Experimental::RFieldBase::RBulk::~RBulk()

void ROOT::Experimental::RFieldBase::RBulk::ReleaseValues()
{
if (fIsAdopted)
return;

if (fField->GetTraits() & RFieldBase::kTraitTriviallyDestructible) {
free(fValues);
return;
Expand All @@ -387,6 +392,9 @@ void ROOT::Experimental::RFieldBase::RBulk::ReleaseValues()
void ROOT::Experimental::RFieldBase::RBulk::Reset(RClusterIndex firstIndex, std::size_t size)
{
if (fCapacity < size) {
if (fIsAdopted) {
throw RException(R__FAIL("invalid attempt to bulk read beyond the adopted buffer"));
}
ReleaseValues();
fValues = malloc(size * fValueSize);

Expand Down Expand Up @@ -414,6 +422,20 @@ void ROOT::Experimental::RFieldBase::RBulk::CountValidValues()
fNValidValues += static_cast<std::size_t>(fMaskAvail[i]);
}

void ROOT::Experimental::RFieldBase::RBulk::AdoptBuffer(void *buf, std::size_t capacity)
{
ReleaseValues();
fValues = buf;
fCapacity = capacity;
fSize = capacity;

fMaskAvail = std::make_unique<bool[]>(capacity);

fFirstIndex = RClusterIndex();

fIsAdopted = true;
}

//------------------------------------------------------------------------------

ROOT::Experimental::RFieldBase::RFieldBase(std::string_view name, std::string_view type, ENTupleStructure structure,
Expand Down Expand Up @@ -2246,7 +2268,7 @@ std::size_t ROOT::Experimental::RRVecField::ReadBulkImpl(const RBulkSpec &bulkSp
}
const auto itemValueSize = *reinterpret_cast<std::size_t *>(bulkSpec.fAuxData->data());
unsigned char *itemValueArray = bulkSpec.fAuxData->data() + sizeof(std::size_t);
auto [beginPtr, sizePtr, _] = GetRVecDataMembers(bulkSpec.fValues);
auto [beginPtr, sizePtr, capacityPtr] = GetRVecDataMembers(bulkSpec.fValues);

// Get size of the first RVec of the bulk
RClusterIndex firstItemIndex;
Expand All @@ -2255,6 +2277,7 @@ std::size_t ROOT::Experimental::RRVecField::ReadBulkImpl(const RBulkSpec &bulkSp
this->GetCollectionInfo(bulkSpec.fFirstIndex, &firstItemIndex, &collectionSize);
*beginPtr = itemValueArray;
*sizePtr = collectionSize;
*capacityPtr = -1;

// Set the size of the remaining RVecs of the bulk, going page by page through the RNTuple offset column.
// We optimistically assume that bulkSpec.fAuxData is already large enough to hold all the item values in the
Expand All @@ -2269,10 +2292,11 @@ std::size_t ROOT::Experimental::RRVecField::ReadBulkImpl(const RBulkSpec &bulkSp
const std::size_t nBatch = std::min(nRemainingValues, nElementsUntilPageEnd);
for (std::size_t i = 0; i < nBatch; ++i) {
const auto size = offsets[i] - lastOffset;
std::tie(beginPtr, sizePtr, _) = GetRVecDataMembers(
reinterpret_cast<unsigned char *>(bulkSpec.fValues) + (nValues + i) * fValueSize);
std::tie(beginPtr, sizePtr, capacityPtr) =
GetRVecDataMembers(reinterpret_cast<unsigned char *>(bulkSpec.fValues) + (nValues + i) * fValueSize);
*beginPtr = itemValueArray + nItems * itemValueSize;
*sizePtr = size;
*capacityPtr = -1;

nItems += size;
lastOffset = offsets[i];
Expand Down
52 changes: 52 additions & 0 deletions tree/ntuple/v7/test/ntuple_bulk.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,55 @@ TEST(RNTupleBulk, RVec)
}
}
}

TEST(RNTupleBulk, Adopted)
{
FileRaii fileGuard("test_ntuple_bulk_adopted.root");
{
auto model = RNTupleModel::Create();
auto fldVecI = model->MakeField<ROOT::RVecI>("vint");
auto writer = RNTupleWriter::Recreate(std::move(model), "ntpl", fileGuard.GetPath());
for (int i = 0; i < 10; ++i) {
fldVecI->resize(i);
for (int j = 0; j < i; ++j) {
fldVecI->at(j) = j;
}
writer->Fill();
}
}

auto reader = RNTupleReader::Open("ntpl", fileGuard.GetPath());
RFieldBase::RBulk bulkI = reader->GetModel().CreateBulk("vint");

auto mask = std::make_unique<bool[]>(10);
std::fill(mask.get(), mask.get() + 10, true);

auto iArr = static_cast<ROOT::RVecI *>(bulkI.ReadBulk(RClusterIndex(0, 0), mask.get(), 10));
for (int i = 0; i < 10; ++i) {
EXPECT_EQ(i, iArr[i].size());
for (std::size_t j = 0; j < iArr[i].size(); ++j) {
EXPECT_EQ(j, iArr[i].at(j));
}
}

auto buf1 = std::make_unique<ROOT::RVecI[]>(10);
bulkI.AdoptBuffer(buf1.get(), 10);
bulkI.ReadBulk(RClusterIndex(0, 0), mask.get(), 10);
for (int i = 0; i < 10; ++i) {
EXPECT_EQ(i, buf1[i].size());
for (std::size_t j = 0; j < buf1[i].size(); ++j) {
EXPECT_EQ(j, buf1[i].at(j));
}
}

auto buf2 = std::make_unique<ROOT::RVecI[]>(10);
bulkI.AdoptBuffer(buf2.get(), 5);
EXPECT_THROW(bulkI.ReadBulk(RClusterIndex(0, 0), mask.get(), 10), RException);
bulkI.ReadBulk(RClusterIndex(0, 0), mask.get(), 5);
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(i, buf2[i].size());
for (std::size_t j = 0; j < buf2[i].size(); ++j) {
EXPECT_EQ(j, buf2[i].at(j));
}
}
}

0 comments on commit c30c403

Please sign in to comment.