Skip to content

Commit

Permalink
allocator_bulk_deallocation_flush_threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
lnkuiper committed Sep 6, 2024
1 parent d454d24 commit bb54611
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 9 deletions.
6 changes: 4 additions & 2 deletions src/include/duckdb/main/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ struct DBConfigOptions {
static bool debug_print_bindings; // NOLINT: debug setting
//! The peak allocation threshold at which to flush the allocator after completing a task (1 << 27, ~128MB)
idx_t allocator_flush_threshold = 134217728;
//! If bulk deallocation larger than this occurs, flush outstanding allocations (1 << 30, ~1GB)
idx_t allocator_bulk_deallocation_flush_threshold = 1073741824;
//! Whether the allocator background thread is enabled
bool allocator_background_threads = false;
//! DuckDB API surface
Expand All @@ -258,11 +260,11 @@ struct DBConfigOptions {
bool abort_on_wal_failure = false;
//! The index_scan_percentage sets a threshold for index scans.
//! If fewer than MAX(index_scan_max_count, index_scan_percentage * total_row_count)
// rows match, we perform an index scan instead of a table scan.
//! rows match, we perform an index scan instead of a table scan.
double index_scan_percentage = 0.001;
//! The index_scan_max_count sets a threshold for index scans.
//! If fewer than MAX(index_scan_max_count, index_scan_percentage * total_row_count)
// rows match, we perform an index scan instead of a table scan.
//! rows match, we perform an index scan instead of a table scan.
idx_t index_scan_max_count = STANDARD_VECTOR_SIZE;
//! The maximum number of schemas we will look through for "did you mean..." style errors in the catalog
idx_t catalog_error_max_schemas = 100;
Expand Down
12 changes: 11 additions & 1 deletion src/include/duckdb/main/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,17 @@ struct UsernameSetting {
static Value GetSetting(const ClientContext &context);
};

struct FlushAllocatorSetting {
struct AllocatorFlushThreshold {
static constexpr const char *Name = "allocator_bulk_deallocation_flush_threshold";
static constexpr const char *Description =
"If a bulk deallocation larger than this occurs, flush outstanding allocations";
static constexpr const LogicalTypeId InputType = LogicalTypeId::VARCHAR;
static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &parameter);
static void ResetGlobal(DatabaseInstance *db, DBConfig &config);
static Value GetSetting(const ClientContext &context);
};

struct AllocatorBulkDeallocationFlushThreshold {
static constexpr const char *Name = "allocator_flush_threshold";
static constexpr const char *Description =
"Peak allocation threshold at which to flush the allocator after completing a task.";
Expand Down
5 changes: 5 additions & 0 deletions src/include/duckdb/storage/buffer/buffer_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class BufferPool {
//! blocks can be evicted
void SetLimit(idx_t limit, const char *exception_postscript);

//! If bulk deallocation larger than this occurs, flush outstanding allocations
void SetAllocatorBulkDeallocationFlushThreshold(idx_t threshold);

void UpdateUsedMemory(MemoryTag tag, int64_t size);

idx_t GetUsedMemory() const;
Expand Down Expand Up @@ -135,6 +138,8 @@ class BufferPool {
mutex limit_lock;
//! The maximum amount of memory that the buffer manager can keep (in bytes)
atomic<idx_t> maximum_memory;
//! If bulk deallocation larger than this occurs, flush outstanding allocations
atomic<idx_t> allocator_bulk_deallocation_flush_threshold;
//! Record timestamps of buffer manager unpin() events. Usable by custom eviction policies.
bool track_eviction_timestamps;
//! Eviction queues
Expand Down
2 changes: 1 addition & 1 deletion src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static const ConfigurationOption internal_options[] = {
DUCKDB_GLOBAL_ALIAS("user", UsernameSetting),
DUCKDB_GLOBAL_ALIAS("wal_autocheckpoint", CheckpointThresholdSetting),
DUCKDB_GLOBAL_ALIAS("worker_threads", ThreadsSetting),
DUCKDB_GLOBAL(FlushAllocatorSetting),
DUCKDB_GLOBAL(AllocatorFlushThreshold),
DUCKDB_GLOBAL(AllocatorBackgroundThreadsSetting),
DUCKDB_GLOBAL(DuckDBApiSetting),
DUCKDB_GLOBAL(CustomUserAgentSetting),
Expand Down
34 changes: 30 additions & 4 deletions src/main/settings/settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "duckdb/parallel/task_scheduler.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/planner/expression_binder.hpp"
#include "duckdb/storage/buffer/buffer_pool.hpp"
#include "duckdb/storage/buffer_manager.hpp"
#include "duckdb/storage/storage_manager.hpp"

Expand Down Expand Up @@ -1891,27 +1892,52 @@ Value UsernameSetting::GetSetting(const ClientContext &context) {
//===--------------------------------------------------------------------===//
// Allocator Flush Threshold
//===--------------------------------------------------------------------===//
void FlushAllocatorSetting::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) {
void AllocatorFlushThreshold::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) {
config.options.allocator_flush_threshold = DBConfig::ParseMemoryLimit(input.ToString());
if (db) {
TaskScheduler::GetScheduler(*db).SetAllocatorFlushTreshold(config.options.allocator_flush_threshold);
}
}

void FlushAllocatorSetting::ResetGlobal(DatabaseInstance *db, DBConfig &config) {
void AllocatorFlushThreshold::ResetGlobal(DatabaseInstance *db, DBConfig &config) {
config.options.allocator_flush_threshold = DBConfig().options.allocator_flush_threshold;
if (db) {
TaskScheduler::GetScheduler(*db).SetAllocatorFlushTreshold(config.options.allocator_flush_threshold);
}
}

Value FlushAllocatorSetting::GetSetting(const ClientContext &context) {
Value AllocatorFlushThreshold::GetSetting(const ClientContext &context) {
auto &config = DBConfig::GetConfig(context);
return Value(StringUtil::BytesToHumanReadableString(config.options.allocator_flush_threshold));
}

//===--------------------------------------------------------------------===//
// Allocator Background Thread
// Allocator Bulk Deallocation Flush Threshold
//===--------------------------------------------------------------------===//
void AllocatorBulkDeallocationFlushThreshold::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) {
config.options.allocator_bulk_deallocation_flush_threshold = DBConfig::ParseMemoryLimit(input.ToString());
if (db) {
BufferManager::GetBufferManager(*db).GetBufferPool().SetAllocatorBulkDeallocationFlushThreshold(
config.options.allocator_bulk_deallocation_flush_threshold);
}
}

void AllocatorBulkDeallocationFlushThreshold::ResetGlobal(DatabaseInstance *db, DBConfig &config) {
config.options.allocator_bulk_deallocation_flush_threshold =
DBConfig().options.allocator_bulk_deallocation_flush_threshold;
if (db) {
BufferManager::GetBufferManager(*db).GetBufferPool().SetAllocatorBulkDeallocationFlushThreshold(
config.options.allocator_bulk_deallocation_flush_threshold);
}
}

Value AllocatorBulkDeallocationFlushThreshold::GetSetting(const ClientContext &context) {
auto &config = DBConfig::GetConfig(context);
return Value(StringUtil::BytesToHumanReadableString(config.options.allocator_bulk_deallocation_flush_threshold));
}

//===--------------------------------------------------------------------===//
// Allocator Background Threads
//===--------------------------------------------------------------------===//
void AllocatorBackgroundThreadsSetting::SetGlobal(DatabaseInstance *db, DBConfig &config, const Value &input) {
config.options.allocator_background_threads = input.GetValue<bool>();
Expand Down
6 changes: 6 additions & 0 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ BufferPool::EvictionResult BufferPool::EvictBlocksInternal(EvictionQueue &queue,

if (!found) {
r.Resize(0);
} else if (Allocator::SupportsFlush() && extra_memory > allocator_bulk_deallocation_flush_threshold) {
Allocator::FlushAll();
}

return {found, std::move(r)};
Expand Down Expand Up @@ -401,6 +403,10 @@ void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {
}
}

void BufferPool::SetAllocatorBulkDeallocationFlushThreshold(idx_t threshold) {
allocator_bulk_deallocation_flush_threshold = threshold;
}

BufferPool::MemoryUsage::MemoryUsage() {
for (auto &v : memory_usage) {
v = 0;
Expand Down
3 changes: 2 additions & 1 deletion test/api/test_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ OptionValueSet GetValueForOption(const string &name, LogicalTypeId type) {
{"http_proxy_username", {"john"}},
{"http_proxy_password", {"doe"}},
{"http_logging_output", {"my_cool_outputfile"}},
{"allocator_flush_threshold", {"4.0 GiB"}}};
{"allocator_flush_threshold", {"4.0 GiB"}},
{"allocator_bulk_deallocation_flush_threshold", {"4.0 GiB"}}};
// Every option that's not excluded has to be part of this map
if (!value_map.count(name)) {
switch (type) {
Expand Down

0 comments on commit bb54611

Please sign in to comment.