From c81d29233af2c5f9b67b233b49577c91d6fad852 Mon Sep 17 00:00:00 2001 From: Scott Young Date: Wed, 6 Nov 2024 01:22:17 -0800 Subject: [PATCH] add decodingPairPool to nimble parallel writer (#101) Summary: Switch to a pool model for parallelism. Instead of a single shared DecodedVector and a single SelectivityVector, each decode call reserves a DecodedVector/SelectivityVector pair from a pool, and the pair is recycled back into a queue after use. Differential Revision: D65379001 --- dwio/nimble/velox/FieldWriter.cpp | 115 ++++------- dwio/nimble/velox/FieldWriter.h | 98 ++++++++-- dwio/nimble/velox/VeloxWriter.cpp | 2 +- dwio/nimble/velox/VeloxWriterOptions.h | 4 + .../velox/tests/DecodingPairPoolTests.cpp | 183 ++++++++++++++++++ dwio/nimble/velox/tests/VeloxReaderTests.cpp | 6 +- 6 files changed, 314 insertions(+), 94 deletions(-) create mode 100644 dwio/nimble/velox/tests/DecodingPairPoolTests.cpp diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index 5eb2ffb..927cfa9 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -25,31 +25,6 @@ namespace facebook::nimble { -class FieldWriterContext::LocalDecodedVector { - public: - explicit LocalDecodedVector(FieldWriterContext& context) - : context_(context), vector_(context_.getDecodedVector()) {} - - LocalDecodedVector(LocalDecodedVector&& other) noexcept - : context_{other.context_}, vector_{std::move(other.vector_)} {} - - LocalDecodedVector& operator=(LocalDecodedVector&& other) = delete; - - ~LocalDecodedVector() { - if (vector_) { - context_.releaseDecodedVector(std::move(vector_)); - } - } - - velox::DecodedVector& get() { - return *vector_; - } - - private: - FieldWriterContext& context_; - std::unique_ptr vector_; -}; - namespace { template @@ -357,8 +332,8 @@ class SimpleFieldWriter : public FieldWriter { }); } } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto pair = decode(vector, ranges); + auto& decoded = *pair.first; 
valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); iterateNonNullValues( ranges, @@ -368,6 +343,7 @@ class SimpleFieldWriter : public FieldWriter { data.push_back( C::convert(value, buffer, valuesStream_.extraMemory())); }); + context_.decodingPairPool().addPair(std::move(pair)); } } @@ -419,8 +395,8 @@ class RowFieldWriter : public FieldWriter { childRangesPtr = &ranges; } } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto pair = decode(vector, ranges); + auto& decoded = *pair.first; row = decoded.base()->as(); NIMBLE_ASSERT(row, "Unexpected vector type"); NIMBLE_CHECK(fields_.size() == row->childrenSize(), "schema mismatch"); @@ -431,6 +407,7 @@ class RowFieldWriter : public FieldWriter { nullsStream_.mutableNonNulls(), Decoded{decoded}, [&](auto offset) { childRanges.add(offset, 1); }); + context_.decodingPairPool().addPair(std::move(pair)); } for (auto i = 0; i < fields_.size(); ++i) { fields_[i]->write(row->childAt(i), *childRangesPtr); @@ -498,8 +475,8 @@ class MultiValueFieldWriter : public FieldWriter { iterateNonNullIndices( ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc); } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto pair = decode(vector, ranges); + auto& decoded = *pair.first; casted = decoded.base()->as(); NIMBLE_ASSERT(casted, "Unexpected vector type"); offsets = casted->rawOffsets(); @@ -508,6 +485,7 @@ class MultiValueFieldWriter : public FieldWriter { lengthsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); iterateNonNullIndices( ranges, lengthsStream_.mutableNonNulls(), Decoded{decoded}, proc); + context_.decodingPairPool().addPair(std::move(pair)); } return casted; @@ -713,8 +691,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter { iterableVector, processMapIndex); } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto pair = decode(vector, ranges); + auto& 
decoded = *pair.first; mapVector = decoded.base()->template as(); NIMBLE_ASSERT(mapVector, "Unexpected vector type"); rawOffsets = mapVector->rawOffsets(); @@ -726,6 +704,7 @@ class SlidingWindowMapFieldWriter : public FieldWriter { offsetsStream_.mutableNonNulls(), iterableVector, processMapIndex); + context_.decodingPairPool().addPair(std::move(pair)); } // Copy the last valid element into the cache. @@ -859,13 +838,14 @@ class FlatMapFieldWriter : public FieldWriter { // Keys are encoded. Decode. iterateNonNullIndices( ranges, nullsStream_.mutableNonNulls(), vector, computeKeyRanges); - auto localDecodedKeys = decode(mapKeys, keyRanges); - auto& decodedKeys = localDecodedKeys.get(); + auto pair = decode(mapKeys, keyRanges); + auto& decodedKeys = *pair.first; Decoded keysVector{decodedKeys}; iterateNonNullIndices( ranges, nullsStream_.mutableNonNulls(), vector, [&](auto offset) { processMap(offset, keysVector); }); + context_.decodingPairPool().addPair(std::move(pair)); } }; @@ -884,8 +864,8 @@ class FlatMapFieldWriter : public FieldWriter { processVector(map, Flat{vector}); } else { // Map is encoded. Decode. 
- auto localDecodedMap = decode(vector, ranges); - auto& decodedMap = localDecodedMap.get(); + auto pair = decode(vector, ranges); + auto& decodedMap = *pair.first; map = decodedMap.base()->template as(); NIMBLE_ASSERT(map, "Unexpected vector type"); offsets = map->rawOffsets(); @@ -893,6 +873,7 @@ class FlatMapFieldWriter : public FieldWriter { nullsStream_.ensureNullsCapacity(decodedMap.mayHaveNulls(), size); processVector(map, Decoded{decodedMap}); + context_.decodingPairPool().addPair(std::move(pair)); } // Now actually ingest the map values @@ -1143,8 +1124,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { iterateNonNullIndices(ranges, nonNulls, iterableVector, dedupProc); } else { - auto localDecoded = decode(vectorElements, childRanges); - auto& decoded = localDecoded.get(); + auto pair = decode(vectorElements, childRanges); + auto& decoded = *pair.first; /** compare array at index and prevIndex to be equal */ compareConsecutive = [&](velox::vector_size_t index, velox::vector_size_t prevIndex) { @@ -1175,6 +1156,7 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { return match; }; iterateNonNullIndices(ranges, nonNulls, iterableVector, dedupProc); + context_.decodingPairPool().addPair(std::move(pair)); } // Copy the last valid element into the cache. 
@@ -1223,8 +1205,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { ingestLengthsOffsetsByElements( arrayVector, iterableVector, ranges, childRanges, filteredRanges); } else { - auto localDecoded = decode(vector, ranges); - auto& decoded = localDecoded.get(); + auto pair = decode(vector, ranges); + auto& decoded = *pair.first; arrayVector = decoded.base()->template as(); NIMBLE_ASSERT(arrayVector, "Unexpected vector type"); rawOffsets = arrayVector->rawOffsets(); @@ -1236,6 +1218,7 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc); ingestLengthsOffsetsByElements( arrayVector, iterableVector, ranges, childRanges, filteredRanges); + context_.decodingPairPool().addPair(std::move(pair)); } return arrayVector; } @@ -1278,49 +1261,23 @@ std::unique_ptr createArrayWithOffsetsFieldWriter( } // namespace -FieldWriterContext::LocalDecodedVector -FieldWriterContext::getLocalDecodedVector() { - return LocalDecodedVector{*this}; -} - -velox::SelectivityVector& FieldWriterContext::getSelectivityVector( - velox::vector_size_t size) { - if (LIKELY(selectivity_.get() != nullptr)) { - selectivity_->resize(size); - } else { - selectivity_ = std::make_unique(size); - } - return *selectivity_; -} - -std::unique_ptr FieldWriterContext::getDecodedVector() { - if (decodedVectorPool_.empty()) { - return std::make_unique(); - } - auto vector = std::move(decodedVectorPool_.back()); - decodedVectorPool_.pop_back(); - return vector; -} - -void FieldWriterContext::releaseDecodedVector( - std::unique_ptr&& vector) { - decodedVectorPool_.push_back(std::move(vector)); -} - -FieldWriterContext::LocalDecodedVector FieldWriter::decode( +DecodingPair FieldWriter::decode( const velox::VectorPtr& vector, const OrderedRanges& ranges) { - auto& selectivityVector = context_.getSelectivityVector(vector->size()); - // initialize selectivity vector - selectivityVector.clearAll(); + auto pair = 
context_.decodingPairPool().reservePair(); + // pair.first: the DecodedVector to decode into. + // pair.second: the SelectivityVector marking the rows in `ranges`. + + pair.second->resize(vector->size()); + pair.second->clearAll(); ranges.apply([&](auto offset, auto size) { - selectivityVector.setValidRange(offset, offset + size, true); + pair.second->setValidRange(offset, offset + size, true); }); - selectivityVector.updateBounds(); + pair.second->updateBounds(); - auto localDecoded = context_.getLocalDecodedVector(); - localDecoded.get().decode(*vector, selectivityVector); - return localDecoded; + pair.first->decode(*vector, *pair.second); + // Return the pair so that the caller can hand it back to the pool when done. + return pair; } std::unique_ptr FieldWriter::create( diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index 0efa941..36e86f7 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -22,6 +22,7 @@ #include "dwio/nimble/velox/OrderedRanges.h" #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/StreamData.h" +#include "folly/concurrency/DynamicBoundedQueue.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/vector/DecodedVector.h" @@ -34,18 +35,94 @@ struct InputBufferGrowthStats { std::atomic itemCount{0}; }; -struct FieldWriterContext { - class LocalDecodedVector; +using DecodedVectorPtr = std::unique_ptr; +using SelectivityVectorPtr = std::unique_ptr; +using DecodingPair = std::pair; + +// A pool of decoding pairs. A decoding pair bundles a DecodedVector with the +// SelectivityVector used to decode into it. The pool is used to avoid +// repeated allocations of decoding pairs. DMPMCQueue needs to be set to +// 90% of the expected capacity for best performance. 
+class DecodingPairPool { + public: + explicit DecodingPairPool( + std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10), + size_t maxPoolSize = std::thread::hardware_concurrency()) + : timeout_{timeout}, pool_{maxPoolSize} { + NIMBLE_CHECK(timeout_.count() > 0, "timeout must be > 0"); + NIMBLE_CHECK(maxPoolSize > 0, "maxPoolSize must be > 0") + } + ~DecodingPairPool() { + clearPool(); + } + + void addPair(DecodingPair pair) { + auto status = pool_.try_enqueue_for(std::move(pair), timeout_); + if (!status) { + NIMBLE_UNKNOWN( + "Timeout enqueuing decodingPair timeout=" + + std::to_string(timeout_.count()) + " ms"); + } + } + + void addPair(DecodedVectorPtr decoded, SelectivityVectorPtr selectivity) { + addPair(std::make_pair(std::move(decoded), std::move(selectivity))); + } + // Get a decoding pair from the pool, or create a new one if the pool is + // empty. + DecodingPair reservePair() { + if (pool_.empty()) { + return newPair(); + } + DecodingPair pair; + auto status = pool_.try_dequeue_for(pair, timeout_); + if (!status) { + NIMBLE_UNREACHABLE( + "Timeout dequeuing decodingPair timeout=" + + std::to_string(timeout_.count()) + " ms"); + } + return pair; + } + + size_t size() const { + return pool_.size(); + } + + void clearPool() { + while (!pool_.empty()) { + DecodingPair pair; + pool_.dequeue(pair); + pair.first.reset(); + pair.second.reset(); + } + } + + private: + const std::chrono::milliseconds timeout_; + folly::DMPMCQueue pool_; + + DecodingPair newPair() { + auto decoded = std::make_unique(); + auto selectivity = std::make_unique(); + return std::make_pair(std::move(decoded), std::move(selectivity)); + } +}; + +struct FieldWriterContext { explicit FieldWriterContext( velox::memory::MemoryPool& memoryPool, - std::unique_ptr reclaimer = nullptr) + std::unique_ptr reclaimer = nullptr, + std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10), + size_t maxPoolSize = std::thread::hardware_concurrency()) : 
bufferMemoryPool{memoryPool.addLeafChild( "field_writer_buffer", true, std::move(reclaimer))}, inputBufferGrowthPolicy{ - DefaultInputBufferGrowthPolicy::withDefaultRanges()} { + DefaultInputBufferGrowthPolicy::withDefaultRanges()}, + decodingPairPool_{ + std::make_unique(timeout, maxPoolSize)} { resetStringBuffer(); } @@ -65,8 +142,9 @@ struct FieldWriterContext { std::function typeAddedHandler = [](const TypeBuilder&) {}; - LocalDecodedVector getLocalDecodedVector(); - velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size); + DecodingPairPool& decodingPairPool() { + return *decodingPairPool_; + } Buffer& stringBuffer() { return *buffer_; @@ -104,12 +182,8 @@ struct FieldWriterContext { } private: - std::unique_ptr getDecodedVector(); - void releaseDecodedVector(std::unique_ptr&& vector); - std::unique_ptr buffer_; - std::vector> decodedVectorPool_; - std::unique_ptr selectivity_; + std::unique_ptr decodingPairPool_; std::vector> streams_; }; @@ -153,7 +227,7 @@ class FieldWriter { FieldWriterContext& context_; std::shared_ptr typeBuilder_; - FieldWriterContext::LocalDecodedVector decode( + DecodingPair decode( const velox::VectorPtr& vector, const OrderedRanges& ranges); }; diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 9980969..fdff29b 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext { WriterContext( velox::memory::MemoryPool& memoryPool, VeloxWriterOptions options) - : FieldWriterContext{memoryPool, options.reclaimerFactory()}, + : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.poolTimeout, options.maxPoolSize}, options{std::move(options)}, logger{this->options.metricsLogger} { flushPolicy = this->options.flushPolicyFactory(); diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 4689b1d..99cfd5f 100644 --- 
a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -126,6 +126,10 @@ struct VeloxWriterOptions { const velox::common::SpillConfig* spillConfig{nullptr}; + const std::chrono::milliseconds poolTimeout = + std::chrono::milliseconds{1000 * 10}; + const size_t maxPoolSize = std::thread::hardware_concurrency(); + // If provided, internal encoding operations will happen in parallel using // this executor. std::shared_ptr encodingExecutor; diff --git a/dwio/nimble/velox/tests/DecodingPairPoolTests.cpp b/dwio/nimble/velox/tests/DecodingPairPoolTests.cpp new file mode 100644 index 0000000..470eac4 --- /dev/null +++ b/dwio/nimble/velox/tests/DecodingPairPoolTests.cpp @@ -0,0 +1,183 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include "dwio/nimble/common/Exceptions.h" +#include "dwio/nimble/velox/FieldWriter.h" +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "velox/dwio/common/ExecutorBarrier.h" +#include "velox/vector/DecodedVector.h" + +namespace facebook::nimble::test { +using MemoryPool = velox::memory::MemoryPool; +using ExecutorBarrier = velox::dwio::common::ExecutorBarrier; +using DecodedVector = velox::DecodedVector; +using SelectivityVector = velox::SelectivityVector; + +class DecodingPairPoolTest : public ::testing::Test { + public: + DecodingPair newPair() { + auto decoded = std::make_unique(); + auto selectivity = std::make_unique(); + return std::make_pair(std::move(decoded), std::move(selectivity)); + } + + protected: + static void SetUpTestCase() {} + + void SetUp() override {} +}; + +TEST_F(DecodingPairPoolTest, CreatePool) { + auto pairPool = DecodingPairPool{}; + EXPECT_NO_THROW(pairPool); +} + +TEST_F(DecodingPairPoolTest, CreatePoolBadTimeout) { + auto throwLambda = [&]() { + DecodingPairPool{std::chrono::milliseconds(0), 10}; + }; + EXPECT_THROW(throwLambda(), NimbleUserError); + + try { + throwLambda(); + } catch (const NimbleUserError& e) { + EXPECT_EQ(e.errorMessage(), "timeout must be > 0"); + } +} + +TEST_F(DecodingPairPoolTest, CreatePoolBadMaxPool) { + auto throwLambda = [&]() { + DecodingPairPool{std::chrono::milliseconds(1000), 0}; + }; + EXPECT_THROW(throwLambda(), NimbleUserError); + try { + throwLambda(); + } catch (const NimbleUserError& e) { + EXPECT_EQ(e.errorMessage(), "maxPoolSize must be > 0"); + } +} + +TEST_F(DecodingPairPoolTest, DestroyPool) { + auto pairPoolPtr = std::make_unique(); + pairPoolPtr->reservePair(); + pairPoolPtr.reset(); + EXPECT_EQ(pairPoolPtr, nullptr); +} + +TEST_F(DecodingPairPoolTest, EmptyDecodingPairPool) { + auto pairPool = DecodingPairPool{}; + EXPECT_EQ(pairPool.size(), 0); +} + +TEST_F(DecodingPairPoolTest, ReservePair) { + auto pairPool = DecodingPairPool{}; + auto pair = 
pairPool.reservePair(); + EXPECT_NO_THROW(pair); +} + +TEST_F(DecodingPairPoolTest, AddPair) { + auto pairPool = DecodingPairPool{}; + auto pair = newPair(); + pairPool.addPair(std::move(pair)); + EXPECT_EQ(pairPool.size(), 1); +} + +TEST_F(DecodingPairPoolTest, AddPairElements) { + auto pairPool = DecodingPairPool{}; + auto decodedVector = std::make_unique(); + auto selectivityVector = std::make_unique(); + pairPool.addPair(std::move(decodedVector), std::move(selectivityVector)); + EXPECT_EQ(pairPool.size(), 1); +} + +TEST_F(DecodingPairPoolTest, FillDecodingPairPool) { + auto pairPool = DecodingPairPool{std::chrono::milliseconds(1000 * 2), 10}; + + for (auto i = 0; i < 10; i++) { + EXPECT_NO_THROW(auto pair = newPair(); pairPool.addPair(std::move(pair));); + } + EXPECT_EQ(pairPool.size(), 10); +} + +TEST_F(DecodingPairPoolTest, OverfillDecodingPairPool) { + auto pairPool = DecodingPairPool{std::chrono::milliseconds(1000 * 10), 2}; + + auto throwLambda = [&]() { + for (int i = 0; i < 5; i++) { + auto pair = newPair(); + pairPool.addPair(std::move(pair)); + } + }; + EXPECT_THROW(throwLambda(), NimbleInternalError); + + try { + throwLambda(); + } catch (const NimbleInternalError& e) { + EXPECT_EQ( + e.errorMessage(), "Timeout enqueuing decodingPair timeout=10000 ms"); + } +} + +TEST_F(DecodingPairPoolTest, FillEmptyFillPool) { + size_t iterations = 10; + std::vector pairs; + + auto pairPool = + DecodingPairPool{std::chrono::milliseconds(1000 * 2), iterations}; + + for (auto i = 0; i < iterations; i++) { + auto buffer = pairPool.reservePair(); + pairs.push_back(std::move(buffer)); + } + EXPECT_EQ(pairPool.size(), 0); + EXPECT_EQ(pairs.size(), iterations); + + for (auto& pair : pairs) { + pairPool.addPair(std::move(pair)); + } + EXPECT_EQ(pairPool.size(), iterations); + pairs.clear(); + + for (auto i = 0; i < iterations; i++) { + auto pair = pairPool.reservePair(); + pairs.push_back(std::move(pair)); + } + EXPECT_EQ(pairPool.size(), 0); + EXPECT_EQ(pairs.size(), 
iterations); +} + +TEST_F(DecodingPairPoolTest, ParallelFillPool) { + folly::CPUThreadPoolExecutor executor{10}; + ExecutorBarrier barrier{executor}; + auto pairPool = DecodingPairPool{std::chrono::milliseconds{1000 * 10}, 100}; + auto fillPool = [&]() { + for (auto i = 0; i < 10; i++) { + EXPECT_NO_THROW( + auto pair = pairPool.reservePair(); + std::this_thread::sleep_for(std::chrono::milliseconds{1000}); + pairPool.addPair(std::move(pair));); + } + }; + + for (auto i = 0; i < 10; i++) { + barrier.add(fillPool); + } + + barrier.waitAll(); +} + +} // namespace facebook::nimble::test diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index 043e2ca..911e3bb 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -1733,7 +1733,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) { auto iterations = 20; auto batches = 20; std::mt19937 rng{seed}; - for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { + for (auto parallelismFactor : + {0U, 1U, 2U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; nimble::VeloxWriterOptions writerOptions; if (parallelismFactor > 0) { @@ -1822,7 +1823,8 @@ TEST_F(VeloxReaderTests, FuzzComplex) { auto batches = 20; std::mt19937 rng{seed}; - for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { + for (auto parallelismFactor : + {0U, 1U, 2U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; writerOptions.encodingExecutor = parallelismFactor > 0 ? std::make_shared(parallelismFactor)