-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support #1192
Conversation
spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Outdated
Show resolved
Hide resolved
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1192 +/- ##
============================================
+ Coverage 34.06% 34.71% +0.64%
- Complexity 925 958 +33
============================================
Files 115 115
Lines 43569 43614 +45
Branches 9528 9534 +6
============================================
+ Hits 14843 15141 +298
+ Misses 25777 25507 -270
- Partials 2949 2966 +17 ☔ View full report in Codecov by Sentry. |
577880a
to
8ce9bb5
Compare
The status is that this is working, but I am running into some executor OOMs when trying to run complete benchmarks. I will pick this up again after the holidays. |
@Dandandan you may be interested in the benchmark results |
b.iter(|| { | ||
buffer.clear(); | ||
let mut cursor = Cursor::new(&mut buffer); | ||
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think zstd should have faster negative levels as well (-4 or -5 might come close), would be interesting to see how it compares. Not sure if it is available in the rust bindings.
Added Snappy.
|
@viirya @kazuyukitanimura @parthchandra @mbutrovich This is ready for review now |
@@ -52,6 +52,8 @@ serde = { version = "1", features = ["derive"] } | |||
lazy_static = "1.4.0" | |||
prost = "0.12.1" | |||
jni = "0.21" | |||
snap = "1.1" | |||
lz4_flex = { version = "0.11.3", default-features = false } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a somewhat confusingly named feature flag. Can we leave a comment here that we're enabling unsafe encode and decode for performance?
let batch = create_batch(8192); | ||
let mut output = vec![]; | ||
let mut cursor = Cursor::new(&mut output); | ||
write_ipc_compressed( | ||
let length = write_ipc_compressed( | ||
&batch, | ||
&mut cursor, | ||
&CompressionCodec::Zstd(1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we modify this test so instead of being named round_trip_zstd
it iterates through all the compression schemes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I have implemented this in 1fc3d49 and also fixed the bug that it exposed 🙂
@@ -1570,13 +1588,41 @@ pub fn write_ipc_compressed<W: Write + Seek>( | |||
// write ipc_length placeholder | |||
output.write_all(&[0u8; 8])?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this isn't your code, but it jumped out at me: can we replace this 8-byte write with a seek?
// seek past ipc_length placeholder
output.seek_relative(8)?;
@@ -52,6 +52,9 @@ serde = { version = "1", features = ["derive"] } | |||
lazy_static = "1.4.0" | |||
prost = "0.12.1" | |||
jni = "0.21" | |||
snap = "1.1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
awesome, its really challenging to find a well maintained snappy Rust library.
CompressionCodec::Lz4Frame => { | ||
output.write_all(b"LZ4_")?; | ||
let mut wtr = lz4_flex::frame::FrameEncoder::new(output); | ||
let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably dumb question, looks like the writers gets created for every input.
In real life example it will be recreated for every batch
, probably the writer can be reused until the data stream has completed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spark's approach is to write shuffled output data in blocks where each block is independent (contains schema + data for that batch), so it doesn't allow for a streaming approach. We accumulate data in the shuffle writer until we reach the configured batch size and then write that batch out to bytes. There is also no guarantee of the order in which the shuffle reader will read these blocks, so that is another reason why we cannot keep a writer open for multiple batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think it makes any sense if I play with reusable writers to check if it gives any benefit on multiple bacthes? I feel we can have a map of it where key is schema hash + compression algorithm. During real job runtime its probably not expected to have more than 50-100 writers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to experiment, and let's see what else we can learn. I'm not sure exactly what you are planning on trying but it seems that we could just have one writer per PartitionBuffer
and write multiple batches to it. It may complicate the spill logic but you can just disable that for testing the idea. I wonder how this would differ from just increasing the shuffle batch size so that we write larger batches?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, thats another way to increase batch_size
so the compression writer will be created less often.
The test idea was to find out how expensive is the writer creation, to see if this can be an issue.
Perhaps I can just try to create a bench and quantify it so it gives us some ideas.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what I found is
Time elapsed in writer_create is: 107.125µs
Time elapsed in total is: 1.699042ms
The average for creation is 7% of overall time. However to make writer a singleton is not a trivial task...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the PR that follows this one (#1190) I propose replacing ArrowWriter
with a faster proprietary version.
...src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
Outdated
Show resolved
Hide resolved
@viirya @kazuyukitanimura @mbutrovich @comphead Thanks for the reviews so far. I believe I have addressed all feedback now. |
...src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @andygrove
"spark.shuffle.compress=false.") | ||
.stringConf | ||
.checkValues(Set("zstd", "lz4", "snappy")) | ||
.createWithDefault("lz4") | ||
|
||
val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit since the config name now has zstd
, the constant name should ideally reflect it, but optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @andygrove mentioned this PR on the sync up call today
I was thinking that a building block to quickly write arrow data to/from disk/SSD would be a valuable feature for many systems in addition to Comet -- for example I think it could significantly improve performance when data is spilled in sorting / grouping.
I wonder if any thought has gone into this? I would be happy to help organize a project upstream in DataFusion to creat this building block but didn't want to start that discussion until I asked over here first
@alamb Here is the follow on PR for the custom encoding to replace Arrow IPC: It would definitely be good to collaborate on getting some version of this implemented upstream. I also filed an issue to explore using interleave_record_batch (I did not know that this existed). |
THank you -- to follow up here I also filed a ticket to consider adding a better spill format in datafusion: |
Which issue does this PR close?
Part of #1123
Closes #1178
Rationale for this change
We currently perform encoding + compression in native code and decoding + decompression in JVM code. There are some downsides to this approach:
What changes are included in this PR?
ZSTD
LZ4
Microbenchmarks
TPC-H
edit: Results update on 1/1/2025 after making compression configurable for columnar shuffle as well as native shuffle.
How are these changes tested?
Existing tests