First part of Comet fixes (#179)

mwiewior authored Jul 26, 2024
1 parent bb9750e commit 4e51ad5
Showing 6 changed files with 51 additions and 13 deletions.
33 changes: 33 additions & 0 deletions src/main/scala/org/biodatageeks/sequila/apps/ITApp.scala
@@ -0,0 +1,33 @@
package org.biodatageeks.sequila.apps

import org.apache.spark.sql.types.StructType

object ITApp extends App with SequilaApp {
  override def main(args: Array[String]): Unit = {

    val ss = createSequilaSession()
    ss.sqlContext.setConf("spark.biodatageeks.rangejoin.useJoinOrder", "true")

    val chainRn4_chr1 = ss
      .read
      .option("delimiter", "\t")
      .option("header", true)
      .schema(StructType.fromDDL("column0 string, column1 int, column2 int"))
      .csv("/Users/mwiewior/CLionProjects/sequila-native/sandbox/chainRn4_chr1.csv")

    chainRn4_chr1.createOrReplaceTempView("chainRn4_chr1")
    val chainVicPac2 = ss
      .read
      .option("delimiter", "\t")
      .option("header", true)
      .schema(StructType.fromDDL("column0 string, column1 int, column2 int"))
      .csv("/Users/mwiewior/CLionProjects/sequila-native/sandbox/chainVicPac2_chr1.csv")
    chainVicPac2.createOrReplaceTempView("chainVicPac2_chr1")
    ss.time {
      ss
        .sql("select count(*) from chainRn4_chr1 a, chainVicPac2_chr1 b where (a.column0=b.column0 and a.column2>=b.column1 and a.column1<=b.column2);")
        .show()
    }
    ss.stop()
  }
}
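The new ITApp registers two BED-like interval tables (column0 = contig, column1 = start, column2 = end) as temporary views and times an interval-overlap join: contig equality plus a symmetric overlap test. For reference, a rough DataFrame-API version of the same predicate is sketched below; it reuses the two DataFrames defined above, adds one extra import, and is not part of this commit (the commit itself exercises the SQL form).

// Sketch only: the same overlap predicate expressed with the DataFrame API,
// reusing chainRn4_chr1 and chainVicPac2 from ITApp above.
import org.apache.spark.sql.functions.col

val overlaps = chainRn4_chr1.as("a")
  .join(chainVicPac2.as("b"),
    col("a.column0") === col("b.column0") &&   // same contig
      col("a.column2") >= col("b.column1") &&  // a.end   >= b.start
      col("a.column1") <= col("b.column2"))    // a.start <= b.end
println(overlaps.count())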
Changes to ExtractRangeJoinKeysWithEquality:
@@ -11,8 +11,11 @@ import org.apache.spark.sql.catalyst.plans.Inner
   */
 
 object ExtractRangeJoinKeysWithEquality extends Logging with PredicateHelper {
+
+  //joinType, rangeJoinKeys, left, right, condition
+  //(JoinType, Seq[Expression], LogicalPlan, LogicalPlan)
   type ReturnType =
-    (JoinType, Seq[Expression], LogicalPlan, LogicalPlan)
+    (JoinType, Seq[Expression], LogicalPlan, LogicalPlan, Option[Expression])
   def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
     case join @ Join(left, right, joinType, condition, hint) =>
       logDebug(s"Considering join on: $condition")
@@ -24,21 +27,20 @@ object ExtractRangeJoinKeysWithEquality extends Logging with PredicateHelper {
       if (condition.size!=0 && joinType == Inner) {
         condition.head match {
           case And(And(EqualTo(l3, r3), LessThanOrEqual(l1, g1)), LessThanOrEqual(l2, g2)) =>
-            Some((joinType,
-              getKeys(l1, l2, g1, g2, l3, r3, left, right),
-              left, right))
+            val rangeJoinKeys = getKeys(l1, l2, g1, g2, l3, r3, left, right)
+            Some((joinType, rangeJoinKeys, left, right, condition))
           case And(And(EqualTo(l3, r3), GreaterThanOrEqual(g1, l1)), LessThanOrEqual(l2, g2)) =>
             Some((joinType,
               getKeys(l1, l2, g1, g2, l3, r3, left, right),
-              left, right))
+              left, right, condition))
           case And(And(EqualTo(l3, r3), LessThanOrEqual(l1, g1)), GreaterThanOrEqual(g2, l2)) =>
             Some((joinType,
               getKeys(l1, l2, g1, g2, l3, r3, left, right),
-              left, right))
+              left, right, condition))
           case And(And(EqualTo(l3, r3), GreaterThanOrEqual(g1, l1)), GreaterThanOrEqual(g2, l2)) =>
             Some((joinType,
               getKeys(l1, l2, g1, g2, l3, r3, left, right),
-              left, right))
+              left, right, condition))
           case _ => None
         }
       } else {
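The extractor's ReturnType now carries the original join condition as a fifth element, so a physical operator can re-check the exact predicate instead of relying only on the extracted range-join keys. Below is a minimal sketch of how a custom planner strategy destructures the extended tuple; it mirrors the IntervalTreeJoinStrategyOptim change further down, and MyIntervalJoinExec is a hypothetical physical operator used purely for illustration.

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object MyIntervalJoinStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // The extractor now yields (joinType, rangeJoinKeys, left, right, condition),
    // where condition is the untouched Option[Expression] of the logical Join node.
    case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right, condition) =>
      MyIntervalJoinExec(planLater(left), planLater(right), rangeJoinKeys, condition, joinType) :: Nil
    case _ => Nil
  }
}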
Changes to IntervalTreeJoinOptimChromosome:
@@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
+import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, _}
 import org.apache.spark.sql.internal.SQLConf
@@ -36,7 +37,9 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan,
                                             context: SparkSession,
                                             minOverlap: Int, maxGap: Int,
                                             useJoinOrder: Boolean,
-                                            intervalHolderClassName: String
+                                            intervalHolderClassName: String,
+                                            conditionExact: Option[Expression],
+                                            joinType: JoinType
                                            ) extends BinaryExecNode with Serializable {
   @transient lazy val output = left.output ++ right.output
 
Changes to IntervalTreeJoinStrategyOptim:
@@ -21,7 +21,7 @@ class IntervalTreeJoinStrategyOptim(spark: SparkSession) extends Strategy with S
     plan match {
       case ExtractRangeJoinKeys(joinType, rangeJoinKeys, left, right) =>
         IntervalTreeJoinOptim(planLater(left), planLater(right), rangeJoinKeys, spark,left,right,intervalHolderClassName) :: Nil
-      case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right) => {
+      case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right, condition) => {
         val minOverlap = spark.sqlContext.getConf(InternalParams.minOverlap,"1")
         val maxGap = spark.sqlContext.getConf(InternalParams.maxGap,"0")
         val useJoinOrder = spark.sqlContext.getConf(InternalParams.useJoinOrder,"false")
@@ -40,7 +40,7 @@
           minOverlap.toInt,
           maxGap.toInt,
           useJoinOrder.toBoolean,
-          intervalHolderClassName) :: Nil
+          intervalHolderClassName, condition, joinType) :: Nil
       }
       case _ =>
         Nil
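With the extended extractor, the optimized strategy forwards the original condition and the join type straight into IntervalTreeJoinOptimChromosome. Registration of the strategy is normally handled by SequilaSession (as in ITApp above); the sketch below shows an assumed manual registration through Spark's experimental strategies, with the sequila package import omitted, for the case where the strategy is wired up without SequilaSession.

import org.apache.spark.sql.SparkSession
// plus the sequila import that brings IntervalTreeJoinStrategyOptim into scope

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("interval-join-demo")
  .getOrCreate()

// extraStrategies are consulted before Spark's built-in join strategies,
// so the interval-tree range join gets a chance to claim the plan first.
spark.experimental.extraStrategies =
  new IntervalTreeJoinStrategyOptim(spark) +: spark.experimental.extraStrategies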
Changes to IntervalTreeJoinChromosome:
@@ -14,7 +14,7 @@ case class
   IntervalTreeJoinChromosome(left: SparkPlan,
                              right: SparkPlan,
                              condition: Seq[Expression],
-                             context: SparkSession) extends BinaryExecNode {
+                             context: SparkSession, conditionExact: Option[Expression]) extends BinaryExecNode {
   def output = left.output ++ right.output
 
   lazy val (buildPlan, streamedPlan) = (left, right)
Changes to IntervalTreeJoinStrategy:
@@ -11,8 +11,8 @@ class IntervalTreeJoinStrategy(spark: SparkSession) extends Strategy with Serial
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case ExtractRangeJoinKeys(joinType, rangeJoinKeys, left, right) =>
       IntervalTreeJoin(planLater(left), planLater(right), rangeJoinKeys, spark) :: Nil
-    case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right) =>
-      IntervalTreeJoinChromosome(planLater(left), planLater(right), rangeJoinKeys, spark) :: Nil
+    case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right, condition) =>
+      IntervalTreeJoinChromosome(planLater(left), planLater(right), rangeJoinKeys, spark, condition) :: Nil
     case _ =>
       Nil
   }
