chore: Follow-on PR to fully enable onheap memory usage (#1210)
* Make datafusion's native memory pool configurable

* save

* fix

* Update memory calculation and add draft documentation

* ready for review

* ready for review

* address feedback

* Update docs/source/user-guide/tuning.md

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* Update docs/source/user-guide/tuning.md

Co-authored-by: Kristin Cowalcijk <[email protected]>

* Update docs/source/user-guide/tuning.md

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* Update docs/source/user-guide/tuning.md

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* remove unused config

---------

Co-authored-by: Kristin Cowalcijk <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
3 people authored Jan 6, 2025
1 parent 5c389d1 commit e72beb1
Showing 8 changed files with 64 additions and 46 deletions.
9 changes: 0 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -452,15 +452,6 @@ object CometConf extends ShimCometConf {
.intConf
.createWithDefault(8192)

val COMET_EXEC_MEMORY_FRACTION: ConfigEntry[Double] = conf("spark.comet.exec.memoryFraction")
.doc(
"The fraction of memory from Comet memory overhead that the native memory " +
"manager can use for execution. The purpose of this config is to set aside memory for " +
"untracked data structures, as well as imprecise size estimation during memory " +
"acquisition.")
.doubleConf
.createWithDefault(0.7)

val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
conf("spark.comet.parquet.enable.directBuffer")
.doc("Whether to use Java direct byte buffer when reading Parquet.")
1 change: 0 additions & 1 deletion docs/source/user-guide/configs.md
@@ -47,7 +47,6 @@ Comet provides the following configuration settings.
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. By default, this config is 'greedy_task_shared'. | greedy_task_shared |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
57 changes: 51 additions & 6 deletions docs/source/user-guide/tuning.md
@@ -23,11 +23,52 @@ Comet provides some tuning options to help you get the best performance from you

## Memory Tuning

Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.
### Unified Memory Management with Off-Heap Memory

The recommended way to share memory between Spark and Comet is to set `spark.memory.offHeap.enabled=true`. This allows
Comet to share an off-heap memory pool with Spark. The size of the pool is specified by `spark.memory.offHeap.size`. For more details about Spark's off-heap memory mode, refer to the Spark documentation: https://spark.apache.org/docs/latest/configuration.html.

### Dedicated Comet Memory Pools

Spark uses on-heap memory mode by default, i.e., `spark.memory.offHeap.enabled` is not set to `true`. When Spark runs in on-heap memory mode, Comet uses its own dedicated memory pools that
are not shared with Spark. This requires additional configuration settings to specify the size and type of
memory pool to use.

The size of the pool can be set explicitly with `spark.comet.memoryOverhead`. If this setting is not specified then
the memory overhead will be calculated by multiplying the executor memory by `spark.comet.memory.overhead.factor`
(defaults to `0.2`).
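
As a rough illustration of the sizing rule above, the following Rust sketch uses the explicit overhead when one is given and otherwise multiplies the executor memory by the factor. The function and parameter names are hypothetical, and this is not Comet's actual implementation, which may apply further adjustments that are not described here.

```rust
/// Sketch of how the dedicated pool size could be derived from the settings above.
/// `explicit_overhead` stands in for `spark.comet.memoryOverhead` (`None` when unset).
/// `overhead_factor` stands in for `spark.comet.memory.overhead.factor`.
/// All names here are hypothetical and chosen for illustration only.
fn comet_pool_size_bytes(
    executor_memory_bytes: u64,
    explicit_overhead: Option<u64>,
    overhead_factor: f64,
) -> u64 {
    match explicit_overhead {
        // An explicitly configured size takes precedence.
        Some(bytes) => bytes,
        // Otherwise fall back to a fraction of executor memory (truncated to whole bytes).
        None => (executor_memory_bytes as f64 * overhead_factor) as u64,
    }
}
```

For example, with 10 GiB of executor memory, no explicit overhead, and the default factor of `0.2`, this sketch yields a 2 GiB pool.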

The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.

The valid pool types are:

- `greedy`
- `greedy_global`
- `greedy_task_shared`
- `fair_spill`
- `fair_spill_global`
- `fair_spill_task_shared`

Pool types ending with `_global` use a single global memory pool shared by all tasks on the same executor.

Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task.

Other pool types create a dedicated pool per native query plan using a fraction of the available pool size based on the number of cores
and cores per task.
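
The suffix rules above, together with the per-task calculation that this commit logs in `CometExecIterator` (`maxMemory * coresPerTask / numCores`), can be sketched as follows. The `PoolScope` enum and both function names are made up for illustration, and the sketch uses integer arithmetic where the Scala code uses floats.

```rust
/// Hypothetical classification of `spark.comet.exec.memoryPool` values by suffix.
#[derive(Debug, PartialEq)]
enum PoolScope {
    /// One pool shared by all tasks on the executor (`*_global`).
    Global,
    /// One pool shared by all attempts of a single task (`*_task_shared`).
    TaskShared,
    /// A dedicated pool per native query plan (`greedy`, `fair_spill`).
    PerPlan,
}

/// Map a pool type name to its sharing scope based on its suffix.
fn pool_scope(pool_type: &str) -> PoolScope {
    if pool_type.ends_with("_global") {
        PoolScope::Global
    } else if pool_type.ends_with("_task_shared") {
        PoolScope::TaskShared
    } else {
        PoolScope::PerPlan
    }
}

/// Per-task share of the pool: max_memory * cores_per_task / num_cores.
/// Assumes `num_cores` is non-zero.
fn per_task_limit_bytes(max_memory: u64, cores_per_task: u64, num_cores: u64) -> u64 {
    max_memory * cores_per_task / num_cores
}
```

With 16 GB of pool memory, 16 executor cores, and 4 cores per task, `per_task_limit_bytes` returns 4 GB, which matches the worked example in the `getMemoryLimitPerTask` source comment.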

The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. This
pool works well for queries that do not need to spill or have a single spillable operator.

The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents spillable reservations from using more
than an even fraction of the available memory, excluding any unspillable reservations
(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`). This pool works best when you know beforehand
that the query has multiple spillable operators that will likely all need to spill. It can sometimes cause spills even
when there is sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in
a first-come, first-serve fashion.
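
The even-fraction rule can be illustrated with plain arithmetic. This is a sketch of the formula only, with hypothetical names; it does not call DataFusion and does not model how `FairSpillPool` tracks reservations internally.

```rust
/// Upper bound for each spillable reservation under the even-fraction rule:
/// (pool_size - unspillable_memory) / num_spillable_reservations.
/// Returns `None` when there are no spillable reservations to divide between.
/// The function name and signature are illustrative, not part of DataFusion.
fn fair_spill_share(
    pool_size: usize,
    unspillable_memory: usize,
    num_spillable_reservations: usize,
) -> Option<usize> {
    if num_spillable_reservations == 0 {
        return None;
    }
    // Saturate at zero if unspillable memory already exceeds the pool size.
    let spillable_budget = pool_size.saturating_sub(unspillable_memory);
    Some(spillable_budget / num_spillable_reservations)
}
```

For a pool of 1000 bytes with 200 bytes held by unspillable reservations and 4 spillable operators, each spillable operator is capped at 200 bytes, even if the other three are currently using less.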

[GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html

Each executor will have a single memory pool which will be shared by all native plans being executed within that
process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.

### Determining How Much Memory to Allocate

@@ -106,15 +147,19 @@ then any shuffle operations that cannot be supported in this mode will fall back
### Shuffle Compression

By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in
Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in
certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.

## Explain Plan

### Extended Explain

With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists
reasons why Comet may not have been enabled for specific operations.
To enable this, in the Spark configuration, set the following:

```shell
-c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
```
This will add a section to the detailed plan displayed in the Spark SQL UI page.

This will add a section to the detailed plan displayed in the Spark SQL UI page.
7 changes: 2 additions & 5 deletions native/core/src/execution/jni_api.rs
@@ -162,7 +162,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_pool_type: jstring,
memory_limit: jlong,
memory_limit_per_task: jlong,
memory_fraction: jdouble,
task_attempt_id: jlong,
debug_native: jboolean,
explain_native: jboolean,
@@ -208,7 +207,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_pool_type,
memory_limit,
memory_limit_per_task,
memory_fraction,
)?;
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
@@ -281,14 +279,13 @@ fn parse_memory_pool_config(
memory_pool_type: String,
memory_limit: i64,
memory_limit_per_task: i64,
memory_fraction: f64,
) -> CometResult<MemoryPoolConfig> {
let memory_pool_config = if use_unified_memory_manager {
MemoryPoolConfig::new(MemoryPoolType::Unified, 0)
} else {
// Use the memory pool from DF
let pool_size = (memory_limit as f64 * memory_fraction) as usize;
let pool_size_per_task = (memory_limit_per_task as f64 * memory_fraction) as usize;
let pool_size = memory_limit as usize;
let pool_size_per_task = memory_limit_per_task as usize;
match memory_pool_type.as_str() {
"fair_spill_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task)
@@ -52,7 +52,7 @@ public static CometShuffleMemoryAllocatorTrait getInstance(
(boolean)
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();

if (isSparkTesting && !useUnifiedMemAllocator) {
if (!useUnifiedMemAllocator) {
synchronized (CometShuffleMemoryAllocator.class) {
if (INSTANCE == null) {
// CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton.
12 changes: 8 additions & 4 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -20,10 +20,11 @@
package org.apache.comet

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil

/**
@@ -52,7 +53,8 @@ class CometExecIterator(
nativeMetrics: CometMetricNode,
numParts: Int,
partitionIndex: Int)
extends Iterator[ColumnarBatch] {
extends Iterator[ColumnarBatch]
with Logging {

private val nativeLib = new Native()
private val nativeUtil = new NativeUtil()
@@ -75,7 +77,6 @@ class CometExecIterator(
memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(),
memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
memory_limit_per_task = getMemoryLimitPerTask(conf),
memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
task_attempt_id = TaskContext.get().taskAttemptId,
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
@@ -94,7 +95,10 @@ class CometExecIterator(
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
// Example: 16 GB maxMemory on 16 cores with 4 cores per task results
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4 GB
(maxMemory.toFloat * coresPerTask / numCores).toLong
val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong
logInfo(
s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)")
limit
}

private def numDriverOrExecutorCores(conf: SparkConf): Int = {
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}

import org.apache.comet.CometConf._
import org.apache.comet.CometExplainInfo.getActualPlan
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.rules.RewriteJoin
import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -919,14 +919,6 @@ class CometSparkSessionExtensions
}

override def apply(plan: SparkPlan): SparkPlan = {

// Comet required off-heap memory to be enabled
if (!isOffHeapEnabled(conf) && !isTesting) {
logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false")
withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false")
return plan
}

// DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
// enabled.
if (isANSIEnabled(conf)) {
@@ -1187,21 +1179,12 @@ object CometSparkSessionExtensions extends Logging {
}
}

private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean

// Copied from org.apache.spark.util.Utils which is private to Spark.
private[comet] def isTesting: Boolean = {
System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null
}

// Check whether Comet shuffle is enabled:
// 1. `COMET_EXEC_SHUFFLE_ENABLED` is true
// 2. `spark.shuffle.manager` is set to `CometShuffleManager`
// 3. Off-heap memory is enabled || Spark/Comet unit testing
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
(isOffHeapEnabled(conf) || isTesting)
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)

private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
1 change: 0 additions & 1 deletion spark/src/main/scala/org/apache/comet/Native.scala
@@ -55,7 +55,6 @@ class Native extends NativeBase {
memory_pool_type: String,
memory_limit: Long,
memory_limit_per_task: Long,
memory_fraction: Double,
task_attempt_id: Long,
debug: Boolean,
explain: Boolean,
