Skip to content

Commit

Permalink
add decodingPairPool to nimble parallel writer (#101)
Browse files Browse the repository at this point in the history
Summary:

Switching to pool model for parallelism. Instead of a single shared decodedvector and a single selectivityvector, we get a pair of decodedvector and selectivityvector that gets recycled back into a queue

Differential Revision: D65379001
  • Loading branch information
Scott Young authored and facebook-github-bot committed Nov 7, 2024
1 parent 60ffa3b commit 31866a6
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 98 deletions.
117 changes: 36 additions & 81 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,6 @@

namespace facebook::nimble {

class FieldWriterContext::LocalDecodedVector {
public:
explicit LocalDecodedVector(FieldWriterContext& context)
: context_(context), vector_(context_.getDecodedVector()) {}

LocalDecodedVector(LocalDecodedVector&& other) noexcept
: context_{other.context_}, vector_{std::move(other.vector_)} {}

LocalDecodedVector& operator=(LocalDecodedVector&& other) = delete;

~LocalDecodedVector() {
if (vector_) {
context_.releaseDecodedVector(std::move(vector_));
}
}

velox::DecodedVector& get() {
return *vector_;
}

private:
FieldWriterContext& context_;
std::unique_ptr<velox::DecodedVector> vector_;
};

namespace {

template <velox::TypeKind KIND>
Expand Down Expand Up @@ -359,8 +334,8 @@ class SimpleFieldWriter : public FieldWriter {
});
}
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
iterateNonNullValues(
ranges,
Expand All @@ -370,6 +345,7 @@ class SimpleFieldWriter : public FieldWriter {
data.push_back(
C::convert(value, buffer, valuesStream_.extraMemory()));
});
context_.decodingPairPool().addPair(std::move(pair));
}
}

Expand Down Expand Up @@ -421,8 +397,8 @@ class RowFieldWriter : public FieldWriter {
childRangesPtr = &ranges;
}
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
row = decoded.base()->as<velox::RowVector>();
NIMBLE_ASSERT(row, "Unexpected vector type");
NIMBLE_CHECK(fields_.size() == row->childrenSize(), "schema mismatch");
Expand All @@ -433,6 +409,7 @@ class RowFieldWriter : public FieldWriter {
nullsStream_.mutableNonNulls(),
Decoded{decoded},
[&](auto offset) { childRanges.add(offset, 1); });
context_.decodingPairPool().addPair(std::move(pair));
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
Expand Down Expand Up @@ -500,8 +477,8 @@ class MultiValueFieldWriter : public FieldWriter {
iterateNonNullIndices<true>(
ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
casted = decoded.base()->as<T>();
NIMBLE_ASSERT(casted, "Unexpected vector type");
offsets = casted->rawOffsets();
Expand All @@ -510,6 +487,7 @@ class MultiValueFieldWriter : public FieldWriter {
lengthsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
iterateNonNullIndices<true>(
ranges, lengthsStream_.mutableNonNulls(), Decoded{decoded}, proc);
context_.decodingPairPool().addPair(std::move(pair));
}

return casted;
Expand Down Expand Up @@ -715,8 +693,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
iterableVector,
processMapIndex);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
mapVector = decoded.base()->template as<velox::MapVector>();
NIMBLE_ASSERT(mapVector, "Unexpected vector type");
rawOffsets = mapVector->rawOffsets();
Expand All @@ -728,6 +706,7 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
offsetsStream_.mutableNonNulls(),
iterableVector,
processMapIndex);
context_.decodingPairPool().addPair(std::move(pair));
}

// Copy the last valid element into the cache.
Expand Down Expand Up @@ -965,13 +944,14 @@ class FlatMapFieldWriter : public FieldWriter {
// Keys are encoded. Decode.
iterateNonNullIndices<false>(
ranges, nullsStream_.mutableNonNulls(), vector, computeKeyRanges);
auto localDecodedKeys = decode(mapKeys, keyRanges);
auto& decodedKeys = localDecodedKeys.get();
auto pair = decode(mapKeys, keyRanges);
auto& decodedKeys = *pair.first;
Decoded<KeyType> keysVector{decodedKeys};
iterateNonNullIndices<true>(
ranges, nullsStream_.mutableNonNulls(), vector, [&](auto offset) {
processMap(offset, keysVector);
});
context_.decodingPairPool().addPair(std::move(pair));
}
};

