Temp directory compression (duckdb#14465)
This PR implements compression for the temporary buffers that DuckDB
swaps in and out of files in `temp_directory`.

The temporary buffers are compressed with ZSTD (at compression level -3, -1, 1, or 3) _or stored uncompressed_; the choice is made adaptively.
The adaptivity is really simple: we store the last total write time (or compress + write time) and choose whichever was fastest previously, with a slight bias towards compression, as reducing the temp directory size is always beneficial. There is also a small chance of deviating from the previous choice, so that we don't get stuck doing the same thing forever.
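
A rough, hypothetical sketch of this scheme (the struct and its members are made up for illustration; the actual bookkeeping in the PR differs):

```cpp
#include <array>
#include <cstdlib>

// Sketch of the adaptive choice: candidate 0 is "store uncompressed",
// candidates 1..4 are ZSTD levels -3, -1, 1 and 3. We remember the last
// observed total cost (compress + write time) per candidate and normally
// reuse the previous winner, with a slight bias towards compression and a
// small chance to explore a different candidate.
struct AdaptiveCompressionChoice {
    std::array<double, 5> last_cost_ms {}; // last write (or compress + write) time per candidate

    int Choose() const {
        if (std::rand() % 20 == 0) {
            return std::rand() % 5; // occasionally deviate so we don't get stuck forever
        }
        int best = 0;
        for (int i = 1; i < 5; i++) {
            // bias: a compressed candidate may be up to ~5% slower and still win,
            // since a smaller temp directory is always beneficial
            if (last_cost_ms[i] * 0.95 < last_cost_ms[best]) {
                best = i;
            }
        }
        return best;
    }

    void Record(int candidate, double total_ms) {
        last_cost_ms[candidate] = total_ms;
    }
};
```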

Whether we compress at all, and at which compression level, really needs to be adaptive; otherwise, we degrade performance in situations where writing is cheap, e.g., when there are not many concurrent writes (to an SSD) going on at the same time. I have performed two simple benchmarks on my laptop:

```sql
.timer on
set memory_limit='100mb';
set preserve_insertion_order=false;
create or replace table test as select random()::varchar i from range(50_000_000); -- Q1
create or replace table test2 as select * from test; -- Q2
```

Q1 is a single-threaded write (because `range` is a single-threaded
table function), and Q2 is a multi-threaded read/write. Here are the
median runtimes over 5 runs:

| Query | DuckDB 1.1.2 | This PR |
|--:|--:|--:|
| Q1 | 7.107s | __5.845s__ |
| Q2 | __0.346s__ | 0.380s |

As we can see, Q1 is significantly faster, while Q2 is only slightly slower. The difference in temp directory size is minimal (2.3GB vs 2.4GB).

The next benchmark is a large out-of-core aggregation:
```sql
use tpch_sf1000;
set memory_limit='32gb';
.timer on
pragma tpch(18);
```

| DuckDB 1.1.2 | This PR |
|--:|--:|
| 65.524s | __59.074s__ |

Note that there is some fluctuation in performance due to my laptop
running some stuff in the background, but the compression also seems to
improve performance here. This time, the size difference is a bit more
pronounced. In DuckDB 1.1.2, the size of the temp directory was 38-39GB.
With this PR, the size was 33-36GB. If disk speeds are slower, more
blocks will be compressed with a higher compression level, which should
reduce the temp directory size more.

Our uncompressed fixed-size blocks are still swapped in and out of a
file that stores 256KiB blocks. Our compressed blocks can have different
sizes, and we create one or more files per "size class", i.e., a
multiple of 32KiB.
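
As a rough illustration of the size-class idea (a sketch with assumed names and rounding, not the PR's exact code), a compressed buffer is rounded up to the next multiple of 32KiB, and all blocks of that rounded size are swapped in and out of the same file(s):

```cpp
#include <cstdint>

// Hypothetical sketch: map a compressed buffer size to a "size class".
// Uncompressed blocks keep using the existing file of 256KiB blocks; compressed
// blocks are grouped into one or more files per 32KiB size class.
static constexpr uint64_t SIZE_CLASS_GRANULARITY = 32ULL * 1024;   // 32KiB
static constexpr uint64_t UNCOMPRESSED_BLOCK_SIZE = 256ULL * 1024; // 256KiB

// 1 => 32KiB blocks, 2 => 64KiB blocks, ..., 8 => 256KiB blocks
static uint64_t SizeClass(uint64_t compressed_size) {
    return (compressed_size + SIZE_CLASS_GRANULARITY - 1) / SIZE_CLASS_GRANULARITY;
}

// e.g. a buffer that compresses to 40KiB lands in size class 2,
// so it is written to a file that stores 64KiB blocks
```
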
Mytherin authored Oct 31, 2024
2 parents 7fb69a4 + d1a3349 commit de91c64
Showing 33 changed files with 870 additions and 368 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -630,6 +630,7 @@ include_directories(third_party/tdigest)
include_directories(third_party/mbedtls/include)
include_directories(third_party/jaro_winkler)
include_directories(third_party/yyjson/include)
include_directories(third_party/zstd/include)

# todo only regenerate ub file if one of the input files changed hack alert
function(enable_unity_build UB_SUFFIX SOURCE_VARIABLE_NAME)
39 changes: 7 additions & 32 deletions extension/parquet/CMakeLists.txt
@@ -3,14 +3,8 @@ cmake_minimum_required(VERSION 3.5...3.29)
project(ParquetExtension)

include_directories(
include
../../third_party/lz4
../../third_party/parquet
../../third_party/thrift
../../third_party/snappy
../../third_party/zstd/include
../../third_party/mbedtls
../../third_party/mbedtls/include
include ../../third_party/lz4 ../../third_party/parquet
../../third_party/thrift ../../third_party/snappy
../../third_party/brotli/include)

set(PARQUET_EXTENSION_FILES
@@ -37,31 +31,12 @@ if(NOT CLANG_TIDY)
../../third_party/thrift/thrift/transport/TBufferTransports.cpp
../../third_party/snappy/snappy.cc
../../third_party/snappy/snappy-sinksource.cc)
# zstd
# lz4
set(PARQUET_EXTENSION_FILES ${PARQUET_EXTENSION_FILES}
../../third_party/lz4/lz4.cpp)
# brotli
set(PARQUET_EXTENSION_FILES
${PARQUET_EXTENSION_FILES}
../../third_party/lz4/lz4.cpp
../../third_party/zstd/decompress/zstd_ddict.cpp
../../third_party/zstd/decompress/huf_decompress.cpp
../../third_party/zstd/decompress/zstd_decompress.cpp
../../third_party/zstd/decompress/zstd_decompress_block.cpp
../../third_party/zstd/common/entropy_common.cpp
../../third_party/zstd/common/fse_decompress.cpp
../../third_party/zstd/common/zstd_common.cpp
../../third_party/zstd/common/error_private.cpp
../../third_party/zstd/common/xxhash.cpp
../../third_party/zstd/compress/fse_compress.cpp
../../third_party/zstd/compress/hist.cpp
../../third_party/zstd/compress/huf_compress.cpp
../../third_party/zstd/compress/zstd_compress.cpp
../../third_party/zstd/compress/zstd_compress_literals.cpp
../../third_party/zstd/compress/zstd_compress_sequences.cpp
../../third_party/zstd/compress/zstd_compress_superblock.cpp
../../third_party/zstd/compress/zstd_double_fast.cpp
../../third_party/zstd/compress/zstd_fast.cpp
../../third_party/zstd/compress/zstd_lazy.cpp
../../third_party/zstd/compress/zstd_ldm.cpp
../../third_party/zstd/compress/zstd_opt.cpp
../../third_party/brotli/enc/dictionary_hash.cpp
../../third_party/brotli/enc/backward_references_hq.cpp
../../third_party/brotli/enc/histogram.cpp
@@ -98,7 +73,7 @@ endif()
build_static_extension(parquet ${PARQUET_EXTENSION_FILES})
set(PARAMETERS "-warnings")
build_loadable_extension(parquet ${PARAMETERS} ${PARQUET_EXTENSION_FILES})
target_link_libraries(parquet_loadable_extension duckdb_mbedtls)
target_link_libraries(parquet_loadable_extension duckdb_mbedtls duckdb_zstd)

install(
TARGETS parquet_extension
11 changes: 3 additions & 8 deletions extension/parquet/column_writer.cpp
@@ -205,16 +205,11 @@ void ColumnWriter::CompressPage(MemoryStream &temp_writer, size_t &compressed_si
break;
}
case CompressionCodec::ZSTD: {
auto configured_compression = writer.CompressionLevel();
int compress_level = ZSTD_CLEVEL_DEFAULT;
if (configured_compression.IsValid()) {
compress_level = static_cast<int>(configured_compression.GetIndex());
}
compressed_size = duckdb_zstd::ZSTD_compressBound(temp_writer.GetPosition());
compressed_buf = unique_ptr<data_t[]>(new data_t[compressed_size]);
compressed_size =
duckdb_zstd::ZSTD_compress((void *)compressed_buf.get(), compressed_size,
(const void *)temp_writer.GetData(), temp_writer.GetPosition(), compress_level);
compressed_size = duckdb_zstd::ZSTD_compress((void *)compressed_buf.get(), compressed_size,
(const void *)temp_writer.GetData(), temp_writer.GetPosition(),
writer.CompressionLevel());
compressed_data = compressed_buf.get();
break;
}
6 changes: 3 additions & 3 deletions extension/parquet/include/parquet_writer.hpp
@@ -67,7 +67,7 @@ class ParquetWriter {
vector<string> names, duckdb_parquet::CompressionCodec::type codec, ChildFieldIDs field_ids,
const vector<pair<string, string>> &kv_metadata,
shared_ptr<ParquetEncryptionConfig> encryption_config, double dictionary_compression_ratio_threshold,
optional_idx compression_level, bool debug_use_openssl);
int64_t compression_level, bool debug_use_openssl);

public:
void PrepareRowGroup(ColumnDataCollection &buffer, PreparedRowGroup &result);
@@ -100,7 +100,7 @@ class ParquetWriter {
double DictionaryCompressionRatioThreshold() const {
return dictionary_compression_ratio_threshold;
}
optional_idx CompressionLevel() const {
int64_t CompressionLevel() const {
return compression_level;
}
idx_t NumberOfRowGroups() {
@@ -124,7 +124,7 @@
ChildFieldIDs field_ids;
shared_ptr<ParquetEncryptionConfig> encryption_config;
double dictionary_compression_ratio_threshold;
optional_idx compression_level;
int64_t compression_level;
bool debug_use_openssl;
shared_ptr<EncryptionUtil> encryption_util;

4 changes: 4 additions & 0 deletions extension/parquet/include/zstd_file_system.hpp
@@ -26,6 +26,10 @@ class ZStdFileSystem : public CompressedFileSystem {
unique_ptr<StreamWrapper> CreateStream() override;
idx_t InBufferSize() override;
idx_t OutBufferSize() override;

static int64_t DefaultCompressionLevel();
static int64_t MinimumCompressionLevel();
static int64_t MaximumCompressionLevel();
};

} // namespace duckdb
29 changes: 1 addition & 28 deletions extension/parquet/parquet_config.py
@@ -13,9 +13,9 @@
'third_party/brotli/dec',
'third_party/brotli/enc',
'third_party/snappy',
'third_party/zstd/include',
'third_party/mbedtls',
'third_party/mbedtls/include',
'third_party/zstd/include',
]
]
# source files
@@ -48,33 +48,6 @@
'third_party/snappy/snappy-sinksource.cc',
]
]
# zstd
source_files += [
os.path.sep.join(x.split('/'))
for x in [
'third_party/zstd/decompress/zstd_ddict.cpp',
'third_party/zstd/decompress/huf_decompress.cpp',
'third_party/zstd/decompress/zstd_decompress.cpp',
'third_party/zstd/decompress/zstd_decompress_block.cpp',
'third_party/zstd/common/entropy_common.cpp',
'third_party/zstd/common/fse_decompress.cpp',
'third_party/zstd/common/zstd_common.cpp',
'third_party/zstd/common/error_private.cpp',
'third_party/zstd/common/xxhash.cpp',
'third_party/zstd/compress/fse_compress.cpp',
'third_party/zstd/compress/hist.cpp',
'third_party/zstd/compress/huf_compress.cpp',
'third_party/zstd/compress/zstd_compress.cpp',
'third_party/zstd/compress/zstd_compress_literals.cpp',
'third_party/zstd/compress/zstd_compress_sequences.cpp',
'third_party/zstd/compress/zstd_compress_superblock.cpp',
'third_party/zstd/compress/zstd_double_fast.cpp',
'third_party/zstd/compress/zstd_fast.cpp',
'third_party/zstd/compress/zstd_lazy.cpp',
'third_party/zstd/compress/zstd_ldm.cpp',
'third_party/zstd/compress/zstd_opt.cpp',
]
]
# lz4
source_files += [os.path.sep.join(x.split('/')) for x in ['third_party/lz4/lz4.cpp']]

42 changes: 38 additions & 4 deletions extension/parquet/parquet_extension.cpp
@@ -200,7 +200,7 @@ struct ParquetWriteBindData : public TableFunctionData {

ChildFieldIDs field_ids;
//! The compression level, higher value is more
optional_idx compression_level;
int64_t compression_level = ZStdFileSystem::DefaultCompressionLevel();
};

struct ParquetWriteGlobalState : public GlobalFunctionData {
@@ -1200,6 +1200,7 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyFunctionBi
const vector<string> &names, const vector<LogicalType> &sql_types) {
D_ASSERT(names.size() == sql_types.size());
bool row_group_size_bytes_set = false;
bool compression_level_set = false;
auto bind_data = make_uniq<ParquetWriteBindData>();
for (auto &option : input.info.options) {
const auto loption = StringUtil::Lower(option.first);
@@ -1295,7 +1296,14 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyFunctionBi
throw BinderException("Expected debug_use_openssl to be a BOOLEAN");
}
} else if (loption == "compression_level") {
bind_data->compression_level = option.second[0].GetValue<uint64_t>();
const auto val = option.second[0].GetValue<int64_t>();
if (val < ZStdFileSystem::MinimumCompressionLevel() || val > ZStdFileSystem::MaximumCompressionLevel()) {
throw BinderException("Compression level must be between %lld and %lld",
ZStdFileSystem::MinimumCompressionLevel(),
ZStdFileSystem::MaximumCompressionLevel());
}
bind_data->compression_level = val;
compression_level_set = true;
} else {
throw NotImplementedException("Unrecognized option for PARQUET: %s", option.first.c_str());
}
@@ -1310,6 +1318,10 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyFunctionBi
bind_data->row_group_size_bytes = bind_data->row_group_size * ParquetWriteBindData::BYTES_PER_ROW;
}

if (compression_level_set && bind_data->codec != CompressionCodec::ZSTD) {
throw BinderException("Compression level is only supported for the ZSTD compression codec");
}

bind_data->sql_types = sql_types;
bind_data->column_names = names;
return std::move(bind_data);
@@ -1430,6 +1442,23 @@ duckdb_parquet::CompressionCodec::type EnumUtil::FromString<duckdb_parquet::Comp
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

static optional_idx SerializeCompressionLevel(const int64_t compression_level) {
return compression_level < 0 ? NumericLimits<idx_t>::Maximum() - NumericCast<idx_t>(AbsValue(compression_level))
: NumericCast<idx_t>(compression_level);
}

static int64_t DeserializeCompressionLevel(const optional_idx compression_level) {
// Was originally an optional_idx, now int64_t, so we still serialize as such
if (!compression_level.IsValid()) {
return ZStdFileSystem::DefaultCompressionLevel();
}
if (compression_level.GetIndex() > NumericCast<idx_t>(ZStdFileSystem::MaximumCompressionLevel())) {
// restore the negative compression level
return -NumericCast<int64_t>(NumericLimits<idx_t>::Maximum() - compression_level.GetIndex());
}
return NumericCast<int64_t>(compression_level.GetIndex());
}

static void ParquetCopySerialize(Serializer &serializer, const FunctionData &bind_data_p,
const CopyFunction &function) {
auto &bind_data = bind_data_p.Cast<ParquetWriteBindData>();
@@ -1444,7 +1473,9 @@ static void ParquetCopySerialize(Serializer &serializer, const FunctionData &bin
bind_data.encryption_config, nullptr);
serializer.WriteProperty(108, "dictionary_compression_ratio_threshold",
bind_data.dictionary_compression_ratio_threshold);
serializer.WritePropertyWithDefault<optional_idx>(109, "compression_level", bind_data.compression_level);
const auto compression_level = SerializeCompressionLevel(bind_data.compression_level);
D_ASSERT(DeserializeCompressionLevel(compression_level) == bind_data.compression_level);
serializer.WritePropertyWithDefault<optional_idx>(109, "compression_level", compression_level);
serializer.WriteProperty(110, "row_groups_per_file", bind_data.row_groups_per_file);
serializer.WriteProperty(111, "debug_use_openssl", bind_data.debug_use_openssl);
}
@@ -1462,7 +1493,10 @@ static unique_ptr<FunctionData> ParquetCopyDeserialize(Deserializer &deserialize
data->encryption_config, nullptr);
deserializer.ReadPropertyWithExplicitDefault<double>(108, "dictionary_compression_ratio_threshold",
data->dictionary_compression_ratio_threshold, 1.0);
deserializer.ReadPropertyWithDefault<optional_idx>(109, "compression_level", data->compression_level);
optional_idx compression_level;
deserializer.ReadPropertyWithDefault<optional_idx>(109, "compression_level", compression_level);
data->compression_level = DeserializeCompressionLevel(compression_level);
D_ASSERT(SerializeCompressionLevel(data->compression_level) == compression_level);
data->row_groups_per_file =
deserializer.ReadPropertyWithExplicitDefault<optional_idx>(110, "row_groups_per_file", optional_idx::Invalid());
data->debug_use_openssl = deserializer.ReadPropertyWithExplicitDefault<bool>(111, "debug_use_openssl", true);
17 changes: 2 additions & 15 deletions extension/parquet/parquet_writer.cpp
@@ -317,12 +317,12 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
vector<string> names_p, CompressionCodec::type codec, ChildFieldIDs field_ids_p,
const vector<pair<string, string>> &kv_metadata,
shared_ptr<ParquetEncryptionConfig> encryption_config_p,
double dictionary_compression_ratio_threshold_p, optional_idx compression_level_p,
double dictionary_compression_ratio_threshold_p, int64_t compression_level_p,
bool debug_use_openssl_p)
: file_name(std::move(file_name_p)), sql_types(std::move(types_p)), column_names(std::move(names_p)), codec(codec),
field_ids(std::move(field_ids_p)), encryption_config(std::move(encryption_config_p)),
dictionary_compression_ratio_threshold(dictionary_compression_ratio_threshold_p),
debug_use_openssl(debug_use_openssl_p) {
compression_level(compression_level_p), debug_use_openssl(debug_use_openssl_p) {
// initialize the file writer
writer = make_uniq<BufferedFileWriter>(fs, file_name.c_str(),
FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE_NEW);
@@ -361,19 +361,6 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
file_meta_data.key_value_metadata.push_back(kv);
file_meta_data.__isset.key_value_metadata = true;
}
if (compression_level_p.IsValid()) {
idx_t level = compression_level_p.GetIndex();
switch (codec) {
case CompressionCodec::ZSTD:
if (level < 1 || level > 22) {
throw BinderException("Compression level for ZSTD must be between 1 and 22");
}
break;
default:
throw NotImplementedException("Compression level is only supported for the ZSTD compression codec");
}
compression_level = level;
}

// populate root schema object
file_meta_data.schema[0].name = "duckdb_schema";
13 changes: 13 additions & 0 deletions extension/parquet/zstd_file_system.cpp
@@ -1,4 +1,5 @@
#include "zstd_file_system.hpp"

#include "zstd.h"

namespace duckdb {
@@ -184,4 +185,16 @@ idx_t ZStdFileSystem::OutBufferSize() {
return duckdb_zstd::ZSTD_DStreamOutSize();
}

int64_t ZStdFileSystem::DefaultCompressionLevel() {
return duckdb_zstd::ZSTD_defaultCLevel();
}

int64_t ZStdFileSystem::MinimumCompressionLevel() {
return duckdb_zstd::ZSTD_minCLevel();
}

int64_t ZStdFileSystem::MaximumCompressionLevel() {
return duckdb_zstd::ZSTD_maxCLevel();
}

} // namespace duckdb
4 changes: 2 additions & 2 deletions scripts/generate_enum_util.py
@@ -81,7 +81,7 @@ def remove_prefix(str, prefix):
enum_type = res.group(2)

enum_members = []
# Capture All members: \w+(\s*\=\s*\w*)?
# Capture All members: \w+(\s*\=\s*-?\w*)?
# group one is the member name
# group two is the member value
# First clean group from comments
@@ -90,7 +90,7 @@ def remove_prefix(str, prefix):
s = re.sub(r"\/\*.*\*\/", "", s)

enum_values = {}
for member in re.finditer(r"(\w+)(\s*\=\s*\w*)?", s):
for member in re.finditer(r"(\w+)(\s*\=\s*-?\w*)?", s):
key = member.group(1)
strings = [key]
if enum_name in overrides and key in overrides[enum_name]:
2 changes: 2 additions & 0 deletions scripts/package_build.py
@@ -37,6 +37,7 @@ def third_party_includes():
includes += [os.path.join('third_party', 'utf8proc')]
includes += [os.path.join('third_party', 'utf8proc', 'include')]
includes += [os.path.join('third_party', 'yyjson', 'include')]
includes += [os.path.join('third_party', 'zstd', 'include')]
return includes


@@ -53,6 +54,7 @@ def third_party_sources():
sources += [os.path.join('third_party', 'libpg_query')]
sources += [os.path.join('third_party', 'mbedtls')]
sources += [os.path.join('third_party', 'yyjson')]
sources += [os.path.join('third_party', 'zstd')]
return sources

