From 0df785530968036a8cea9409964b11fce8778f36 Mon Sep 17 00:00:00 2001 From: Laurens Kuiper Date: Thu, 11 Jul 2024 13:38:32 +0200 Subject: [PATCH] even more valgrind stuff --- src/common/radix_partitioning.cpp | 45 +++++++++++-------- .../types/row/partitioned_tuple_data.cpp | 7 ++- src/common/vector_operations/vector_hash.cpp | 13 +++--- .../duckdb/common/radix_partitioning.hpp | 3 +- .../types/row/partitioned_tuple_data.hpp | 3 +- 5 files changed, 40 insertions(+), 31 deletions(-) diff --git a/src/common/radix_partitioning.cpp b/src/common/radix_partitioning.cpp index 72fe78fb902..729ac779ace 100644 --- a/src/common/radix_partitioning.cpp +++ b/src/common/radix_partitioning.cpp @@ -1,7 +1,6 @@ #include "duckdb/common/radix_partitioning.hpp" #include "duckdb/common/types/column/partitioned_column_data.hpp" -#include "duckdb/common/types/row/row_data_collection.hpp" #include "duckdb/common/types/vector.hpp" #include "duckdb/common/vector_operations/binary_executor.hpp" #include "duckdb/common/vector_operations/unary_executor.hpp" @@ -13,20 +12,20 @@ template struct RadixPartitioningConstants { public: //! Bitmask of the upper bits starting at the 5th byte - static constexpr const idx_t NUM_PARTITIONS = RadixPartitioning::NumberOfPartitions(radix_bits); - static constexpr const idx_t SHIFT = RadixPartitioning::Shift(radix_bits); - static constexpr const hash_t MASK = RadixPartitioning::Mask(radix_bits); + static constexpr idx_t NUM_PARTITIONS = RadixPartitioning::NumberOfPartitions(radix_bits); + static constexpr idx_t SHIFT = RadixPartitioning::Shift(radix_bits); + static constexpr hash_t MASK = RadixPartitioning::Mask(radix_bits); public: //! Apply bitmask and right shift to get a number between 0 and NUM_PARTITIONS - static inline hash_t ApplyMask(hash_t hash) { + static hash_t ApplyMask(const hash_t hash) { D_ASSERT((hash & MASK) >> SHIFT < NUM_PARTITIONS); return (hash & MASK) >> SHIFT; } }; template -RETURN_TYPE RadixBitsSwitch(idx_t radix_bits, ARGS &&...args) { +RETURN_TYPE RadixBitsSwitch(const idx_t radix_bits, ARGS &&...args) { D_ASSERT(radix_bits <= RadixPartitioning::MAX_RADIX_BITS); switch (radix_bits) { case 0: @@ -71,7 +70,7 @@ struct RadixLessThan { struct SelectFunctor { template - static idx_t Operation(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t cutoff, + static idx_t Operation(Vector &hashes, const SelectionVector *sel, const idx_t count, const idx_t cutoff, SelectionVector *true_sel, SelectionVector *false_sel) { Vector cutoff_vector(Value::HASH(cutoff)); return BinaryExecutor::Select>(hashes, cutoff_vector, sel, count, @@ -79,17 +78,25 @@ struct SelectFunctor { } }; -idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t radix_bits, idx_t cutoff, - SelectionVector *true_sel, SelectionVector *false_sel) { +idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, const idx_t count, const idx_t radix_bits, + const idx_t cutoff, SelectionVector *true_sel, SelectionVector *false_sel) { return RadixBitsSwitch(radix_bits, hashes, sel, count, cutoff, true_sel, false_sel); } struct ComputePartitionIndicesFunctor { template - static void Operation(Vector &hashes, Vector &partition_indices, idx_t count) { + static void Operation(Vector &hashes, Vector &partition_indices, const SelectionVector &append_sel, + const idx_t append_count) { using CONSTANTS = RadixPartitioningConstants; - UnaryExecutor::Execute(hashes, partition_indices, count, - [&](hash_t hash) { return CONSTANTS::ApplyMask(hash); }); + D_ASSERT(hashes.GetVectorType() == VectorType::FLAT_VECTOR); + if (append_sel.IsSet()) { + auto hashes_sliced = Vector(hashes, append_sel, append_count); + UnaryExecutor::Execute(hashes_sliced, partition_indices, append_count, + [&](hash_t hash) { return CONSTANTS::ApplyMask(hash); }); + } else { + UnaryExecutor::Execute(hashes, partition_indices, append_count, + [&](hash_t hash) { return CONSTANTS::ApplyMask(hash); }); + } } }; @@ -138,14 +145,14 @@ void RadixPartitionedColumnData::ComputePartitionIndices(PartitionedColumnDataAp D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits)); D_ASSERT(state.partition_buffers.size() == RadixPartitioning::NumberOfPartitions(radix_bits)); RadixBitsSwitch(radix_bits, input.data[hash_col_idx], state.partition_indices, - input.size()); + *FlatVector::IncrementalSelectionVector(), input.size()); } //===--------------------------------------------------------------------===// // Tuple Data Partitioning //===--------------------------------------------------------------------===// RadixPartitionedTupleData::RadixPartitionedTupleData(BufferManager &buffer_manager, const TupleDataLayout &layout_p, - idx_t radix_bits_p, idx_t hash_col_idx_p) + const idx_t radix_bits_p, const idx_t hash_col_idx_p) : PartitionedTupleData(PartitionedTupleDataType::RADIX, buffer_manager, layout_p.Copy()), radix_bits(radix_bits_p), hash_col_idx(hash_col_idx_p) { D_ASSERT(radix_bits <= RadixPartitioning::MAX_RADIX_BITS); @@ -174,7 +181,7 @@ void RadixPartitionedTupleData::Initialize() { } void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDataAppendState &state, - TupleDataPinProperties properties) const { + const TupleDataPinProperties properties) const { // Init pin state per partition const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits); state.partition_pin_states.reserve(num_partitions); @@ -196,10 +203,11 @@ void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDa state.fixed_partition_entries.resize(RadixPartitioning::NumberOfPartitions(radix_bits)); } -void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) { +void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input, + const SelectionVector &append_sel, const idx_t append_count) { D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits)); RadixBitsSwitch(radix_bits, input.data[hash_col_idx], state.partition_indices, - input.size()); + append_sel, append_count); } void RadixPartitionedTupleData::ComputePartitionIndices(Vector &row_locations, idx_t count, @@ -207,7 +215,8 @@ void RadixPartitionedTupleData::ComputePartitionIndices(Vector &row_locations, i Vector intermediate(LogicalType::HASH); partitions[0]->Gather(row_locations, *FlatVector::IncrementalSelectionVector(), count, hash_col_idx, intermediate, *FlatVector::IncrementalSelectionVector(), nullptr); - RadixBitsSwitch(radix_bits, intermediate, partition_indices, count); + RadixBitsSwitch(radix_bits, intermediate, partition_indices, + *FlatVector::IncrementalSelectionVector(), count); } void RadixPartitionedTupleData::RepartitionFinalizeStates(PartitionedTupleData &old_partitioned_data, diff --git a/src/common/types/row/partitioned_tuple_data.cpp b/src/common/types/row/partitioned_tuple_data.cpp index a5e84529347..7f465c7a6c0 100644 --- a/src/common/types/row/partitioned_tuple_data.cpp +++ b/src/common/types/row/partitioned_tuple_data.cpp @@ -50,7 +50,7 @@ void PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state, const idx_t actual_append_count = append_count == DConstants::INVALID_INDEX ? input.size() : append_count; // Compute partition indices and store them in state.partition_indices - ComputePartitionIndices(state, input); + ComputePartitionIndices(state, input, append_sel, actual_append_count); // Build the selection vector for the partitions BuildPartitionSel(state, append_sel, actual_append_count); @@ -173,8 +173,7 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st switch (state.partition_indices.GetVectorType()) { case VectorType::FLAT_VECTOR: for (idx_t i = 0; i < append_count; i++) { - const auto index = append_sel.get_index(i); - const auto &partition_index = partition_indices[index]; + const auto &partition_index = partition_indices[i]; auto partition_entry = partition_entries.find(partition_index); if (partition_entry == partition_entries.end()) { partition_entries[partition_index] = list_entry_t(0, 1); @@ -213,7 +212,7 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st auto &reverse_partition_sel = state.reverse_partition_sel; for (idx_t i = 0; i < append_count; i++) { const auto index = append_sel.get_index(i); - const auto &partition_index = partition_indices[index]; + const auto &partition_index = partition_indices[i]; auto &partition_offset = partition_entries[partition_index].offset; reverse_partition_sel[index] = UnsafeNumericCast(partition_offset); partition_sel[partition_offset++] = UnsafeNumericCast(index); diff --git a/src/common/vector_operations/vector_hash.cpp b/src/common/vector_operations/vector_hash.cpp index c535db902c6..e6ef5f5fc4b 100644 --- a/src/common/vector_operations/vector_hash.cpp +++ b/src/common/vector_operations/vector_hash.cpp @@ -184,13 +184,6 @@ static inline void ArrayLoopHash(Vector &input, Vector &hashes, const SelectionV hashes.Flatten(count); auto hdata = FlatVector::GetData(hashes); - if (FIRST_HASH) { - for (idx_t i = 0; i < count; i++) { - const auto ridx = HAS_RSEL ? rsel->get_index(i) : i; - hdata[ridx] = 0; - } - } - UnifiedVectorFormat idata; input.ToUnifiedFormat(count, idata); @@ -213,6 +206,9 @@ static inline void ArrayLoopHash(Vector &input, Vector &hashes, const SelectionV for (idx_t i = 0; i < count; i++) { auto lidx = idata.sel->get_index(i); if (idata.validity.RowIsValid(lidx)) { + if (FIRST_HASH) { + hdata[i] = 0; + } for (idx_t j = 0; j < array_size; j++) { auto offset = lidx * array_size + j; hdata[i] = CombineHashScalar(hdata[i], chdata[offset]); @@ -240,6 +236,9 @@ static inline void ArrayLoopHash(Vector &input, Vector &hashes, const SelectionV VectorOperations::Hash(dict_vec, array_hashes, array_size); auto ahdata = FlatVector::GetData(array_hashes); + if (FIRST_HASH) { + hdata[ridx] = 0; + } // Combine the hashes of the array for (idx_t j = 0; j < array_size; j++) { hdata[ridx] = CombineHashScalar(hdata[ridx], ahdata[j]); diff --git a/src/include/duckdb/common/radix_partitioning.hpp b/src/include/duckdb/common/radix_partitioning.hpp index c3bc3949abd..aa5efc2860f 100644 --- a/src/include/duckdb/common/radix_partitioning.hpp +++ b/src/include/duckdb/common/radix_partitioning.hpp @@ -125,7 +125,8 @@ class RadixPartitionedTupleData : public PartitionedTupleData { //===--------------------------------------------------------------------===// void InitializeAppendStateInternal(PartitionedTupleDataAppendState &state, TupleDataPinProperties properties) const override; - void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) override; + void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input, + const SelectionVector &append_sel, const idx_t append_count) override; void ComputePartitionIndices(Vector &row_locations, idx_t count, Vector &partition_indices) const override; idx_t MaxPartitionIndex() const override { return RadixPartitioning::NumberOfPartitions(radix_bits) - 1; diff --git a/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp b/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp index 344bd51c343..767923c6e07 100644 --- a/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp +++ b/src/include/duckdb/common/types/row/partitioned_tuple_data.hpp @@ -109,7 +109,8 @@ class PartitionedTupleData { //! Compute the partition indices for this type of partitioning for the input DataChunk and store them in the //! `partition_data` of the local state. If this type creates partitions on the fly (for, e.g., hive), this //! function is also in charge of creating new partitions and mapping the input data to a partition index - virtual void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) { + virtual void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input, + const SelectionVector &append_sel, const idx_t append_count) { throw NotImplementedException("ComputePartitionIndices for this type of PartitionedTupleData"); } //! Compute partition indices from rows (similar to function above)