Expand All @@ -990,15 +970,16 @@ class FlatMapFieldWriter : public FieldWriter {
processVector(map, Flat{vector});
} else {
// Map is encoded. Decode.
auto localDecodedMap = decode(vector, ranges);
auto& decodedMap = localDecodedMap.get();
auto pair = decode(vector, ranges);
auto& decodedMap = *pair.first;
map = decodedMap.base()->template as<velox::MapVector>();
NIMBLE_ASSERT(map, "Unexpected vector type");
offsets = map->rawOffsets();
lengths = map->rawSizes();

nullsStream_.ensureNullsCapacity(decodedMap.mayHaveNulls(), size);
processVector(map, Decoded{decodedMap});
context_.decodingPairPool().addPair(std::move(pair));
}

// Now actually ingest the map values
Expand Down Expand Up @@ -1375,8 +1356,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {

iterateNonNullIndices<false>(ranges, nonNulls, iterableVector, dedupProc);
} else {
auto localDecoded = decode(vectorElements, childRanges);
auto& decoded = localDecoded.get();
auto pair = decode(vectorElements, childRanges);
auto& decoded = *pair.first;
/** compare array at index and prevIndex to be equal */
compareConsecutive = [&](velox::vector_size_t index,
velox::vector_size_t prevIndex) {
Expand Down Expand Up @@ -1407,6 +1388,7 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
return match;
};
iterateNonNullIndices<false>(ranges, nonNulls, iterableVector, dedupProc);
context_.decodingPairPool().addPair(std::move(pair));
}

// Copy the last valid element into the cache.
Expand Down Expand Up @@ -1455,8 +1437,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
ingestLengthsOffsetsByElements(
arrayVector, iterableVector, ranges, childRanges, filteredRanges);
} else {
auto localDecoded = decode(vector, ranges);
auto& decoded = localDecoded.get();
auto pair = decode(vector, ranges);
auto& decoded = *pair.first;
arrayVector = decoded.base()->template as<velox::ArrayVector>();
NIMBLE_ASSERT(arrayVector, "Unexpected vector type");
rawOffsets = arrayVector->rawOffsets();
Expand All @@ -1468,6 +1450,7 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc);
ingestLengthsOffsetsByElements(
arrayVector, iterableVector, ranges, childRanges, filteredRanges);
context_.decodingPairPool().addPair(std::move(pair));
}
return arrayVector;
}
Expand Down Expand Up @@ -1510,51 +1493,23 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(

} // namespace

FieldWriterContext::LocalDecodedVector
FieldWriterContext::getLocalDecodedVector() {
NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing");
vectorDecoderVisitor();
return LocalDecodedVector{*this};
}

velox::SelectivityVector& FieldWriterContext::getSelectivityVector(
velox::vector_size_t size) {
if (LIKELY(selectivity_.get() != nullptr)) {
selectivity_->resize(size);
} else {
selectivity_ = std::make_unique<velox::SelectivityVector>(size);
}
return *selectivity_;
}

std::unique_ptr<velox::DecodedVector> FieldWriterContext::getDecodedVector() {
if (decodedVectorPool_.empty()) {
return std::make_unique<velox::DecodedVector>();
}
auto vector = std::move(decodedVectorPool_.back());
decodedVectorPool_.pop_back();
return vector;
}

void FieldWriterContext::releaseDecodedVector(
std::unique_ptr<velox::DecodedVector>&& vector) {
decodedVectorPool_.push_back(std::move(vector));
}

