Skip to content

Commit

Permalink
add decodingPairPool to nimble parallel writer (facebookincubator#101)
Browse files Browse the repository at this point in the history
Summary:

Switching to pool model for parallelism. Instead of a single shared decodedvector and a single selectivityvector, we get a pair of decodedvector and selectivityvector that gets recycled back into a queue

Differential Revision: D65379001
  • Loading branch information
Scott Young authored and facebook-github-bot committed Nov 12, 2024
1 parent fd7fee7 commit b06d021
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 98 deletions.
117 changes: 36 additions & 81 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,6 @@

namespace facebook::nimble {

class FieldWriterContext::LocalDecodedVector {
public:
explicit LocalDecodedVector(FieldWriterContext& context)
: context_(context), vector_(context_.getDecodedVector()) {}

LocalDecodedVector(LocalDecodedVector&& other) noexcept
: context_{other.context_}, vector_{std::move(other.vector_)} {}

LocalDecodedVector& operator=(LocalDecodedVector&& other) = delete;

~LocalDecodedVector() {
if (vector_) {
context_.releaseDecodedVector(std::move(vector_));
}
}

velox::DecodedVector& get() {
return *vector_;
}

private:
FieldWriterContext& context_;
std::unique_ptr<velox::DecodedVector> vector_;
};

namespace {

template <velox::TypeKind KIND>
Expand Down Expand Up @@ -359,8 +334,8 @@ class SimpleFieldWriter : public FieldWriter {
});
}
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
iterateNonNullValues(
ranges,
Expand All @@ -370,6 +345,7 @@ class SimpleFieldWriter : public FieldWriter {
data.push_back(
C::convert(value, buffer, valuesStream_.extraMemory()));
});
context_.decodingPairPool().addPair(std::move(pair));
}
}

Expand Down Expand Up @@ -421,8 +397,8 @@ class RowFieldWriter : public FieldWriter {
childRangesPtr = &ranges;
}
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
row = decoded.base()->as<velox::RowVector>();
NIMBLE_ASSERT(row, "Unexpected vector type");
NIMBLE_CHECK(fields_.size() == row->childrenSize(), "schema mismatch");
Expand All @@ -433,6 +409,7 @@ class RowFieldWriter : public FieldWriter {
nullsStream_.mutableNonNulls(),
Decoded{decoded},
[&](auto offset) { childRanges.add(offset, 1); });
context_.decodingPairPool().addPair(std::move(pair));
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
Expand Down Expand Up @@ -500,8 +477,8 @@ class MultiValueFieldWriter : public FieldWriter {
iterateNonNullIndices<true>(
ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
casted = decoded.base()->as<T>();
NIMBLE_ASSERT(casted, "Unexpected vector type");
offsets = casted->rawOffsets();
Expand All @@ -510,6 +487,7 @@ class MultiValueFieldWriter : public FieldWriter {
lengthsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
iterateNonNullIndices<true>(
ranges, lengthsStream_.mutableNonNulls(), Decoded{decoded}, proc);
context_.decodingPairPool().addPair(std::move(pair));
}

return casted;
Expand Down Expand Up @@ -715,8 +693,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
iterableVector,
processMapIndex);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
mapVector = decoded.base()->template as<velox::MapVector>();
NIMBLE_ASSERT(mapVector, "Unexpected vector type");
rawOffsets = mapVector->rawOffsets();
Expand All @@ -728,6 +706,7 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
offsetsStream_.mutableNonNulls(),
iterableVector,
processMapIndex);
context_.decodingPairPool().addPair(std::move(pair));
}

// Copy the last valid element into the cache.
Expand Down Expand Up @@ -965,13 +944,14 @@ class FlatMapFieldWriter : public FieldWriter {
// Keys are encoded. Decode.
iterateNonNullIndices<false>(
ranges, nullsStream_.mutableNonNulls(), vector, computeKeyRanges);
auto localDecodedKeys = decode(mapKeys, keyRanges);
auto& decodedKeys = localDecodedKeys.get();
auto pair = decode(mapKeys, keyRanges);
auto& decodedKeys = *pair.first;
Decoded<KeyType> keysVector{decodedKeys};
iterateNonNullIndices<true>(
ranges, nullsStream_.mutableNonNulls(), vector, [&](auto offset) {
processMap(offset, keysVector);
});
context_.decodingPairPool().addPair(std::move(pair));
}
};

