Skip to content

Commit

Permalink
remove reverse partition order and clean up code a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
lnkuiper committed Oct 10, 2024
1 parent f7d0100 commit db905a4
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 43 deletions.
3 changes: 2 additions & 1 deletion src/common/radix_partitioning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ RadixPartitionedTupleData::~RadixPartitionedTupleData() {
}

void RadixPartitionedTupleData::Initialize() {
for (idx_t i = 0; i < RadixPartitioning::NumberOfPartitions(radix_bits); i++) {
const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
for (idx_t i = 0; i < num_partitions; i++) {
partitions.emplace_back(CreatePartitionCollection(i));
partitions.back()->SetPartitionIndex(i);
}
Expand Down
15 changes: 4 additions & 11 deletions src/common/types/row/partitioned_tuple_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,8 @@ void PartitionedTupleData::Repartition(PartitionedTupleData &new_partitioned_dat
PartitionedTupleDataAppendState append_state;
new_partitioned_data.InitializeAppendState(append_state);

const auto reverse = RepartitionReverseOrder();
const idx_t start_idx = reverse ? partitions.size() : 0;
const idx_t end_idx = reverse ? 0 : partitions.size();
const int64_t update = reverse ? -1 : 1;
const int64_t adjustment = reverse ? -1 : 0;

for (idx_t partition_idx = start_idx; partition_idx != end_idx; partition_idx += idx_t(update)) {
auto actual_partition_idx = partition_idx + idx_t(adjustment);
auto &partition = *partitions[actual_partition_idx];
for (idx_t partition_idx = 0; partition_idx < partitions.size(); partition_idx++) {
auto &partition = *partitions[partition_idx];

if (partition.Count() > 0) {
TupleDataChunkIterator iterator(partition, TupleDataPinProperties::DESTROY_AFTER_DONE, true);
Expand All @@ -279,9 +272,9 @@ void PartitionedTupleData::Repartition(PartitionedTupleData &new_partitioned_dat
new_partitioned_data.Append(append_state, chunk_state, iterator.GetCurrentChunkCount());
} while (iterator.Next());

RepartitionFinalizeStates(*this, new_partitioned_data, append_state, actual_partition_idx);
RepartitionFinalizeStates(*this, new_partitioned_data, append_state, partition_idx);
}
partitions[actual_partition_idx]->Reset();
partitions[partition_idx]->Reset();
}
new_partitioned_data.FlushAppendState(append_state);

Expand Down
1 change: 0 additions & 1 deletion src/execution/radix_partitioned_hashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ void RadixHTConfig::SetRadixBitsInternal(const idx_t radix_bits_p, bool external
sink.external = true;
}
sink_radix_bits = radix_bits_p;
return;
}

idx_t RadixHTConfig::InitialSinkRadixBits(ClientContext &context) {
Expand Down
3 changes: 0 additions & 3 deletions src/include/duckdb/common/radix_partitioning.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ class RadixPartitionedTupleData : public PartitionedTupleData {
return RadixPartitioning::NumberOfPartitions(radix_bits) - 1;
}

bool RepartitionReverseOrder() const override {
return true;
}
void RepartitionFinalizeStates(PartitionedTupleData &old_partitioned_data,
PartitionedTupleData &new_partitioned_data, PartitionedTupleDataAppendState &state,
idx_t finished_partition_idx) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ class PartitionedTupleData {
return DConstants::INVALID_INDEX;
}

//! Whether or not to iterate over the original partitions in reverse order when repartitioning (optional)
virtual bool RepartitionReverseOrder() const {
return false;
}
//! Finalize states while repartitioning - useful for unpinning blocks that are no longer needed (optional)
virtual void RepartitionFinalizeStates(PartitionedTupleData &old_partitioned_data,
PartitionedTupleData &new_partitioned_data,
Expand Down
8 changes: 4 additions & 4 deletions src/include/duckdb/storage/buffer/block_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ class BlockHandle : public enable_shared_from_this<BlockHandle> {
}

void SetEvictionQueueIndex(const idx_t index) {
D_ASSERT(!managed_buffer_eviction_queue_index.IsValid()); // Cannot overwrite
D_ASSERT(!eviction_queue_idx.IsValid()); // Cannot overwrite
D_ASSERT(buffer->type == FileBufferType::MANAGED_BUFFER); // MANAGED_BUFFER only (at least, for now)
managed_buffer_eviction_queue_index = index;
eviction_queue_idx = index;
}

private:
Expand Down Expand Up @@ -160,8 +160,8 @@ class BlockHandle : public enable_shared_from_this<BlockHandle> {
BufferPoolReservation memory_charge;
//! Does the block contain any memory pointers?
const char *unswizzled;
//! Index for eviction queue (FileBufferType::MANAGED_BUFFER only)
optional_idx managed_buffer_eviction_queue_index;
//! Index for eviction queue (FileBufferType::MANAGED_BUFFER only, for now)
optional_idx eviction_queue_idx;
};

} // namespace duckdb
11 changes: 7 additions & 4 deletions src/include/duckdb/storage/buffer/buffer_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ class BufferPool {
//! Increments the dead nodes for the queue with specified type
void IncrementDeadNodes(const BlockHandle &handle);

//! How many eviction queues we have for FileBufferType::MANAGED_BUFFER
static constexpr idx_t MANAGED_BUFFER_EVICTION_QUEUES = 6;
//! Total eviction queue count (MANAGED_BUFFER + BLOCK + TINY_BUFFER)
static constexpr idx_t EVICTION_QUEUES = MANAGED_BUFFER_EVICTION_QUEUES + 2;
//! How many eviction queues we have for the different FileBufferTypes
static constexpr idx_t BLOCK_EVICTION_QUEUES = 1;
static constexpr idx_t MANAGED_BUFFER_EVICTION_QUEUES = 1;
static constexpr idx_t TINY_BUFFER_EVICTION_QUEUES = 1;
//! Total eviction queue count (TINY_BUFFER + MANAGED_BUFFER + BLOCK)
static constexpr idx_t EVICTION_QUEUES =
TINY_BUFFER_EVICTION_QUEUES + MANAGED_BUFFER_EVICTION_QUEUES + BLOCK_EVICTION_QUEUES;

protected:
enum class MemoryUsageCaches {
Expand Down
40 changes: 25 additions & 15 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,28 +232,38 @@ bool BufferPool::AddToEvictionQueue(shared_ptr<BlockHandle> &handle) {
}

EvictionQueue &BufferPool::GetEvictionQueueForBlockHandle(const BlockHandle &handle) {
// We go from the back, evicting persistent data first.
// Then, temporary data (by index)
// Finally, tiny buffers (desperation)
idx_t index;
// Obtain offset and number of queues for the FileBufferType
idx_t offset;
idx_t size;
switch (handle.buffer->type) {
case FileBufferType::BLOCK:
index = EVICTION_QUEUES - 1;
case FileBufferType::TINY_BUFFER:
// TINY_BUFFER starts at offset 0 (evicted last)
offset = 0;
size = TINY_BUFFER_EVICTION_QUEUES;
break;
case FileBufferType::MANAGED_BUFFER:
if (!handle.managed_buffer_eviction_queue_index.IsValid()) {
index = EVICTION_QUEUES - 2; // Not set, assume low priority
} else {
index = MinValue(handle.managed_buffer_eviction_queue_index.GetIndex() + 1, EVICTION_QUEUES - 2);
}
// Followed by MANAGED_BUFFER
offset = TINY_BUFFER_EVICTION_QUEUES;
size = MANAGED_BUFFER_EVICTION_QUEUES;
break;
case FileBufferType::TINY_BUFFER:
index = 0;
case FileBufferType::BLOCK:
// Followed by BLOCK (evicted first)
offset = TINY_BUFFER_EVICTION_QUEUES + MANAGED_BUFFER_EVICTION_QUEUES;
size = BLOCK_EVICTION_QUEUES;
break;
default:
throw InternalException("Invalid FileBufferType in BufferPool::GetEvictionQueueForBlockHandle");
}
return *queues[index];

idx_t index;
if (handle.eviction_queue_idx.IsValid()) { // Index was set, bound it by the size
index = MinValue(handle.eviction_queue_idx.GetIndex(), size - 1);
} else { // Index was not set, assume low priority (back of queue)
index = size - 1;
}
D_ASSERT(index < size);

return *queues[offset + index];
}

void BufferPool::IncrementDeadNodes(const BlockHandle &handle) {
Expand Down Expand Up @@ -290,7 +300,7 @@ BufferPool::EvictionResult BufferPool::EvictBlocks(MemoryTag tag, idx_t extra_me
return block_result;
}
}
// This can never happen since we always return when i == 1 - Exception to silence compiler warning
// This can never happen since we always return when i == 1. Exception to silence compiler warning
throw InternalException("Exited BufferPool::EvictBlocksInternal without obtaining BufferPool::EvictionResult");
}

Expand Down

0 comments on commit db905a4

Please sign in to comment.