FieldWriterContext::LocalDecodedVector FieldWriter::decode(
DecodingPair FieldWriter::decode(
const velox::VectorPtr& vector,
const OrderedRanges& ranges) {
auto& selectivityVector = context_.getSelectivityVector(vector->size());
// initialize selectivity vector
selectivityVector.clearAll();
auto pair = context_.decodingPairPool().reservePair();
// decodedVector = pair.first
// selectivityVector = pair.second

pair.second->resize(vector->size());
pair.second->clearAll();
ranges.apply([&](auto offset, auto size) {
selectivityVector.setValidRange(offset, offset + size, true);
pair.second->setValidRange(offset, offset + size, true);
});
selectivityVector.updateBounds();
pair.second->updateBounds();

auto localDecoded = context_.getLocalDecodedVector();
localDecoded.get().decode(*vector, selectivityVector);
return localDecoded;
pair.first->decode(*vector, *pair.second);
// we return the pair so that the caller can return it to the pool
return pair;
}

std::unique_ptr<FieldWriter> FieldWriter::create(
Expand Down
108 changes: 94 additions & 14 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "dwio/nimble/velox/OrderedRanges.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/StreamData.h"
#include "folly/concurrency/DynamicBoundedQueue.h"
#include "velox/dwio/common/TypeWithId.h"
#include "velox/vector/DecodedVector.h"

Expand All @@ -34,20 +35,104 @@ struct InputBufferGrowthStats {
std::atomic<uint64_t> itemCount{0};
};

struct FieldWriterContext {
class LocalDecodedVector;
using DecodedVectorPtr = std::unique_ptr<velox::DecodedVector>;
using SelectivityVectorPtr = std::unique_ptr<velox::SelectivityVector>;
using DecodingPair = std::pair<DecodedVectorPtr, SelectivityVectorPtr>;

// A pool of decoding pairs. Decoding pairs are used to decode a vector
// and its associated selectivity vector. The pool is used to avoid
// repeated allocations of decoding pairs. DMPMCQueue needs to be set to
// 90% of expected capacity for best performance.
class DecodingPairPool {
public:
explicit DecodingPairPool(
std::function<void(void)> vectorDecoderVisitor = []() {},
std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
size_t maxPoolSize = std::thread::hardware_concurrency())
: timeout_{timeout},
pool_{maxPoolSize},
vectorDecoderVisitor_{std::move(vectorDecoderVisitor)} {
NIMBLE_CHECK(timeout_.count() > 0, "timeout must be > 0");
NIMBLE_CHECK(maxPoolSize > 0, "maxPoolSize must be > 0");
NIMBLE_CHECK(vectorDecoderVisitor_, "vectorDecoderVisitor must be set");
}
~DecodingPairPool() {
clearPool();
}

void addPair(DecodingPair pair) {
auto status = pool_.try_enqueue_for(std::move(pair), timeout_);
if (!status) {
NIMBLE_UNKNOWN(
"Timeout enqueuing decodingPair timeout=" +
std::to_string(timeout_.count()) + " ms");
}
}

void addPair(DecodedVectorPtr decoded, SelectivityVectorPtr selectivity) {
addPair(std::make_pair(std::move(decoded), std::move(selectivity)));
}

// Get a decoding pair from the pool, or create a new one if the pool is
// empty.
DecodingPair reservePair() {
vectorDecoderVisitor_();

if (pool_.empty()) {
return newPair();
}
DecodingPair pair;
auto status = pool_.try_dequeue_for(pair, timeout_);
if (!status) {
NIMBLE_UNREACHABLE(
"Timeout dequeuing decodingPair timeout=" +
std::to_string(timeout_.count()) + " ms");
}
return pair;
}

size_t size() const {
return pool_.size();
}

void clearPool() {
while (!pool_.empty()) {
DecodingPair pair;
pool_.dequeue(pair);
pair.first.reset();
pair.second.reset();
}
}

private:
const std::chrono::milliseconds timeout_;
folly::DMPMCQueue<DecodingPair, true> pool_;
std::function<void(void)> vectorDecoderVisitor_;

DecodingPair newPair() {
auto decoded = std::make_unique<velox::DecodedVector>();
auto selectivity = std::make_unique<velox::SelectivityVector>();
return std::make_pair(std::move(decoded), std::move(selectivity));
}
};

struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
std::function<void(void)> vectorDecoderVisitor = []() {},
std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
size_t maxPoolSize = std::thread::hardware_concurrency())
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
vectorDecoderVisitor(std::move(vectorDecoderVisitor)) {
decodingPairPool_{std::make_unique<DecodingPairPool>(
std::move(vectorDecoderVisitor),
timeout,
maxPoolSize)} {
resetStringBuffer();
}

Expand All @@ -67,10 +152,9 @@ struct FieldWriterContext {
std::function<void(const TypeBuilder&)> typeAddedHandler =
[](const TypeBuilder&) {};

std::function<void(void)> vectorDecoderVisitor;

LocalDecodedVector getLocalDecodedVector();
velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size);
DecodingPairPool& decodingPairPool() {
return *decodingPairPool_;
}

Buffer& stringBuffer() {
return *buffer_;
Expand Down Expand Up @@ -108,12 +192,8 @@ struct FieldWriterContext {
}

private:
std::unique_ptr<velox::DecodedVector> getDecodedVector();
void releaseDecodedVector(std::unique_ptr<velox::DecodedVector>&& vector);

std::unique_ptr<Buffer> buffer_;
std::vector<std::unique_ptr<velox::DecodedVector>> decodedVectorPool_;
std::unique_ptr<velox::SelectivityVector> selectivity_;
std::unique_ptr<DecodingPairPool> decodingPairPool_;
std::vector<std::unique_ptr<StreamData>> streams_;
};

Expand Down Expand Up @@ -157,7 +237,7 @@ class FieldWriter {
FieldWriterContext& context_;
std::shared_ptr<TypeBuilder> typeBuilder_;

FieldWriterContext::LocalDecodedVector decode(
DecodingPair decode(
const velox::VectorPtr& vector,
const OrderedRanges& ranges);
};
Expand Down
Loading

0 comments on commit 31866a6

Please sign in to comment.