Expand All @@ -990,15 +970,16 @@ class FlatMapFieldWriter : public FieldWriter {
processVector(map, Flat{vector});
} else {
// Map is encoded. Decode.
auto localDecodedMap = decode(vector, ranges);
auto& decodedMap = localDecodedMap.get();
auto pair = decode(vector, ranges);
auto& decodedMap = *pair.first;
map = decodedMap.base()->template as<velox::MapVector>();
NIMBLE_ASSERT(map, "Unexpected vector type");
offsets = map->rawOffsets();
lengths = map->rawSizes();

nullsStream_.ensureNullsCapacity(decodedMap.mayHaveNulls(), size);
processVector(map, Decoded{decodedMap});
context_.decodingPairPool().addPair(std::move(pair));
}

// Now actually ingest the map values
Expand Down Expand Up @@ -1375,8 +1356,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {

iterateNonNullIndices<false>(ranges, nonNulls, iterableVector, dedupProc);
} else {
auto localDecoded = decode(vectorElements, childRanges);
auto& decoded = localDecoded.get();
auto pair = decode(vectorElements, childRanges);
auto& decoded = *pair.first;
/** compare array at index and prevIndex to be equal */
compareConsecutive = [&](velox::vector_size_t index,
velox::vector_size_t prevIndex) {
Expand Down Expand Up @@ -1407,6 +1388,7 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
return match;
};
iterateNonNullIndices<false>(ranges, nonNulls, iterableVector, dedupProc);
context_.decodingPairPool().addPair(std::move(pair));
}

// Copy the last valid element into the cache.
Expand Down Expand Up @@ -1455,8 +1437,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
ingestLengthsOffsetsByElements(
arrayVector, iterableVector, ranges, childRanges, filteredRanges);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
arrayVector = decoded.base()->template as<velox::ArrayVector>();
NIMBLE_ASSERT(arrayVector, "Unexpected vector type");
rawOffsets = arrayVector->rawOffsets();
Expand All @@ -1468,6 +1450,7 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc);
ingestLengthsOffsetsByElements(
arrayVector, iterableVector, ranges, childRanges, filteredRanges);
context_.decodingPairPool().addPair(std::move(pair));
}
return arrayVector;
}
Expand Down Expand Up @@ -1510,51 +1493,23 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(

} // namespace

FieldWriterContext::LocalDecodedVector
FieldWriterContext::getLocalDecodedVector() {
NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing");
vectorDecoderVisitor();
return LocalDecodedVector{*this};
}

velox::SelectivityVector& FieldWriterContext::getSelectivityVector(
velox::vector_size_t size) {
if (LIKELY(selectivity_.get() != nullptr)) {
selectivity_->resize(size);
} else {
selectivity_ = std::make_unique<velox::SelectivityVector>(size);
}
return *selectivity_;
}

std::unique_ptr<velox::DecodedVector> FieldWriterContext::getDecodedVector() {
if (decodedVectorPool_.empty()) {
return std::make_unique<velox::DecodedVector>();
}
auto vector = std::move(decodedVectorPool_.back());
decodedVectorPool_.pop_back();
return vector;
}

void FieldWriterContext::releaseDecodedVector(
std::unique_ptr<velox::DecodedVector>&& vector) {
decodedVectorPool_.push_back(std::move(vector));
}

FieldWriterContext::LocalDecodedVector FieldWriter::decode(
DecodingPair FieldWriter::decode(
const velox::VectorPtr& vector,
const OrderedRanges& ranges) {
auto& selectivityVector = context_.getSelectivityVector(vector->size());
// initialize selectivity vector
selectivityVector.clearAll();
auto pair = context_.decodingPairPool().reservePair();
// decodedVector = pair.first
// selectivityVector = pair.second

pair.second->resize(vector->size());
pair.second->clearAll();
ranges.apply([&](auto offset, auto size) {
selectivityVector.setValidRange(offset, offset + size, true);
pair.second->setValidRange(offset, offset + size, true);
});
selectivityVector.updateBounds();
pair.second->updateBounds();

auto localDecoded = context_.getLocalDecodedVector();
localDecoded.get().decode(*vector, selectivityVector);
return localDecoded;
pair.first->decode(*vector, *pair.second);
// we return the pair so that the caller can return it to the pool
return pair;
}

std::unique_ptr<FieldWriter> FieldWriter::create(
Expand Down
Loading

0 comments on commit b06d021

Please sign in to comment.