diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp
index a1c3028..d0b42bb 100644
--- a/dwio/nimble/velox/FieldWriter.cpp
+++ b/dwio/nimble/velox/FieldWriter.cpp
@@ -27,31 +27,6 @@
 namespace facebook::nimble {
 
-class FieldWriterContext::LocalDecodedVector {
- public:
-  explicit LocalDecodedVector(FieldWriterContext& context)
-      : context_(context), vector_(context_.getDecodedVector()) {}
-
-  LocalDecodedVector(LocalDecodedVector&& other) noexcept
-      : context_{other.context_}, vector_{std::move(other.vector_)} {}
-
-  LocalDecodedVector& operator=(LocalDecodedVector&& other) = delete;
-
-  ~LocalDecodedVector() {
-    if (vector_) {
-      context_.releaseDecodedVector(std::move(vector_));
-    }
-  }
-
-  velox::DecodedVector& get() {
-    return *vector_;
-  }
-
- private:
-  FieldWriterContext& context_;
-  std::unique_ptr<velox::DecodedVector> vector_;
-};
-
 namespace {
 
 template <typename T>
@@ -359,8 +334,8 @@ class SimpleFieldWriter : public FieldWriter {
           });
       }
     } else {
-      auto localDecoded = decode(vector, ranges);
-      auto& decoded = localDecoded.get();
+      auto pair = decode(vector, ranges);
+      auto& decoded = *pair.first;
       valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
       iterateNonNullValues(
           ranges,
@@ -370,6 +345,7 @@
             data.push_back(
                 C::convert(value, buffer, valuesStream_.extraMemory()));
           });
+      context_.decodingPairPool().addPair(std::move(pair));
     }
   }
 
@@ -421,8 +397,8 @@ class RowFieldWriter : public FieldWriter {
         childRangesPtr = &ranges;
       }
     } else {
-      auto localDecoded = decode(vector, ranges);
-      auto& decoded = localDecoded.get();
+      auto pair = decode(vector, ranges);
+      auto& decoded = *pair.first;
       row = decoded.base()->as<velox::RowVector>();
       NIMBLE_ASSERT(row, "Unexpected vector type");
       NIMBLE_CHECK(fields_.size() == row->childrenSize(), "schema mismatch");
@@ -433,6 +409,7 @@
           nullsStream_.mutableNonNulls(),
           Decoded{decoded},
           [&](auto offset) { childRanges.add(offset, 1); });
+      context_.decodingPairPool().addPair(std::move(pair));
     }
     for (auto i = 0; i < fields_.size(); ++i) {
       fields_[i]->write(row->childAt(i), *childRangesPtr);
@@ -500,8 +477,8 @@ class MultiValueFieldWriter : public FieldWriter {
       iterateNonNullIndices(
           ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc);
     } else {
-      auto localDecoded = decode(vector, ranges);
-      auto& decoded = localDecoded.get();
+      auto pair = decode(vector, ranges);
+      auto& decoded = *pair.first;
       casted = decoded.base()->as<T>();
       NIMBLE_ASSERT(casted, "Unexpected vector type");
       offsets = casted->rawOffsets();
@@ -510,6 +487,7 @@
       lengthsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
       iterateNonNullIndices(
           ranges, lengthsStream_.mutableNonNulls(), Decoded{decoded}, proc);
+      context_.decodingPairPool().addPair(std::move(pair));
     }
 
     return casted;
@@ -715,8 +693,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
           iterableVector,
           processMapIndex);
     } else {
-      auto localDecoded = decode(vector, ranges);
-      auto& decoded = localDecoded.get();
+      auto pair = decode(vector, ranges);
+      auto& decoded = *pair.first;
       mapVector = decoded.base()->template as<velox::MapVector>();
       NIMBLE_ASSERT(mapVector, "Unexpected vector type");
       rawOffsets = mapVector->rawOffsets();
@@ -728,6 +706,7 @@
           offsetsStream_.mutableNonNulls(),
           iterableVector,
           processMapIndex);
+      context_.decodingPairPool().addPair(std::move(pair));
     }
 
     // Copy the last valid element into the cache.
@@ -965,13 +944,14 @@ class FlatMapFieldWriter : public FieldWriter {
       // Keys are encoded. Decode.
       iterateNonNullIndices(
           ranges, nullsStream_.mutableNonNulls(), vector, computeKeyRanges);
-      auto localDecodedKeys = decode(mapKeys, keyRanges);
-      auto& decodedKeys = localDecodedKeys.get();
+      auto pair = decode(mapKeys, keyRanges);
+      auto& decodedKeys = *pair.first;
       Decoded keysVector{decodedKeys};
       iterateNonNullIndices(
           ranges, nullsStream_.mutableNonNulls(), vector, [&](auto offset) {
             processMap(offset, keysVector);
           });
+      context_.decodingPairPool().addPair(std::move(pair));
     }
   };
 
@@ -990,8 +970,8 @@ class FlatMapFieldWriter : public FieldWriter {
       processVector(map, Flat{vector});
     } else {
       // Map is encoded. Decode.
-      auto localDecodedMap = decode(vector, ranges);
-      auto& decodedMap = localDecodedMap.get();
+      auto pair = decode(vector, ranges);
+      auto& decodedMap = *pair.first;
       map = decodedMap.base()->template as<velox::MapVector>();
       NIMBLE_ASSERT(map, "Unexpected vector type");
       offsets = map->rawOffsets();
@@ -999,6 +979,7 @@
       nullsStream_.ensureNullsCapacity(decodedMap.mayHaveNulls(), size);
       processVector(map, Decoded{decodedMap});
+      context_.decodingPairPool().addPair(std::move(pair));
     }
 
     // Now actually ingest the map values
@@ -1375,8 +1356,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
       iterateNonNullIndices(ranges, nonNulls, iterableVector, dedupProc);
     } else {
-      auto localDecoded = decode(vectorElements, childRanges);
-      auto& decoded = localDecoded.get();
+      auto pair = decode(vectorElements, childRanges);
+      auto& decoded = *pair.first;
       /** compare array at index and prevIndex to be equal */
       compareConsecutive = [&](velox::vector_size_t index,
                                velox::vector_size_t prevIndex) {
@@ -1407,6 +1388,7 @@
         return match;
       };
       iterateNonNullIndices(ranges, nonNulls, iterableVector, dedupProc);
+      context_.decodingPairPool().addPair(std::move(pair));
     }
 
     // Copy the last valid element into the cache.
@@ -1455,8 +1437,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
       ingestLengthsOffsetsByElements(
           arrayVector, iterableVector, ranges, childRanges, filteredRanges);
     } else {
-      auto localDecoded = decode(vector, ranges);
-      auto& decoded = localDecoded.get();
+      auto pair = decode(vector, ranges);
+      auto& decoded = *pair.first;
       arrayVector = decoded.base()->template as<velox::ArrayVector>();
       NIMBLE_ASSERT(arrayVector, "Unexpected vector type");
       rawOffsets = arrayVector->rawOffsets();
@@ -1468,6 +1450,7 @@
           ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc);
       ingestLengthsOffsetsByElements(
           arrayVector, iterableVector, ranges, childRanges, filteredRanges);
+      context_.decodingPairPool().addPair(std::move(pair));
     }
     return arrayVector;
   }
@@ -1510,51 +1493,23 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(
 
 } // namespace
 
-FieldWriterContext::LocalDecodedVector
-FieldWriterContext::getLocalDecodedVector() {
-  NIMBLE_DASSERT(vectorDecoderVisitor, "vectorDecoderVisitor is missing");
-  vectorDecoderVisitor();
-  return LocalDecodedVector{*this};
-}
-
-velox::SelectivityVector& FieldWriterContext::getSelectivityVector(
-    velox::vector_size_t size) {
-  if (LIKELY(selectivity_.get() != nullptr)) {
-    selectivity_->resize(size);
-  } else {
-    selectivity_ = std::make_unique<velox::SelectivityVector>(size);
-  }
-  return *selectivity_;
-}
-
-std::unique_ptr<velox::DecodedVector> FieldWriterContext::getDecodedVector() {
-  if (decodedVectorPool_.empty()) {
-    return std::make_unique<velox::DecodedVector>();
-  }
-  auto vector = std::move(decodedVectorPool_.back());
-  decodedVectorPool_.pop_back();
-  return vector;
-}
-
-void FieldWriterContext::releaseDecodedVector(
-    std::unique_ptr<velox::DecodedVector>&& vector) {
-  decodedVectorPool_.push_back(std::move(vector));
-}
-
-FieldWriterContext::LocalDecodedVector FieldWriter::decode(
+DecodingPair FieldWriter::decode(
     const velox::VectorPtr& vector,
     const OrderedRanges& ranges) {
-  auto& selectivityVector = context_.getSelectivityVector(vector->size());
-  // initialize selectivity vector
-  selectivityVector.clearAll();
+  auto pair = context_.decodingPairPool().reservePair();
+  // decodedVector = pair.first
+  // selectivityVector = pair.second
+
+  pair.second->resize(vector->size());
+  pair.second->clearAll();
   ranges.apply([&](auto offset, auto size) {
-    selectivityVector.setValidRange(offset, offset + size, true);
+    pair.second->setValidRange(offset, offset + size, true);
   });
-  selectivityVector.updateBounds();
+  pair.second->updateBounds();
 
-  auto localDecoded = context_.getLocalDecodedVector();
-  localDecoded.get().decode(*vector, selectivityVector);
-  return localDecoded;
+  pair.first->decode(*vector, *pair.second);
+  // We return the pair so the caller can hand it back to the pool.
+  return pair;
 }
 
 std::unique_ptr<FieldWriter> FieldWriter::create(
diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h
index a5c1405..aad6b45 100644
--- a/dwio/nimble/velox/FieldWriter.h
+++ b/dwio/nimble/velox/FieldWriter.h
@@ -22,6 +22,7 @@
 #include "dwio/nimble/velox/OrderedRanges.h"
 #include "dwio/nimble/velox/SchemaBuilder.h"
 #include "dwio/nimble/velox/StreamData.h"
+#include "folly/concurrency/DynamicBoundedQueue.h"
 #include "velox/dwio/common/TypeWithId.h"
 #include "velox/vector/DecodedVector.h"
 
@@ -34,20 +35,104 @@ struct InputBufferGrowthStats {
   std::atomic<uint64_t> itemCount{0};
 };
 
-struct FieldWriterContext {
-  class LocalDecodedVector;
+using DecodedVectorPtr = std::unique_ptr<velox::DecodedVector>;
+using SelectivityVectorPtr = std::unique_ptr<velox::SelectivityVector>;
+using DecodingPair = std::pair<DecodedVectorPtr, SelectivityVectorPtr>;
+
+// A pool of decoding pairs. Each pair bundles the DecodedVector and the
+// SelectivityVector needed to decode a vector. The pool avoids repeated
+// allocations of these objects. For best performance, the DMPMCQueue
+// capacity should be set to about 90% of the expected peak usage.
+class DecodingPairPool {
+ public:
+  explicit DecodingPairPool(
+      std::function<void()> vectorDecoderVisitor = []() {},
+      std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
+      size_t maxPoolSize = std::thread::hardware_concurrency())
+      : timeout_{timeout},
+        pool_{maxPoolSize},
+        vectorDecoderVisitor_{std::move(vectorDecoderVisitor)} {
+    NIMBLE_CHECK(timeout_.count() > 0, "timeout must be > 0");
+    NIMBLE_CHECK(maxPoolSize > 0, "maxPoolSize must be > 0");
+    NIMBLE_CHECK(vectorDecoderVisitor_, "vectorDecoderVisitor must be set");
+  }
+
+  ~DecodingPairPool() {
+    clearPool();
+  }
+
+  void addPair(DecodingPair pair) {
+    auto status = pool_.try_enqueue_for(std::move(pair), timeout_);
+    if (!status) {
+      NIMBLE_UNKNOWN(
+          "Timeout enqueuing decodingPair timeout=" +
+          std::to_string(timeout_.count()) + " ms");
+    }
+  }
+
+  void addPair(DecodedVectorPtr decoded, SelectivityVectorPtr selectivity) {
+    addPair(std::make_pair(std::move(decoded), std::move(selectivity)));
+  }
+
+  // Get a decoding pair from the pool, or create a new one if the pool is
+  // empty.
+  DecodingPair reservePair() {
+    vectorDecoderVisitor_();
+
+    if (pool_.empty()) {
+      return newPair();
+    }
+    DecodingPair pair;
+    auto status = pool_.try_dequeue_for(pair, timeout_);
+    if (!status) {
+      NIMBLE_UNREACHABLE(
+          "Timeout dequeuing decodingPair timeout=" +
+          std::to_string(timeout_.count()) + " ms");
+    }
+    return pair;
+  }
+
+  size_t size() const {
+    return pool_.size();
+  }
+
+  void clearPool() {
+    while (!pool_.empty()) {
+      DecodingPair pair;
+      pool_.dequeue(pair);
+      pair.first.reset();
+      pair.second.reset();
+    }
+  }
+
+ private:
+  const std::chrono::milliseconds timeout_;
+  folly::DMPMCQueue<DecodingPair, true> pool_;
+  std::function<void()> vectorDecoderVisitor_;
+
+  DecodingPair newPair() {
+    auto decoded = std::make_unique<velox::DecodedVector>();
+    auto selectivity = std::make_unique<velox::SelectivityVector>();
+    return std::make_pair(std::move(decoded), std::move(selectivity));
+  }
+};
+
+struct FieldWriterContext {
   explicit FieldWriterContext(
       velox::memory::MemoryPool& memoryPool,
       std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
-      std::function<void()> vectorDecoderVisitor = []() {})
+      std::function<void()> vectorDecoderVisitor = []() {},
+      std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
+      size_t maxPoolSize = std::thread::hardware_concurrency())
       : bufferMemoryPool{memoryPool.addLeafChild(
             "field_writer_buffer",
             true,
             std::move(reclaimer))},
         inputBufferGrowthPolicy{
             DefaultInputBufferGrowthPolicy::withDefaultRanges()},
-        vectorDecoderVisitor(std::move(vectorDecoderVisitor)) {
+        decodingPairPool_{std::make_unique<DecodingPairPool>(
+            std::move(vectorDecoderVisitor),
+            timeout,
+            maxPoolSize)} {
     resetStringBuffer();
   }
 
@@ -67,10 +152,9 @@ struct FieldWriterContext {
   std::function<void(const TypeBuilder&)> typeAddedHandler =
       [](const TypeBuilder&) {};
 
-  std::function<void()> vectorDecoderVisitor;
-
-  LocalDecodedVector getLocalDecodedVector();
-  velox::SelectivityVector& getSelectivityVector(velox::vector_size_t size);
+  DecodingPairPool& decodingPairPool() {
+    return *decodingPairPool_;
+  }
 
   Buffer& stringBuffer() {
     return *buffer_;
@@ -108,12 +192,8 @@ struct FieldWriterContext {
   }
 
  private:
-  std::unique_ptr<velox::DecodedVector> getDecodedVector();
-  void releaseDecodedVector(std::unique_ptr<velox::DecodedVector>&& vector);
-
   std::unique_ptr<Buffer> buffer_;
-  std::vector<std::unique_ptr<velox::DecodedVector>> decodedVectorPool_;
-  std::unique_ptr<velox::SelectivityVector> selectivity_;
+  std::unique_ptr<DecodingPairPool> decodingPairPool_;
   std::vector<std::unique_ptr<StreamData>> streams_;
 };
 
@@ -157,7 +237,7 @@ class FieldWriter {
   FieldWriterContext& context_;
   std::shared_ptr<TypeBuilder> typeBuilder_;
 
-  FieldWriterContext::LocalDecodedVector decode(
+  DecodingPair decode(
      const velox::VectorPtr& vector,
      const OrderedRanges& ranges);
 };
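Note that, unlike the removed `LocalDecodedVector`, a `DecodingPair` carries no RAII semantics: every call site must remember to hand the pair back via `addPair()`, and an exception thrown in between silently drains the pool. A scope guard along these lines would make the protocol exception-safe; this sketch is written against the API introduced above and is illustrative only, not part of the patch:

```cpp
#include "dwio/nimble/velox/FieldWriter.h"

// Illustrative only -- not part of this diff. Returns a reserved
// DecodingPair to its pool when the guard leaves scope, even on exception.
class ScopedDecodingPair {
 public:
  explicit ScopedDecodingPair(facebook::nimble::DecodingPairPool& pool)
      : pool_{pool}, pair_{pool_.reservePair()} {}

  ScopedDecodingPair(const ScopedDecodingPair&) = delete;
  ScopedDecodingPair& operator=(const ScopedDecodingPair&) = delete;

  ~ScopedDecodingPair() {
    // Note: addPair() throws on enqueue timeout; a production guard would
    // want a non-throwing return path rather than a throwing destructor.
    pool_.addPair(std::move(pair_));
  }

  velox::DecodedVector& decoded() {
    return *pair_.first;
  }

  velox::SelectivityVector& selectivity() {
    return *pair_.second;
  }

 private:
  facebook::nimble::DecodingPairPool& pool_;
  facebook::nimble::DecodingPair pair_;
};
```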
diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp
index 0c5ea32..50fac4f 100644
--- a/dwio/nimble/velox/VeloxWriter.cpp
+++ b/dwio/nimble/velox/VeloxWriter.cpp
@@ -64,7 +64,7 @@ class WriterContext : public FieldWriterContext {
   WriterContext(
       velox::memory::MemoryPool& memoryPool,
       VeloxWriterOptions options)
-      : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
+      : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor, options.poolTimeout, options.maxPoolSize},
        options{std::move(options)},
        logger{this->options.metricsLogger} {
    flushPolicy = this->options.flushPolicyFactory();
diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h
index 8e4386a..68021e7 100644
--- a/dwio/nimble/velox/VeloxWriterOptions.h
+++ b/dwio/nimble/velox/VeloxWriterOptions.h
@@ -126,6 +126,9 @@ struct VeloxWriterOptions {
   const velox::common::SpillConfig* spillConfig{nullptr};
 
+  std::chrono::milliseconds poolTimeout = std::chrono::milliseconds{1000 * 10};
+  size_t maxPoolSize = std::thread::hardware_concurrency();
+
   // If provided, internal encoding operations will happen in parallel using
   // this executor.
   std::shared_ptr<folly::Executor> encodingExecutor;
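The two new knobs default to a ten-second pool timeout and one pool slot per hardware thread. A writer that decodes from a wider executor can raise both when building options; a minimal sketch, with illustrative values only:

```cpp
#include <chrono>
#include <thread>
#include "dwio/nimble/velox/VeloxWriterOptions.h"

// Illustrative configuration of the new pool knobs (values are examples).
facebook::nimble::VeloxWriterOptions makeOptions() {
  facebook::nimble::VeloxWriterOptions options;
  // Tolerate slower consumers before the pool throws on enqueue/dequeue.
  options.poolTimeout = std::chrono::milliseconds{30000};
  // Cache more pairs than the default of one per hardware thread.
  options.maxPoolSize = 2 * std::thread::hardware_concurrency();
  return options;
}
```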
diff --git a/dwio/nimble/velox/tests/DecodingPairPoolTests.cpp b/dwio/nimble/velox/tests/DecodingPairPoolTests.cpp
new file mode 100644
index 0000000..94f21a4
--- /dev/null
+++ b/dwio/nimble/velox/tests/DecodingPairPoolTests.cpp
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include "dwio/nimble/common/Exceptions.h"
+#include "dwio/nimble/velox/FieldWriter.h"
+#include "folly/executors/CPUThreadPoolExecutor.h"
+#include "velox/dwio/common/ExecutorBarrier.h"
+#include "velox/vector/DecodedVector.h"
+
+namespace facebook::nimble::test {
+
+using MemoryPool = velox::memory::MemoryPool;
+using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;
+using DecodedVector = velox::DecodedVector;
+using SelectivityVector = velox::SelectivityVector;
+
+class DecodingPairPoolTest : public ::testing::Test {
+ public:
+  DecodingPair newPair() {
+    auto decoded = std::make_unique<DecodedVector>();
+    auto selectivity = std::make_unique<SelectivityVector>();
+    return std::make_pair(std::move(decoded), std::move(selectivity));
+  }
+
+ protected:
+  static void SetUpTestCase() {}
+
+  void SetUp() override {
+    decodingVisitor_ = []() {};
+  }
+
+  std::function<void()> decodingVisitor_;
+};
+
+TEST_F(DecodingPairPoolTest, CreatePool) {
+  EXPECT_NO_THROW(DecodingPairPool{});
+}
+
+TEST_F(DecodingPairPoolTest, CreatePoolBadTimeout) {
+  auto throwLambda = [&]() {
+    DecodingPairPool{decodingVisitor_, std::chrono::milliseconds(0), 10};
+  };
+  EXPECT_THROW(throwLambda(), NimbleUserError);
+
+  try {
+    throwLambda();
+  } catch (const NimbleUserError& e) {
+    EXPECT_EQ(e.errorMessage(), "timeout must be > 0");
+  }
+}
+
+TEST_F(DecodingPairPoolTest, CreatePoolBadMaxPool) {
+  auto throwLambda = [&]() {
+    DecodingPairPool{decodingVisitor_, std::chrono::milliseconds(1000), 0};
+  };
+  EXPECT_THROW(throwLambda(), NimbleUserError);
+
+  try {
+    throwLambda();
+  } catch (const NimbleUserError& e) {
+    EXPECT_EQ(e.errorMessage(), "maxPoolSize must be > 0");
+  }
+}
+
+TEST_F(DecodingPairPoolTest, DestroyPool) {
+  auto pairPoolPtr = std::make_unique<DecodingPairPool>();
+  pairPoolPtr->reservePair();
+  pairPoolPtr.reset();
+  EXPECT_EQ(pairPoolPtr, nullptr);
+}
+
+TEST_F(DecodingPairPoolTest, EmptyDecodingPairPool) {
+  auto pairPool = DecodingPairPool{};
+  EXPECT_EQ(pairPool.size(), 0);
+}
+
+TEST_F(DecodingPairPoolTest, ReservePair) {
+  auto pairPool = DecodingPairPool{};
+  EXPECT_NO_THROW(pairPool.reservePair());
+}
+
+TEST_F(DecodingPairPoolTest, AddPair) {
+  auto pairPool = DecodingPairPool{};
+  auto pair = newPair();
+  pairPool.addPair(std::move(pair));
+  EXPECT_EQ(pairPool.size(), 1);
+}
+
+TEST_F(DecodingPairPoolTest, AddPairElements) {
+  auto pairPool = DecodingPairPool{};
+  auto decodedVector = std::make_unique<DecodedVector>();
+  auto selectivityVector = std::make_unique<SelectivityVector>();
+  pairPool.addPair(std::move(decodedVector), std::move(selectivityVector));
+  EXPECT_EQ(pairPool.size(), 1);
+}
+
+TEST_F(DecodingPairPoolTest, FillDecodingPairPool) {
+  auto pairPool = DecodingPairPool{
+      decodingVisitor_, std::chrono::milliseconds(1000 * 2), 10};
+
+  for (auto i = 0; i < 10; i++) {
+    EXPECT_NO_THROW(auto pair = newPair(); pairPool.addPair(std::move(pair)););
+  }
+  EXPECT_EQ(pairPool.size(), 10);
+}
+
+TEST_F(DecodingPairPoolTest, OverfillDecodingPairPool) {
+  auto pairPool = DecodingPairPool{
+      decodingVisitor_, std::chrono::milliseconds(1000 * 10), 2};
+
+  auto throwLambda = [&]() {
+    for (int i = 0; i < 5; i++) {
+      auto pair = newPair();
+      pairPool.addPair(std::move(pair));
+    }
+  };
+  EXPECT_THROW(throwLambda(), NimbleInternalError);
+
+  try {
+    throwLambda();
+  } catch (const NimbleInternalError& e) {
+    EXPECT_EQ(
+        e.errorMessage(), "Timeout enqueuing decodingPair timeout=10000 ms");
+  }
+}
+
+TEST_F(DecodingPairPoolTest, FillEmptyFillPool) {
+  size_t iterations = 10;
+  std::vector<DecodingPair> pairs;
+
+  auto pairPool = DecodingPairPool{
+      decodingVisitor_, std::chrono::milliseconds(1000 * 2), iterations};
+
+  for (size_t i = 0; i < iterations; i++) {
+    auto pair = pairPool.reservePair();
+    pairs.push_back(std::move(pair));
+  }
+  EXPECT_EQ(pairPool.size(), 0);
+  EXPECT_EQ(pairs.size(), iterations);
+
+  for (auto& pair : pairs) {
+    pairPool.addPair(std::move(pair));
+  }
+  EXPECT_EQ(pairPool.size(), iterations);
+  pairs.clear();
+
+  for (size_t i = 0; i < iterations; i++) {
+    auto pair = pairPool.reservePair();
+    pairs.push_back(std::move(pair));
+  }
+  EXPECT_EQ(pairPool.size(), 0);
+  EXPECT_EQ(pairs.size(), iterations);
+}
+
+TEST_F(DecodingPairPoolTest, ParallelFillPool) {
+  folly::CPUThreadPoolExecutor executor{10};
+  ExecutorBarrier barrier{executor};
+  auto pairPool = DecodingPairPool{
+      decodingVisitor_, std::chrono::milliseconds{1000 * 10}, 100};
+
+  auto fillPool = [&]() {
+    for (auto i = 0; i < 10; i++) {
+      EXPECT_NO_THROW(
+          auto pair = pairPool.reservePair();
+          std::this_thread::sleep_for(std::chrono::milliseconds{1000});
+          pairPool.addPair(std::move(pair)););
+    }
+  };
+
+  for (auto i = 0; i < 10; i++) {
+    barrier.add(fillPool);
+  }
+
+  barrier.waitAll();
+}
+
+} // namespace facebook::nimble::test
diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
index 72fee6a..b192107 100644
--- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp
+++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -2426,7 +2426,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
   auto iterations = 20;
   auto batches = 20;
   std::mt19937 rng{seed};
-  for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
+  for (auto parallelismFactor :
+       {0U, 1U, 2U, std::thread::hardware_concurrency()}) {
     LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
     nimble::VeloxWriterOptions writerOptions;
     if (parallelismFactor > 0) {
@@ -2515,7 +2516,8 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
   auto batches = 20;
   std::mt19937 rng{seed};
 
-  for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
+  for (auto parallelismFactor :
+       {0U, 1U, 2U, std::thread::hardware_concurrency()}) {
     LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
     writerOptions.encodingExecutor = parallelismFactor > 0
        ? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
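For orientation, every rewritten call site in FieldWriter.cpp follows the same reserve/decode/return protocol. In isolation it looks roughly like this; a minimal sketch against the new API, with a hypothetical function name, not code from this diff:

```cpp
#include "dwio/nimble/velox/FieldWriter.h"

// Minimal sketch of the protocol the rewritten call sites follow; the
// function itself is illustrative and not part of the diff.
void decodeAllRows(
    facebook::nimble::DecodingPairPool& pool,
    const velox::VectorPtr& vector) {
  auto pair = pool.reservePair(); // may allocate when the pool is empty
  pair.second->resize(vector->size()); // resize() selects all rows by default
  pair.first->decode(*vector, *pair.second); // decode only the selected rows
  // ... read values through *pair.first ...
  pool.addPair(std::move(pair)); // hand the pair back for reuse
}
```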