Skip to content

Commit

Permalink
even more valgrind stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
lnkuiper committed Jul 11, 2024
1 parent 527c49a commit 0df7855
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 31 deletions.
45 changes: 27 additions & 18 deletions src/common/radix_partitioning.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "duckdb/common/radix_partitioning.hpp"

#include "duckdb/common/types/column/partitioned_column_data.hpp"
#include "duckdb/common/types/row/row_data_collection.hpp"
#include "duckdb/common/types/vector.hpp"
#include "duckdb/common/vector_operations/binary_executor.hpp"
#include "duckdb/common/vector_operations/unary_executor.hpp"
Expand All @@ -13,20 +12,20 @@ template <idx_t radix_bits>
struct RadixPartitioningConstants {
public:
//! Bitmask of the upper bits starting at the 5th byte
static constexpr const idx_t NUM_PARTITIONS = RadixPartitioning::NumberOfPartitions(radix_bits);
static constexpr const idx_t SHIFT = RadixPartitioning::Shift(radix_bits);
static constexpr const hash_t MASK = RadixPartitioning::Mask(radix_bits);
static constexpr idx_t NUM_PARTITIONS = RadixPartitioning::NumberOfPartitions(radix_bits);
static constexpr idx_t SHIFT = RadixPartitioning::Shift(radix_bits);
static constexpr hash_t MASK = RadixPartitioning::Mask(radix_bits);

public:
//! Apply bitmask and right shift to get a number between 0 and NUM_PARTITIONS
static inline hash_t ApplyMask(hash_t hash) {
static hash_t ApplyMask(const hash_t hash) {
D_ASSERT((hash & MASK) >> SHIFT < NUM_PARTITIONS);
return (hash & MASK) >> SHIFT;
}
};

template <class OP, class RETURN_TYPE, typename... ARGS>
RETURN_TYPE RadixBitsSwitch(idx_t radix_bits, ARGS &&...args) {
RETURN_TYPE RadixBitsSwitch(const idx_t radix_bits, ARGS &&...args) {
D_ASSERT(radix_bits <= RadixPartitioning::MAX_RADIX_BITS);
switch (radix_bits) {
case 0:
Expand Down Expand Up @@ -71,25 +70,33 @@ struct RadixLessThan {

struct SelectFunctor {
	//! Select rows by comparing each hash against `cutoff` via RadixLessThan<radix_bits>.
	//! Returns the number of matching rows; matching / non-matching row indices are
	//! written to true_sel / false_sel when those are non-null.
	template <idx_t radix_bits>
	static idx_t Operation(Vector &hashes, const SelectionVector *sel, const idx_t count, const idx_t cutoff,
	                       SelectionVector *true_sel, SelectionVector *false_sel) {
		// Constant vector holding the cutoff so BinaryExecutor can compare element-wise
		Vector cutoff_vector(Value::HASH(cutoff));
		return BinaryExecutor::Select<hash_t, hash_t, RadixLessThan<radix_bits>>(hashes, cutoff_vector, sel, count,
		                                                                        true_sel, false_sel);
	}
};

//! Runtime entry point: dispatches to the SelectFunctor instantiation
//! matching the given radix_bits (see RadixBitsSwitch).
idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, const idx_t count, const idx_t radix_bits,
                                const idx_t cutoff, SelectionVector *true_sel, SelectionVector *false_sel) {
	return RadixBitsSwitch<SelectFunctor, idx_t>(radix_bits, hashes, sel, count, cutoff, true_sel, false_sel);
}

struct ComputePartitionIndicesFunctor {
template <idx_t radix_bits>
static void Operation(Vector &hashes, Vector &partition_indices, idx_t count) {
static void Operation(Vector &hashes, Vector &partition_indices, const SelectionVector &append_sel,
const idx_t append_count) {
using CONSTANTS = RadixPartitioningConstants<radix_bits>;
UnaryExecutor::Execute<hash_t, hash_t>(hashes, partition_indices, count,
[&](hash_t hash) { return CONSTANTS::ApplyMask(hash); });
D_ASSERT(hashes.GetVectorType() == VectorType::FLAT_VECTOR);
if (append_sel.IsSet()) {
auto hashes_sliced = Vector(hashes, append_sel, append_count);
UnaryExecutor::Execute<hash_t, hash_t>(hashes_sliced, partition_indices, append_count,
[&](hash_t hash) { return CONSTANTS::ApplyMask(hash); });
} else {
UnaryExecutor::Execute<hash_t, hash_t>(hashes, partition_indices, append_count,
[&](hash_t hash) { return CONSTANTS::ApplyMask(hash); });
}
}
};

Expand Down Expand Up @@ -138,14 +145,14 @@ void RadixPartitionedColumnData::ComputePartitionIndices(PartitionedColumnDataAp
D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
D_ASSERT(state.partition_buffers.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, input.data[hash_col_idx], state.partition_indices,
input.size());
*FlatVector::IncrementalSelectionVector(), input.size());
}

//===--------------------------------------------------------------------===//
// Tuple Data Partitioning
//===--------------------------------------------------------------------===//
RadixPartitionedTupleData::RadixPartitionedTupleData(BufferManager &buffer_manager, const TupleDataLayout &layout_p,
idx_t radix_bits_p, idx_t hash_col_idx_p)
const idx_t radix_bits_p, const idx_t hash_col_idx_p)
: PartitionedTupleData(PartitionedTupleDataType::RADIX, buffer_manager, layout_p.Copy()), radix_bits(radix_bits_p),
hash_col_idx(hash_col_idx_p) {
D_ASSERT(radix_bits <= RadixPartitioning::MAX_RADIX_BITS);
Expand Down Expand Up @@ -174,7 +181,7 @@ void RadixPartitionedTupleData::Initialize() {
}

void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDataAppendState &state,
TupleDataPinProperties properties) const {
const TupleDataPinProperties properties) const {
// Init pin state per partition
const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
state.partition_pin_states.reserve(num_partitions);
Expand All @@ -196,18 +203,20 @@ void RadixPartitionedTupleData::InitializeAppendStateInternal(PartitionedTupleDa
state.fixed_partition_entries.resize(RadixPartitioning::NumberOfPartitions(radix_bits));
}

void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) {
void RadixPartitionedTupleData::ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input,
const SelectionVector &append_sel, const idx_t append_count) {
D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, input.data[hash_col_idx], state.partition_indices,
input.size());
append_sel, append_count);
}

//! Compute partition indices for already-materialized rows: first gather the
//! hash column from the row locations, then mask/shift it into partition indices.
void RadixPartitionedTupleData::ComputePartitionIndices(Vector &row_locations, idx_t count,
                                                        Vector &partition_indices) const {
	Vector intermediate(LogicalType::HASH);
	partitions[0]->Gather(row_locations, *FlatVector::IncrementalSelectionVector(), count, hash_col_idx, intermediate,
	                      *FlatVector::IncrementalSelectionVector(), nullptr);
	// All rows participate here, so pass the incremental (identity) selection vector
	RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, intermediate, partition_indices,
	                                                      *FlatVector::IncrementalSelectionVector(), count);
}

