Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement custom RecordBatch serde for shuffle for improved performance #1190

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Dec 20, 2024

Which issue does this PR close?

Closes #1189

Rationale for this change

Arrow IPC is a good general purpose serde framework but we can get better performance by implementing specialized code optimized for Comet, which encodes single batches to shuffle blocks.

This PR implements a new BatchWriter and BatchReader and updates shuffle writer to use them when possible (when all data types are supported), falling back to Arrow IPC for other cases.

Specializations include:

  • The schema gets encoded to bytes just once per shuffle operation rather than once per batch. The encoded schema bytes are then written out directly with each shuffle block, avoiding the schema serde cost per batch.
  • Raw data, offset, and null buffers are written out directly with no flatbuffer encoding, no alignment, and no metadata

TPC-H

Based on median of 5 runs I am seeing a 2-3% speedup.

tpch_allqueries

tpch_queries_compare

Microbenchmarks

Without compression:

shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=true, compression=None)
                        time:   [6.7237 µs 6.7697 µs 6.8052 µs]
shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=false, compression=None)
                        time:   [16.679 µs 16.803 µs 16.922 µs]

With LZ4 compression:

shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=true, compression=Lz4Frame)
                        time:   [173.49 µs 176.76 µs 180.52 µs]
shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=false, compression=Lz4Frame)
                        time:   [202.86 µs 204.07 µs 205.25 µs]

With ZSTD compression:

shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=true, compression=Zstd(1))
                        time:   [331.70 µs 333.24 µs 334.73 µs]
shuffle_writer/shuffle_writer: write encoded (enable_fast_encoding=false, compression=Zstd(1))
                        time:   [339.05 µs 340.44 µs 341.91 µs]

Benchmark Results

What changes are included in this PR?

How are these changes tested?

Existing tests + updated unit tests.

@andygrove andygrove force-pushed the experimental-fast-batch-serde branch from 49d0c27 to f7d8cce Compare December 20, 2024 17:07
use std::io::Write;
use std::sync::Arc;

pub fn write_batch_fast(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to end up implementing a form of arrow (stream) IPC?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discovered that we may be able to just use https://docs.rs/arrow-ipc/latest/arrow_ipc/writer/struct.IpcDataGenerator.html#method.encoded_batch and am going to look into that next

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to end up implementing a form of arrow (stream) IPC?

Yes, but without using flatbuffers to align and encode anything, just the raw bytes, and without the metadata messages.


fn create_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Utf8, true),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @andygrove interesting if other datatypes keep the same performance benefit

@andygrove andygrove changed the title feat: Implement fast serde for single record batches [do not review] feat: Implement fast serde for single record batches Dec 21, 2024
@andygrove andygrove force-pushed the experimental-fast-batch-serde branch from e32db8d to c2b9e5c Compare December 28, 2024 15:35
@andygrove andygrove changed the title [do not review] feat: Implement fast serde for single record batches feat: Implement fast serde for single record batches Dec 28, 2024
@andygrove andygrove changed the title feat: Implement fast serde for single record batches feat: Implement custom RecordBatch serde for shuffle for improved performance Dec 30, 2024
@codecov-commenter
Copy link

codecov-commenter commented Dec 30, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 34.67%. Comparing base (be48839) to head (7c93806).
Report is 2 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1190      +/-   ##
============================================
- Coverage     34.69%   34.67%   -0.02%     
+ Complexity      991      987       -4     
============================================
  Files           116      116              
  Lines         44891    44883       -8     
  Branches       9864     9884      +20     
============================================
- Hits          15574    15565       -9     
+ Misses        26168    26155      -13     
- Partials       3149     3163      +14     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@andygrove andygrove force-pushed the experimental-fast-batch-serde branch from 96a87bd to b9b3d9d Compare January 8, 2025 01:12
@andygrove andygrove requested review from comphead, kazuyukitanimura, parthchandra and viirya and removed request for parthchandra January 8, 2025 01:14
@comphead
Copy link
Contributor

Thanks @andygrove thats amazing, I suppose it is extensible to support nested types in future?

@andygrove
Copy link
Member Author

Thanks @andygrove thats amazing, I suppose it is extensible to support nested types in future?

Yes, I don't foresee any issues with that.

@alamb
Copy link
Contributor

alamb commented Jan 10, 2025

FYI I filed a ticket in DataFusion to consider adding this code (or something similar) upstream:

I think it would help other serialization usecases (not just the shuffle reader/writer) -- for example sorting and grouping

Copy link
Contributor

@kazuyukitanimura kazuyukitanimura left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @andygrove

for compression_codec in [
CompressionCodec::None,
CompressionCodec::Lz4Frame,
CompressionCodec::Zstd(1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/optional: snappy and zstd(6) here as well?

let null_buffer = self.read_null_buffer();
Arc::new(BinaryArray::try_new(offset_buffer, buffer, null_buffer)?)
}
DataType::Dictionary(_, _) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be safer, maybe add Int32Type to the key type for this pattern.

@tustvold
Copy link

tustvold commented Jan 11, 2025

FWIW this encoding format is almost identical to the IPC format AFAICT with only some minor changes to the metadata encoding. The exact same validation is done when reading an array, and both preserve the buffers as is1..

I suspect that much of the overhead is fixed overheads in StreamWriter, e.g. encoding the schema, and that these could be optimised and/or eliminated by using the lower-level APIs such as write_message and root_as_message. The benchmarks at least appear to agree with this, with a relatively fixed performance delta on the order of 10s of microseconds between the two encoders.

Just thought I'd put it out there, there is likely low hanging fruit in the arrow-rs IPC implementation and improvements there would definitely be welcome not just by myself.

1. This is actually not entirely true, when dealing with sliced arrays, the arrow IPC implementation has additional logic to avoid encoding data that isn't referenced

DataType::Utf8 => {
let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
// write data buffer
self.write_buffer(arr.values())?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI if the array is sliced this will write the full data buffer, as the slice is not "materialized" into the buffer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement faster single batch encoding/decoding for use in shuffle
8 participants