diff --git a/src/main/scala/org/biodatageeks/sequila/apps/ITApp.scala b/src/main/scala/org/biodatageeks/sequila/apps/ITApp.scala new file mode 100644 index 00000000..677c0762 --- /dev/null +++ b/src/main/scala/org/biodatageeks/sequila/apps/ITApp.scala @@ -0,0 +1,33 @@ +package org.biodatageeks.sequila.apps + +import org.apache.spark.sql.types.StructType + +object ITApp extends App with SequilaApp { + override def main(args: Array[String]): Unit = { + + val ss = createSequilaSession() + ss.sqlContext.setConf("spark.biodatageeks.rangejoin.useJoinOrder", "true") + + val chainRn4_chr1 = ss + .read + .option("delimiter", "\t") + .option("header", true) + .schema(StructType.fromDDL("column0 string, column1 int, column2 int")) + .csv("/Users/mwiewior/CLionProjects/sequila-native/sandbox/chainRn4_chr1.csv") + + chainRn4_chr1.createOrReplaceTempView("chainRn4_chr1") + val chainVicPac2 = ss + .read + .option("delimiter", "\t") + .option("header", true) + .schema(StructType.fromDDL("column0 string, column1 int, column2 int")) + .csv("/Users/mwiewior/CLionProjects/sequila-native/sandbox/chainVicPac2_chr1.csv") + chainVicPac2.createOrReplaceTempView("chainVicPac2_chr1") + ss.time{ + ss + .sql("select count(*) from chainRn4_chr1 a, chainVicPac2_chr1 b where (a.column0=b.column0 and a.column2>=b.column1 and a.column1<=b.column2);") + .show() + } + ss.stop() + } +} diff --git a/src/main/scala/org/biodatageeks/sequila/rangejoins/common/ExtractRangeJoinKeysWithEquality.scala b/src/main/scala/org/biodatageeks/sequila/rangejoins/common/ExtractRangeJoinKeysWithEquality.scala index 25f97924..8a25b850 100644 --- a/src/main/scala/org/biodatageeks/sequila/rangejoins/common/ExtractRangeJoinKeysWithEquality.scala +++ b/src/main/scala/org/biodatageeks/sequila/rangejoins/common/ExtractRangeJoinKeysWithEquality.scala @@ -11,8 +11,11 @@ import org.apache.spark.sql.catalyst.plans.Inner */ object ExtractRangeJoinKeysWithEquality extends Logging with PredicateHelper { + + //joinType, rangeJoinKeys, left, right, condition + //(JoinType, Seq[Expression], LogicalPlan, LogicalPlan) type ReturnType = - (JoinType, Seq[Expression], LogicalPlan, LogicalPlan) + (JoinType, Seq[Expression], LogicalPlan, LogicalPlan, Option[Expression]) def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { case join @ Join(left, right, joinType, condition, hint) => logDebug(s"Considering join on: $condition") @@ -24,21 +27,20 @@ object ExtractRangeJoinKeysWithEquality extends Logging with PredicateHelper { if (condition.size!=0 && joinType == Inner) { condition.head match { case And(And(EqualTo(l3, r3), LessThanOrEqual(l1, g1)), LessThanOrEqual(l2, g2)) => - Some((joinType, - getKeys(l1, l2, g1, g2, l3, r3, left, right), - left, right)) + val rangeJoinKeys = getKeys(l1, l2, g1, g2, l3, r3, left, right) + Some((joinType, rangeJoinKeys, left, right, condition)) case And(And(EqualTo(l3, r3), GreaterThanOrEqual(g1, l1)), LessThanOrEqual(l2, g2)) => Some((joinType, getKeys(l1, l2, g1, g2, l3, r3, left, right), - left, right)) + left, right,condition)) case And(And(EqualTo(l3, r3), LessThanOrEqual(l1, g1)), GreaterThanOrEqual(g2, l2)) => Some((joinType, getKeys(l1, l2, g1, g2, l3, r3, left, right), - left, right)) + left, right, condition)) case And(And(EqualTo(l3, r3), GreaterThanOrEqual(g1, l1)), GreaterThanOrEqual(g2, l2)) => Some((joinType, getKeys(l1, l2, g1, g2, l3, r3, left, right), - left, right)) + left, right, condition)) case _ => None } } else { diff --git a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala index 66262078..bb6c4496 100644 --- a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala +++ b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner +import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, _} import org.apache.spark.sql.internal.SQLConf @@ -36,7 +37,9 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan, context: SparkSession, minOverlap: Int, maxGap: Int, useJoinOrder: Boolean, - intervalHolderClassName: String + intervalHolderClassName: String, + conditionExact: Option[Expression], + joinType: JoinType ) extends BinaryExecNode with Serializable { @transient lazy val output = left.output ++ right.output diff --git a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala index 4ee29e05..ee00be15 100644 --- a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala +++ b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala @@ -21,7 +21,7 @@ class IntervalTreeJoinStrategyOptim(spark: SparkSession) extends Strategy with S plan match { case ExtractRangeJoinKeys(joinType, rangeJoinKeys, left, right) => IntervalTreeJoinOptim(planLater(left), planLater(right), rangeJoinKeys, spark,left,right,intervalHolderClassName) :: Nil - case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right) => { + case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right, condition) => { val minOverlap = spark.sqlContext.getConf(InternalParams.minOverlap,"1") val maxGap = spark.sqlContext.getConf(InternalParams.maxGap,"0") val useJoinOrder = spark.sqlContext.getConf(InternalParams.useJoinOrder,"false") @@ -40,7 +40,7 @@ class IntervalTreeJoinStrategyOptim(spark: SparkSession) extends Strategy with S minOverlap.toInt, maxGap.toInt, useJoinOrder.toBoolean, - intervalHolderClassName) :: Nil + intervalHolderClassName, condition, joinType) :: Nil } case _ => Nil diff --git a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinChromosome.scala b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinChromosome.scala index cba79141..6a72dc51 100644 --- a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinChromosome.scala +++ b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinChromosome.scala @@ -14,7 +14,7 @@ case class IntervalTreeJoinChromosome(left: SparkPlan, right: SparkPlan, condition: Seq[Expression], - context: SparkSession) extends BinaryExecNode { + context: SparkSession, conditionExact: Option[Expression]) extends BinaryExecNode { def output = left.output ++ right.output lazy val (buildPlan, streamedPlan) = (left, right) diff --git a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinStrategy.scala b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinStrategy.scala index 3ed93b36..3d4e1f3b 100644 --- a/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinStrategy.scala +++ b/src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinStrategy.scala @@ -11,8 +11,8 @@ class IntervalTreeJoinStrategy(spark: SparkSession) extends Strategy with Serial def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractRangeJoinKeys(joinType, rangeJoinKeys, left, right) => IntervalTreeJoin(planLater(left), planLater(right), rangeJoinKeys, spark) :: Nil - case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right) => - IntervalTreeJoinChromosome(planLater(left), planLater(right), rangeJoinKeys, spark) :: Nil + case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right,condition) => + IntervalTreeJoinChromosome(planLater(left), planLater(right), rangeJoinKeys, spark, condition) :: Nil case _ => Nil }