feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support #1192

Merged Jan 7, 2025 (35 commits)
8ce9bb5
Implement native decoding and decompression
andygrove Dec 22, 2024
a9a0593
revert some variable renaming for smaller diff
andygrove Dec 22, 2024
11320a5
fix oom issues?
andygrove Dec 22, 2024
e2f28f9
upmerge
andygrove Dec 22, 2024
c97eb58
make NativeBatchDecoderIterator more consistent with ArrowReaderIterator
andygrove Dec 23, 2024
4ffe47d
fix oom and prep for review
andygrove Dec 23, 2024
68d2331
format
andygrove Dec 23, 2024
a3fb105
Add LZ4 support
andygrove Dec 23, 2024
b593e80
clippy, new benchmark
andygrove Dec 23, 2024
4078551
rename metrics, clean up lz4 code
andygrove Dec 27, 2024
f286309
update test
andygrove Dec 27, 2024
fbc2124
Add support for snappy
andygrove Dec 27, 2024
bed543a
format
andygrove Dec 27, 2024
e13d72f
change default back to lz4
andygrove Dec 27, 2024
f1ed927
make metrics more accurate
andygrove Dec 27, 2024
45b020e
format
andygrove Dec 27, 2024
587feee
clippy
andygrove Dec 27, 2024
ea03a3d
upmerge
andygrove Dec 28, 2024
b56089b
upmerge
andygrove Dec 28, 2024
f66bced
use faster unsafe version of lz4_flex
andygrove Dec 28, 2024
0b2f0e9
Make compression codec configurable for columnar shuffle
andygrove Jan 1, 2025
ad1adc1
clippy
andygrove Jan 1, 2025
3e15b12
fix bench
andygrove Jan 1, 2025
1c08a4b
fmt
andygrove Jan 1, 2025
7dd2ff6
address feedback
andygrove Jan 2, 2025
1fc3d49
address feedback
andygrove Jan 2, 2025
5a3fb2e
address feedback
andygrove Jan 2, 2025
2b88bbd
minor code simplification
andygrove Jan 2, 2025
b4b6aff
upmerge
andygrove Jan 3, 2025
78340a1
cargo fmt
andygrove Jan 3, 2025
69b54d9
overflow check
andygrove Jan 3, 2025
f41180d
rename compression level config
andygrove Jan 4, 2025
9f176c9
address feedback
andygrove Jan 6, 2025
dd4f259
address feedback
andygrove Jan 6, 2025
7e4ddc9
rename constant
andygrove Jan 6, 2025
23 changes: 12 additions & 11 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -272,18 +272,19 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)

-  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
-        "Compression can be disabled by setting spark.shuffle.compress=false.")
-    .stringConf
-    .checkValues(Set("zstd"))
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc(
+        "The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
+          "snappy are supported. Compression can be disabled by setting " +
+          "spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4", "snappy"))
+      .createWithDefault("lz4")

-  val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
-    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
-      .doc("The compression level to use when compression shuffle files.")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.zstd.level")
+      .doc("The compression level to use when compressing shuffle files with zstd.")
       .intConf
       .createWithDefault(1)
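
To illustrate how the codec name and the zstd-only level might be combined on the native side, here is a minimal sketch. The variant names `None`, `Snappy`, `Lz4Frame`, and `Zstd(level)` appear in this PR's benchmarks; the `codec_from_conf` helper, its parameters, the `i32` level type, and the `String` error are assumptions made for illustration and are not taken from the Comet code base.

```rust
/// Codec variants as they appear in this PR's benchmarks.
/// The `i32` payload type for the zstd level is an assumption.
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionCodec {
    None,
    Lz4Frame,
    Snappy,
    Zstd(i32),
}

/// Hypothetical helper (not Comet's API): resolve a codec from the values of
/// `spark.shuffle.compress`, `...shuffle.compression.codec`, and
/// `...shuffle.compression.zstd.level`.
pub fn codec_from_conf(
    compress: bool,
    codec: &str,
    zstd_level: i32,
) -> Result<CompressionCodec, String> {
    // Compression disabled: the codec name is ignored entirely.
    if !compress {
        return Ok(CompressionCodec::None);
    }
    match codec {
        "lz4" => Ok(CompressionCodec::Lz4Frame),
        "snappy" => Ok(CompressionCodec::Snappy),
        // Only zstd consumes the level, which is why the setting is zstd-specific.
        "zstd" => Ok(CompressionCodec::Zstd(zstd_level)),
        other => Err(format!("unsupported shuffle compression codec: {other}")),
    }
}
```

Under these assumptions, `codec_from_conf(true, "lz4", 1)` yields `Ok(CompressionCodec::Lz4Frame)`, and the level argument only matters when the codec is `zstd`.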

4 changes: 2 additions & 2 deletions docs/source/user-guide/configs.md
@@ -51,8 +51,8 @@ Comet provides the following configuration settings.
 | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy_task_shared'. | greedy_task_shared |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
-| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 |
+| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
+| spark.comet.exec.shuffle.compression.zstd.level | The compression level to use when compressing shuffle files with zstd. | 1 |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
 | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
 | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
48 changes: 25 additions & 23 deletions native/Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions native/core/Cargo.toml
@@ -52,6 +52,9 @@ serde = { version = "1", features = ["derive"] }
 lazy_static = "1.4.0"
 prost = "0.12.1"
 jni = "0.21"
+snap = "1.1"

Review comment (Contributor): Awesome, it's really challenging to find a well-maintained Snappy Rust library.

+# we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation
+lz4_flex = { version = "0.11.3", default-features = false }

Review comment (Contributor): This is a somewhat confusingly named feature flag. Can we leave a comment here that we're enabling unsafe encode and decode for performance?

 zstd = "0.11"
 rand = { workspace = true}
 num = { workspace = true }
2 changes: 2 additions & 0 deletions native/core/benches/row_columnar.rs
@@ -19,6 +19,7 @@ use arrow::datatypes::DataType as ArrowDataType;
 use comet::execution::shuffle::row::{
     process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
 };
+use comet::execution::shuffle::CompressionCodec;
 use criterion::{criterion_group, criterion_main, Criterion};
 use tempfile::Builder;

@@ -77,6 +78,7 @@ fn benchmark(c: &mut Criterion) {
                 false,
                 0,
                 None,
+                &CompressionCodec::Zstd(1),
             )
             .unwrap();
         });
41 changes: 35 additions & 6 deletions native/core/benches/shuffle_writer.rs
@@ -35,23 +35,52 @@ fn criterion_benchmark(c: &mut Criterion) {
     group.bench_function("shuffle_writer: encode (no compression))", |b| {
         let batch = create_batch(8192, true);
         let mut buffer = vec![];
-        let mut cursor = Cursor::new(&mut buffer);
         let ipc_time = Time::default();
-        b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time));
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)
+        });
     });
+    group.bench_function("shuffle_writer: encode and compress (snappy)", |b| {
+        let batch = create_batch(8192, true);
+        let mut buffer = vec![];
+        let ipc_time = Time::default();
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Snappy, &ipc_time)
+        });
+    });
+    group.bench_function("shuffle_writer: encode and compress (lz4)", |b| {
+        let batch = create_batch(8192, true);
+        let mut buffer = vec![];
+        let ipc_time = Time::default();
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time)
+        });
+    });
     group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| {
         let batch = create_batch(8192, true);
         let mut buffer = vec![];
-        let mut cursor = Cursor::new(&mut buffer);
         let ipc_time = Time::default();
-        b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time));
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)

Review comment (@Dandandan, Dec 23, 2024): I think zstd should have faster negative levels as well (-4 or -5 might come close); it would be interesting to see how they compare. Not sure whether they are available in the Rust bindings.

+        });
     });
     group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| {
         let batch = create_batch(8192, true);
         let mut buffer = vec![];
-        let mut cursor = Cursor::new(&mut buffer);
         let ipc_time = Time::default();
-        b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time));
+        b.iter(|| {
+            buffer.clear();
+            let mut cursor = Cursor::new(&mut buffer);
+            write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)
+        });
     });
     group.bench_function("shuffle_writer: end to end", |b| {
         let ctx = SessionContext::new();
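
The benchmark changes in this file clear the buffer and build a fresh `Cursor` inside every `b.iter` closure. One likely motivation, which the PR text shown here does not state, is that a cursor shared across iterations keeps its write position, so every iteration appends to the same `Vec` and the buffer grows for as long as the benchmark runs. The sketch below reproduces that effect with the standard library only; `write_payload` is a hypothetical stand-in for `write_ipc_compressed`, and the 1024-byte payload is arbitrary.

```rust
use std::io::{Cursor, Write};

/// Hypothetical stand-in for `write_ipc_compressed`: writes a fixed 1024-byte payload.
pub fn write_payload<W: Write>(out: &mut W) -> std::io::Result<usize> {
    let payload = [0u8; 1024];
    out.write_all(&payload)?;
    Ok(payload.len())
}

/// Old benchmark shape: one cursor shared by all iterations.
/// Returns the final buffer length.
pub fn reused_cursor(iterations: usize) -> usize {
    let mut buffer: Vec<u8> = vec![];
    {
        let mut cursor = Cursor::new(&mut buffer);
        for _ in 0..iterations {
            // The cursor position keeps advancing, so each write extends the Vec.
            write_payload(&mut cursor).unwrap();
        }
    }
    buffer.len()
}

/// New benchmark shape: clear the buffer and create a fresh cursor per iteration.
/// Returns the final buffer length.
pub fn fresh_cursor(iterations: usize) -> usize {
    let mut buffer: Vec<u8> = vec![];
    for _ in 0..iterations {
        buffer.clear();
        let mut cursor = Cursor::new(&mut buffer);
        // Every iteration starts at position 0 of an empty buffer.
        write_payload(&mut cursor).unwrap();
    }
    buffer.len()
}
```

With three iterations, `reused_cursor` ends with a 3072-byte buffer while `fresh_cursor` ends with 1024 bytes, so only the second shape measures the same amount of work on every iteration.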