Merge remote-tracking branch 'origin/main' into array_except
# Conflicts:
#	spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
#	spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Kazantsev Maksim committed Jan 30, 2025
2 parents 61f70db + 443b5db commit b661960
Showing 9 changed files with 469 additions and 303 deletions.
20 changes: 15 additions & 5 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf
*/
object CometConf extends ShimCometConf {

val COMPAT_GUIDE: String = "For more information, refer to the Comet Compatibility " +
"Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)"

private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " +
"Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)"

@@ -605,20 +608,27 @@ object CometConf extends ShimCometConf
.booleanConf
.createWithDefault(false)

val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.expression.allowIncompatible")
.doc(
"Comet is not currently fully compatible with Spark for all expressions. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)

val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.cast.allowIncompatible")
.doc(
"Comet is not currently fully compatible with Spark for all cast operations. " +
"Set this config to true to allow them anyway. See compatibility guide " +
"for more information.")
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)

val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
.doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
"Set this config to true to allow them anyway using Rust's regular expression engine. " +
"See compatibility guide for more information.")
.doc(
"Comet is not currently fully compatible with Spark for all regular expressions. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(false)

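The new `COMET_EXPR_ALLOW_INCOMPATIBLE` entry follows the same `ConfigEntry[Boolean]` pattern as the existing cast and regexp flags. Below is a minimal sketch of how serde code might gate on it, assuming the Comet classes are on the classpath; everything except `CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE`, its `.get()`/`.key` accessors, and `CometConf.COMPAT_GUIDE` is illustrative.

```scala
import org.apache.comet.CometConf

object IncompatGateSketch {
  // Hypothetical helper: returns Right(()) when incompatible expressions are
  // allowed, otherwise Left with the same style of fallback message used in
  // QueryPlanSerde. Only the CometConf members come from the diff above.
  def allowOrExplain(exprName: String): Either[String, Unit] = {
    if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
      Right(())
    } else {
      Left(
        s"$exprName is not fully compatible with Spark. To enable it anyway, set " +
          s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.")
    }
  }
}
```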
5 changes: 3 additions & 2 deletions docs/source/user-guide/configs.md
@@ -25,7 +25,7 @@ Comet provides the following configuration settings.
|--------|-------------|---------------|
| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
| spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of the total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task (`spark.comet.columnar.shuffle.async.thread.num`) is larger than this config, Comet will use this config as the number of shuffle threads per executor instead. | 100 |
| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
@@ -64,6 +64,7 @@ Comet provides the following configuration settings.
| spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fall back to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
@@ -73,7 +74,7 @@ Comet provides the following configuration settings.
| spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
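For reference, here is a minimal sketch of setting some of the flags from this table on a Spark session. It assumes Comet is already installed and enabled for the session; only the `spark.comet.*` keys and their defaults are taken from the table above.

```scala
import org.apache.spark.sql.SparkSession

object CometConfSketch extends App {
  // Assumes the Comet plugin/jars are already configured for this session;
  // only the spark.comet.* keys and defaults come from the configuration table.
  val spark = SparkSession
    .builder()
    .appName("comet-config-sketch")
    .master("local[*]")
    .config("spark.comet.batchSize", "8192") // default batch size from the table
    .config("spark.comet.expression.allowIncompatible", "true") // opt in to incompatible expressions
    .config("spark.comet.regexp.allowIncompatible", "false") // keep the default regexp fallback
    .getOrCreate()

  println(spark.conf.get("spark.comet.expression.allowIncompatible")) // true
  spark.stop()
}
```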
29 changes: 20 additions & 9 deletions docs/source/user-guide/expressions.md
@@ -182,15 +182,26 @@ The following Spark expressions are currently available. Any known compatibility
| VariancePop | |
| VarianceSamp | |

## Complex Types

| Expression | Notes |
| ----------------- | ----------- |
| CreateNamedStruct | |
| ElementAt | Arrays only |
| GetArrayItem | |
| GetStructField | |
| StructsToJson | |
## Arrays

| Expression | Notes |
|-------------------|--------------|
| ArrayAppend | Experimental |
| ArrayContains | Experimental |
| ArrayIntersect | Experimental |
| ArrayJoin | Experimental |
| ArrayRemove | Experimental |
| ArraysOverlap | Experimental |
| ElementAt | Arrays only |
| GetArrayItem | |

## Structs

| Expression | Notes |
|-------------------|--------------|
| CreateNamedStruct | |
| GetStructField | |
| StructsToJson | |

## Other

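The array functions listed above are regular Spark SQL functions, so a quick way to exercise them is through `spark.sql`; whether Comet executes them natively depends on the `spark.comet.expression.allowIncompatible` flag described in the compatibility guide, and `array_append` requires Spark 3.4+. A minimal sketch under those assumptions:

```scala
import org.apache.spark.sql.SparkSession

object ArrayExprSketch extends App {
  val spark = SparkSession.builder().appName("array-expr-sketch").master("local[*]").getOrCreate()
  // Opt in to the experimental/incompatible array expressions (see compatibility guide).
  spark.conf.set("spark.comet.expression.allowIncompatible", "true")

  spark.sql("SELECT array_append(array(1, 2, 3), 4)").show()           // [1, 2, 3, 4] (Spark 3.4+)
  spark.sql("SELECT array_contains(array(1, 2, 3), 2)").show()         // true
  spark.sql("SELECT array_intersect(array(1, 2), array(2, 3))").show() // [2]
  spark.sql("SELECT array_join(array('a', 'b'), '-')").show()          // a-b
  spark.sql("SELECT arrays_overlap(array(1, 2), array(2, 3))").show()  // true
  spark.sql("SELECT element_at(array(10, 20, 30), 2)").show()          // 20

  spark.stop()
}
```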
22 changes: 16 additions & 6 deletions docs/templates/compatibility-template.md
@@ -32,12 +32,6 @@ be used in production.

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

## Regular Expressions

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.

## Floating-point number comparison

Spark normalizes NaN and zero for floating point numbers for several cases. See `NormalizeFloatingNumbers` optimization rule in Spark.
@@ -46,6 +40,22 @@ because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`).
functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)).
So Comet will add an additional normalization expression for NaN and zero when performing comparisons.

## Incompatible Expressions

Some Comet native expressions are not 100% compatible with Spark and are disabled by default. These expressions
will fall back to Spark but can be enabled by setting `spark.comet.expression.allowIncompatible=true`.

## Array Expressions

Comet has experimental support for a number of array expressions. These are currently marked as
incompatible and can be enabled by setting `spark.comet.expression.allowIncompatible=true`.

## Regular Expressions

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.

## Cast

Cast operations in Comet fall into three levels of support:
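To make the floating-point discussion concrete, here is a minimal sketch of the Spark-side comparison semantics that the `NormalizeFloatingNumbers` and `SQLOrderingUtil` references are about; the expected results in the comments follow Spark's documented NaN semantics and are assumptions, not output taken from this diff.

```scala
import org.apache.spark.sql.SparkSession

object NaNComparisonSketch extends App {
  val spark = SparkSession.builder().appName("nan-sketch").master("local[*]").getOrCreate()

  // Spark SQL treats NaN = NaN as true and considers -0.0 and 0.0 equal;
  // Comet adds normalization so native comparisons match this behavior.
  spark.sql("SELECT CAST('NaN' AS DOUBLE) = CAST('NaN' AS DOUBLE)").show()   // true
  spark.sql("SELECT CAST('-0.0' AS DOUBLE) = CAST('0.0' AS DOUBLE)").show()  // true
  spark.sql("SELECT CAST('NaN' AS DOUBLE) > CAST(1e308 AS DOUBLE)").show()   // true: NaN sorts above all other doubles

  spark.stop()
}
```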
130 changes: 28 additions & 102 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -929,6 +929,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
binding: Boolean): Option[Expr] = {
SQLConf.get

def convert(handler: CometExpressionSerde): Option[Expr] = {
handler match {
case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
withInfo(
expr,
s"$expr is not fully compatible with Spark. To enable it anyway, set " +
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.")
None
case _ =>
handler.convert(expr, inputs, binding)
}
}

expr match {
case a @ Alias(_, _) =>
val r = exprToProtoInternal(a.child, inputs, binding)
@@ -2176,35 +2189,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
None
}

case Murmur3Hash(children, seed) =>
val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
if (firstUnSupportedInput.isDefined) {
withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
return None
}
val exprs = children.map(exprToProtoInternal(_, inputs, binding))
val seedBuilder = ExprOuterClass.Literal
.newBuilder()
.setDatatype(serializeDataType(IntegerType).get)
.setIntVal(seed)
val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
// the seed is put at the end of the arguments
scalarExprToProtoWithReturnType("murmur3_hash", IntegerType, exprs :+ seedExpr: _*)

case XxHash64(children, seed) =>
val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
if (firstUnSupportedInput.isDefined) {
withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
return None
}
val exprs = children.map(exprToProtoInternal(_, inputs, binding))
val seedBuilder = ExprOuterClass.Literal
.newBuilder()
.setDatatype(serializeDataType(LongType).get)
.setLongVal(seed)
val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
// the seed is put at the end of the arguments
scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*)
case _: Murmur3Hash => CometMurmur3Hash.convert(expr, inputs, binding)

case _: XxHash64 => CometXxHash64.convert(expr, inputs, binding)

case Sha2(left, numBits) =>
if (!numBits.foldable) {
@@ -2371,79 +2358,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
None
}
case expr: ArrayRemove => CometArrayRemove.convert(expr, inputs, binding)
case expr if expr.prettyName == "array_contains" =>
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayContains(binaryExpr))
case _ if expr.prettyName == "array_append" =>
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
case _ if expr.prettyName == "array_intersect" =>
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayIntersect(binaryExpr))
case ArrayJoin(arrayExpr, delimiterExpr, nullReplacementExpr) =>
val arrayExprProto = exprToProto(arrayExpr, inputs, binding)
val delimiterExprProto = exprToProto(delimiterExpr, inputs, binding)

if (arrayExprProto.isDefined && delimiterExprProto.isDefined) {
val arrayJoinBuilder = nullReplacementExpr match {
case Some(nrExpr) =>
val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding)
ExprOuterClass.ArrayJoin
.newBuilder()
.setArrayExpr(arrayExprProto.get)
.setDelimiterExpr(delimiterExprProto.get)
.setNullReplacementExpr(nullReplacementExprProto.get)
case None =>
ExprOuterClass.ArrayJoin
.newBuilder()
.setArrayExpr(arrayExprProto.get)
.setDelimiterExpr(delimiterExprProto.get)
}
Some(
ExprOuterClass.Expr
.newBuilder()
.setArrayJoin(arrayJoinBuilder)
.build())
} else {
val exprs: List[Expression] = nullReplacementExpr match {
case Some(nrExpr) => List(arrayExpr, delimiterExpr, nrExpr)
case None => List(arrayExpr, delimiterExpr)
}
withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*)
None
}
case ArraysOverlap(leftArrayExpr, rightArrayExpr) =>
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
createBinaryExpr(
expr,
leftArrayExpr,
rightArrayExpr,
inputs,
binding,
(builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
} else {
withInfo(
expr,
s"$expr is not supported yet. To enable all incompatible casts, set " +
s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
None
}
case _: ArrayRemove => convert(CometArrayRemove)
case _: ArrayContains => convert(CometArrayContains)
// Function introduced in Spark 3.4.0. Referenced by name to remain compatible
// with older Spark builds
case _ if expr.prettyName == "array_append" => convert(CometArrayAppend)
case _: ArrayIntersect => convert(CometArrayIntersect)
case _: ArrayJoin => convert(CometArrayJoin)
case _: ArraysOverlap => convert(CometArraysOverlap)
case _ if expr.prettyName == "array_except" =>
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
createBinaryExpr(
@@ -2465,6 +2387,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
None
}

}

/**
Expand Down Expand Up @@ -3507,3 +3430,6 @@ trait CometExpressionSerde {
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr]
}

/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */
trait IncompatExpr {}
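The `IncompatExpr` marker combined with the `convert` helper earlier in this diff is what routes an expression either to its serde handler or to a Spark fallback with an explanatory message. Below is a hypothetical sketch of a handler wired into that machinery; the trait names and the `convert` signature come from this file, while the import paths, `CometArrayExceptSketch`, and its body are assumptions for illustration only.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

import org.apache.comet.serde.{CometExpressionSerde, ExprOuterClass, IncompatExpr}

// Hypothetical handler: because it mixes in IncompatExpr, the convert() helper
// in QueryPlanSerde will refuse to use it unless
// spark.comet.expression.allowIncompatible=true.
object CometArrayExceptSketch extends CometExpressionSerde with IncompatExpr {
  override def convert(
      expr: Expression,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    // A real handler would serialize expr and its children into the protobuf
    // representation; returning None simply falls back to Spark.
    None
  }
}
```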