void RadixPartitionedTupleData::RepartitionFinalizeStates(PartitionedTupleData &old_partitioned_data,
Expand Down
7 changes: 3 additions & 4 deletions src/common/types/row/partitioned_tuple_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state,
const idx_t actual_append_count = append_count == DConstants::INVALID_INDEX ? input.size() : append_count;

// Compute partition indices and store them in state.partition_indices
ComputePartitionIndices(state, input);
ComputePartitionIndices(state, input, append_sel, actual_append_count);

// Build the selection vector for the partitions
BuildPartitionSel(state, append_sel, actual_append_count);
Expand Down Expand Up @@ -173,8 +173,7 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st
switch (state.partition_indices.GetVectorType()) {
case VectorType::FLAT_VECTOR:
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
const auto &partition_index = partition_indices[i];
auto partition_entry = partition_entries.find(partition_index);
if (partition_entry == partition_entries.end()) {
partition_entries[partition_index] = list_entry_t(0, 1);
Expand Down Expand Up @@ -213,7 +212,7 @@ void PartitionedTupleData::BuildPartitionSel(PartitionedTupleDataAppendState &st
auto &reverse_partition_sel = state.reverse_partition_sel;
for (idx_t i = 0; i < append_count; i++) {
const auto index = append_sel.get_index(i);
const auto &partition_index = partition_indices[index];
const auto &partition_index = partition_indices[i];
auto &partition_offset = partition_entries[partition_index].offset;
reverse_partition_sel[index] = UnsafeNumericCast<sel_t>(partition_offset);
partition_sel[partition_offset++] = UnsafeNumericCast<sel_t>(index);
Expand Down
13 changes: 6 additions & 7 deletions src/common/vector_operations/vector_hash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,6 @@ static inline void ArrayLoopHash(Vector &input, Vector &hashes, const SelectionV
hashes.Flatten(count);
auto hdata = FlatVector::GetData<hash_t>(hashes);

if (FIRST_HASH) {
for (idx_t i = 0; i < count; i++) {
const auto ridx = HAS_RSEL ? rsel->get_index(i) : i;
hdata[ridx] = 0;
}
}

UnifiedVectorFormat idata;
input.ToUnifiedFormat(count, idata);

Expand All @@ -213,6 +206,9 @@ static inline void ArrayLoopHash(Vector &input, Vector &hashes, const SelectionV
for (idx_t i = 0; i < count; i++) {
auto lidx = idata.sel->get_index(i);
if (idata.validity.RowIsValid(lidx)) {
if (FIRST_HASH) {
hdata[i] = 0;
}
for (idx_t j = 0; j < array_size; j++) {
auto offset = lidx * array_size + j;
hdata[i] = CombineHashScalar(hdata[i], chdata[offset]);
Expand Down Expand Up @@ -240,6 +236,9 @@ static inline void ArrayLoopHash(Vector &input, Vector &hashes, const SelectionV
VectorOperations::Hash(dict_vec, array_hashes, array_size);
auto ahdata = FlatVector::GetData<hash_t>(array_hashes);

if (FIRST_HASH) {
hdata[ridx] = 0;
}
// Combine the hashes of the array
for (idx_t j = 0; j < array_size; j++) {
hdata[ridx] = CombineHashScalar(hdata[ridx], ahdata[j]);
Expand Down
3 changes: 2 additions & 1 deletion src/include/duckdb/common/radix_partitioning.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class RadixPartitionedTupleData : public PartitionedTupleData {
//===--------------------------------------------------------------------===//
void InitializeAppendStateInternal(PartitionedTupleDataAppendState &state,
TupleDataPinProperties properties) const override;
void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) override;
void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input,
const SelectionVector &append_sel, const idx_t append_count) override;
void ComputePartitionIndices(Vector &row_locations, idx_t count, Vector &partition_indices) const override;
idx_t MaxPartitionIndex() const override {
return RadixPartitioning::NumberOfPartitions(radix_bits) - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ class PartitionedTupleData {
//! Compute the partition indices for this type of partitioning for the input DataChunk and store them in the
//! `partition_data` of the local state. If this type creates partitions on the fly (for, e.g., hive), this
//! function is also in charge of creating new partitions and mapping the input data to a partition index
virtual void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) {
virtual void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input,
const SelectionVector &append_sel, const idx_t append_count) {
throw NotImplementedException("ComputePartitionIndices for this type of PartitionedTupleData");
}
//! Compute partition indices from rows (similar to function above)
Expand Down

0 comments on commit 0df7855

Please sign in to comment.