Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement custom RecordBatch serde for shuffle for improved performance #1190

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ object CometConf extends ShimCometConf {
.intConf
.createWithDefault(1)

val COMET_SHUFFLE_ENABLE_FAST_ENCODING: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enableFastEncoding")
.doc("Whether to enable Comet's faster proprietary encoding for shuffle blocks " +
"rather than using Arrow IPC.")
.internal()
.booleanConf
.createWithDefault(true)

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.async.enabled")
.doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
Expand Down
1 change: 1 addition & 0 deletions native/core/benches/row_columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ fn benchmark(c: &mut Criterion) {
0,
None,
&CompressionCodec::Zstd(1),
true,
)
.unwrap();
});
Expand Down
128 changes: 65 additions & 63 deletions native/core/benches/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::Int32Builder;
use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::shuffle::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
use comet::execution::shuffle::{CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::physical_plan::metrics::Time;
use datafusion::{
Expand All @@ -31,67 +31,54 @@ use std::sync::Arc;
use tokio::runtime::Runtime;

fn criterion_benchmark(c: &mut Criterion) {
let batch = create_batch(8192, true);
let mut group = c.benchmark_group("shuffle_writer");
group.bench_function("shuffle_writer: encode (no compression))", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (snappy)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Snappy, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (lz4)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Lz4Frame, &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)
});
});
group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let ipc_time = Time::default();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)
});
});
group.bench_function("shuffle_writer: end to end", |b| {
let ctx = SessionContext::new();
let exec = create_shuffle_writer_exec(CompressionCodec::Zstd(1));
b.iter(|| {
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
criterion::black_box(rt.block_on(collect(stream)).unwrap());
});
});
for compression_codec in &[
CompressionCodec::None,
CompressionCodec::Lz4Frame,
CompressionCodec::Snappy,
CompressionCodec::Zstd(1),
kazuyukitanimura marked this conversation as resolved.
Show resolved Hide resolved
CompressionCodec::Zstd(6),
] {
for enable_fast_encoding in [true, false] {
let name = format!("shuffle_writer: write encoded (enable_fast_encoding={enable_fast_encoding}, compression={compression_codec:?})");
group.bench_function(name, |b| {
let mut buffer = vec![];
let ipc_time = Time::default();
let w = ShuffleBlockWriter::try_new(
&batch.schema(),
enable_fast_encoding,
compression_codec.clone(),
)
.unwrap();
b.iter(|| {
buffer.clear();
let mut cursor = Cursor::new(&mut buffer);
w.write_batch(&batch, &mut cursor, &ipc_time).unwrap();
});
});
}
}

for compression_codec in [
CompressionCodec::None,
CompressionCodec::Lz4Frame,
CompressionCodec::Zstd(1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/optional: snappy and zstd(6) here as well?

] {
group.bench_function(
format!("shuffle_writer: end to end (compression = {compression_codec:?}"),
|b| {
let ctx = SessionContext::new();
let exec = create_shuffle_writer_exec(compression_codec.clone());
b.iter(|| {
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(collect(stream)).unwrap();
});
},
);
}
}

fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWriterExec {
Expand All @@ -104,6 +91,7 @@ fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWri
compression_codec,
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
true,
)
.unwrap()
}
Expand All @@ -121,11 +109,19 @@ fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Int32, true),
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Date32, true),
Field::new("c3", DataType::Decimal128(11, 2), true),
]));
let mut a = Int32Builder::new();
let mut b = StringBuilder::new();
let mut c = Date32Builder::new();
let mut d = Decimal128Builder::new()
.with_precision_and_scale(11, 2)
.unwrap();
for i in 0..num_rows {
a.append_value(i as i32);
c.append_value(i as i32);
d.append_value((i * 1000000) as i128);
if allow_nulls && i % 10 == 0 {
b.append_null();
} else {
Expand All @@ -134,7 +130,13 @@ fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
}
let a = a.finish();
let b = b.finish();
RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap()
let c = c.finish();
let d = d.finish();
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)],
)
.unwrap()
}

fn config() -> Criterion {
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
current_checksum: jlong,
compression_codec: jstring,
compression_level: jint,
enable_fast_encoding: jboolean,
) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
Expand Down Expand Up @@ -686,6 +687,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_algo,
current_checksum,
&compression_codec,
enable_fast_encoding != JNI_FALSE,
)?;

let checksum = if let Some(checksum) = checksum {
Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,7 @@ impl PhysicalPlanner {
codec,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
writer.enable_fast_encoding,
)?);

Ok((
Expand Down
Loading
Loading