feat: spark 3.4 support (#174)
* Bumping to Spark-3.4.3

* Fixes after upgrade

* Apple Silicon (no-GKL) support

---------

Co-authored-by: Marek Wiewiórka <[email protected]>
mwiewior and Marek Wiewiórka authored Jul 21, 2024
1 parent a49c32b commit 2c006d7
Showing 30 changed files with 152 additions and 153 deletions.
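A note on the "Apple Silicon (no-GKL) support" item in the commit message above: the general idea is to avoid loading Intel GKL native codecs on non-x86 hosts and fall back to the JDK implementations. The sketch below is illustrative only; the helper name and the architecture check are assumptions, not code from this commit.

object GklCheck {
  // Assumption: GKL native libraries are only expected to load on x86-64 hosts,
  // so on Apple Silicon (os.arch == "aarch64") the JDK codecs are used instead.
  private val arch = System.getProperty("os.arch").toLowerCase
  val gklAvailable: Boolean = arch == "amd64" || arch == "x86_64"

  def main(args: Array[String]): Unit = {
    val codec = if (gklAvailable) "Intel GKL deflater (com.intel.gkl)" else "java.util.zip.Deflater"
    println(s"os.arch=$arch, using $codec")
  }
}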
21 changes: 11 additions & 10 deletions build.sbt
@@ -3,7 +3,7 @@ import sbtassembly.AssemblyPlugin.autoImport.ShadeRule
import scala.util.Properties

name := """sequila"""
val DEFAULT_SPARK_3_VERSION = "3.2.2"
val DEFAULT_SPARK_3_VERSION = "3.4.3"
lazy val sparkVersion = Properties.envOrElse("SPARK_VERSION", DEFAULT_SPARK_3_VERSION)

version := s"${sys.env.getOrElse("VERSION", "0.1.0")}"
@@ -14,7 +14,7 @@ scalaVersion := "2.12.13"
val isSnapshotVersion = settingKey[Boolean]("Is snapshot")
isSnapshotVersion := version.value.toLowerCase.contains("snapshot")

val DEFAULT_HADOOP_VERSION = "3.1.2"
val DEFAULT_HADOOP_VERSION = "3.3.6"

lazy val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)

@@ -29,8 +29,9 @@ dependencyOverrides += "io.netty" % "netty-transport" % nettyVersion
dependencyOverrides += "io.netty" % "netty-transport-native-epoll" % nettyVersion
dependencyOverrides += "io.netty" % "netty-transport-native-unix-common" % nettyVersion
dependencyOverrides += "com.google.guava" % "guava" % "15.0"
dependencyOverrides += "org.apache.orc" % "orc-core" % "1.6.9"
dependencyOverrides += "org.apache.logging.log4j" % "log4j-core" % "2.3"
//dependencyOverrides += "org.apache.orc" % "orc-core" % "1.7.5"
//dependencyOverrides += "org.apache.logging.log4j" % "log4j-core" % "2.20.0"
//dependencyOverrides += "org.scalatest" %% "scalatest" % "3.0.3" % "test"


//removing hadoop-bam to used a patched one with support for htsjdk 2.22
@@ -40,10 +41,10 @@ libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "com.github.mrpowers" %% "spark-fast-tests" % "0.21.3"
libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "0.38.2"
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "3.2.0_1.2.0" % "test" excludeAll ExclusionRule(organization = "javax.servlet") excludeAll (ExclusionRule("org.apache.hadoop"))
libraryDependencies += "org.bdgenomics.adam" %% "adam-core-spark3" % "0.36.0" excludeAll (ExclusionRule("org.seqdoop"))
libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark3" % "0.36.0" excludeAll (ExclusionRule("org.seqdoop"))
libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark3" % "0.36.0" excludeAll (ExclusionRule("org.seqdoop"))
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "3.4.1_1.4.4" % "test" excludeAll ExclusionRule(organization = "javax.servlet") excludeAll (ExclusionRule("org.apache.hadoop"))
libraryDependencies += "org.bdgenomics.adam" %% "adam-core-spark3" % "1.0.1" excludeAll (ExclusionRule("org.seqdoop"))
libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark3" % "1.0.1" excludeAll (ExclusionRule("org.seqdoop"))
libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark3" % "1.0.1" excludeAll (ExclusionRule("org.seqdoop"))
libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value
libraryDependencies += "org.rogach" %% "scallop" % "3.1.2"
libraryDependencies += "com.github.samtools" % "htsjdk" % "2.24.1"
@@ -54,9 +55,9 @@ libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.7"
libraryDependencies += "org.eclipse.jetty" % "jetty-servlet" % "9.3.24.v20180605"
libraryDependencies += "org.apache.derby" % "derbyclient" % "10.14.2.0"
//libraryDependencies += "org.disq-bio" % "disq" % "0.3.8" <-disabled since we use patched version of HtsjdkReadsTraversalParameters
libraryDependencies += "io.projectglow" %% "glow-spark3" % "1.0.1" excludeAll (ExclusionRule("com.github.samtools")) excludeAll (ExclusionRule("org.seqdoop")) //FIXME:: remove togehter with disq
libraryDependencies += "io.projectglow" %% "glow-spark3" % "2.0.0" excludeAll (ExclusionRule("com.github.samtools")) excludeAll (ExclusionRule("org.seqdoop")) //FIXME:: remove togehter with disq
libraryDependencies += "com.intel.gkl" % "gkl" % "0.8.8"
libraryDependencies += "org.openjdk.jol" % "jol-core" % "0.16" % "provided"
libraryDependencies += "org.openjdk.jol" % "jol-core" % "0.17" % "provided"
libraryDependencies += "com.github.jsr203hadoop" % "jsr203hadoop" % "1.0.3"


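With the defaults above moved to Spark 3.4.3 and Hadoop 3.3.6, a downstream sbt project built against this release would typically align its own Spark dependency the same way. A minimal sketch, assuming hypothetical artifact coordinates and version for the published sequila jar:

// Downstream build.sbt sketch; the sequila coordinates and version below are assumptions.
val sparkVersion = sys.env.getOrElse("SPARK_VERSION", "3.4.3")  // same override convention as above

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.biodatageeks" %% "sequila"   % "1.1.0"                    // hypothetical version
)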
Empty file added sbt-cache/ivy/.sbt.ivy.lock
Empty file.
@@ -14,6 +14,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy.selectFilte
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
@@ -102,7 +103,7 @@ case class SequilaDataSourceStrategy(spark: SparkSession) extends Strategy
l.output.toStructType,
Set.empty,
Set.empty,
None,
PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
toCatalystRDD(l, baseRelation.buildScan()),
baseRelation,
None) :: Nil
Expand Down Expand Up @@ -216,7 +217,7 @@ case class SequilaDataSourceStrategy(spark: SparkSession) extends Strategy
projects.map(_.toAttribute).toStructType,
Set.empty,
Set.empty,
None,
PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation,
relation.catalogTable.map(_.identifier))
@@ -231,7 +232,7 @@ case class SequilaDataSourceStrategy(spark: SparkSession) extends Strategy
requestedColumns.toStructType,
Set.empty,
Set.empty,
None,
PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation,
relation.catalogTable.map(_.identifier))
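Context for the three call sites above: with Spark 3.4, RowDataSourceScanExec takes a PushedDownOperators argument where the Spark 3.2 build passed a plain None, so an "empty" instance now stands for "nothing pushed down". A minimal sketch, with field meanings assumed to be aggregation, sample, limit, offset, sort order and pushed predicates (evaluate in a spark-shell on Spark 3.4):

import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators

// Nothing was pushed to the data source: no aggregation, sample, limit or offset,
// no sort order and no pushed predicates.
val noPushdown = PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty)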
@@ -20,9 +20,8 @@ package org.apache.spark.sql


import java.util.Locale

import org.apache.spark.sql.ResolveTableValuedFunctionsSeq.tvf
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, TypeCoercion, UnresolvedTableValuedFunction}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -35,7 +34,7 @@ import org.biodatageeks.sequila.utils.Columns
/**
* Rule that resolves table-valued function references.
*/
object ResolveTableValuedFunctionsSeq extends Rule[LogicalPlan] {
case class ResolveTableValuedFunctionsSeq(catalog: SessionCatalog) extends Rule[LogicalPlan] {
/**
* List of argument names and their types, used to declare a function.
*/
@@ -130,7 +129,7 @@ object ResolveTableValuedFunctionsSeq extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
val resolvedFunc = builtinFunctions.get(u.name.funcName.toLowerCase(Locale.ROOT)) match {
val resolvedFunc = builtinFunctions.get(u.name.head.toLowerCase(Locale.ROOT)) match {
case Some(tvf) =>
val resolved = tvf.flatMap { case (argList, resolver) =>
argList.implicitCast(u.functionArgs) match {
@@ -143,12 +142,12 @@ object ResolveTableValuedFunctionsSeq extends Rule[LogicalPlan] {
resolved.headOption.getOrElse {
val argTypes = u.functionArgs.map(_.dataType.typeName).mkString(", ")
u.failAnalysis(
s"""error: table-valued function ${u.name.funcName} with alternatives:
s"""error: table-valued function ${u.name.head} with alternatives:
|${tvf.keys.map(_.toString).toSeq.sorted.map(x => s" ($x)").mkString("\n")}
|cannot be applied to: (${argTypes})""".stripMargin)
|cannot be applied to: (${argTypes})""".stripMargin, Map.empty)
}
case _ =>
u.failAnalysis(s"could not resolve `${u.name.funcName}` to a table-valued function")
u.failAnalysis(s"could not resolve `${u.name.head}` to a table-valued function", Map.empty)
}

// If alias names assigned, add `Project` with the aliases
@@ -157,8 +156,8 @@ object ResolveTableValuedFunctionsSeq extends Rule[LogicalPlan] {
// Checks if the number of the aliases is equal to expected one
if (u.output.size != outputAttrs.size) {
u.failAnalysis(s"Number of given aliases does not match number of output columns. " +
s"Function name: ${u.name.funcName}; number of aliases: " +
s"${u.output.size}; number of output columns: ${outputAttrs.size}.")
s"Function name: ${u.name.head}; number of aliases: " +
s"${u.output.size}; number of output columns: ${outputAttrs.size}.", Map.empty)
}
val aliases = outputAttrs.zip(u.output).map {
case (attr, name) => Alias(attr, name.toString())()
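ResolveTableValuedFunctionsSeq resolves table-valued function calls appearing in a FROM clause; the changes above track Spark 3.4's API, where the unresolved name is a multi-part identifier (hence u.name.head) and failAnalysis takes an extra parameter map. The sketch below only shows the general call shape such a rule resolves, using the built-in range function as a placeholder rather than SeQuiLa's own functions:

import org.apache.spark.sql.SparkSession

object TvfCallShape {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("tvf-shape").getOrCreate()
    // FROM <function>(<arguments>): the table-valued function form handled by the rule above.
    spark.sql("SELECT * FROM range(0, 5)").show()
    spark.stop()
  }
}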
60 changes: 30 additions & 30 deletions src/main/scala/org/biodatageeks/sequila/utvf/SeQuiLaAnalyzer.scala
@@ -3,36 +3,35 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.typeCoercionRules
import org.apache.spark.sql.{ResolveTableValuedFunctionsSeq, SparkSession}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.AlwaysProcess
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.aggregate.ResolveEncodersInScalaAgg
import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
import org.apache.spark.sql.execution.command.CommandCheck
import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, TableCapabilityCheck}
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FallBackFileSourceV2, FindDataSourceTable, HiveOnlyCheck, PreReadCheck, PreWriteCheck, PreprocessTableCreation, PreprocessTableInsertion, ResolveSQLOnFile}



class SeQuiLaAnalyzer(session: SparkSession) extends
Analyzer( session.sessionState.analyzer.catalogManager){
Analyzer( session.sessionState.analyzer.catalogManager) {

override val conf = session.sessionState.conf
val catalog = session.sessionState.analyzer.catalogManager.v1SessionCatalog
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
new ResolveSessionCatalog(
catalogManager) +:
ResolveEncodersInScalaAgg+: session.extensions.buildResolutionRules(session)
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
//FIXME: After upgrade to Spark - 3.4.0, this line is commented out
// new ResolveSessionCatalog(
// catalogManager) +:
ResolveEncodersInScalaAgg +: session.extensions.buildResolutionRules(session)


override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
DetectAmbiguousSelfJoin +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion +:
DataSourceAnalysis +: session.extensions.buildPostHocResolutionRules(session)

override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
@@ -42,9 +41,6 @@ class SeQuiLaAnalyzer(session: SparkSession) extends
CommandCheck +: session.extensions.buildCheckRules(session)


private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog


override def batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
@@ -53,6 +49,7 @@ class SeQuiLaAnalyzer(session: SparkSession) extends
// at the beginning of analysis.
OptimizeUpdateFields,
CTESubstitution,
BindParameters,
WindowsSubstitution,
EliminateUnions,
SubstituteUnresolvedOrdinals),
@@ -63,28 +60,28 @@ class SeQuiLaAnalyzer(session: SparkSession) extends
ResolveHints.ResolveCoalesceHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Keep Legacy Outputs", Once,
KeepLegacyOutputs),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctionsSeq ::
ResolveNamespace(catalogManager) ::
ResolveTableValuedFunctionsSeq(catalog) ::
new ResolveCatalogs(catalogManager) ::
ResolveUserSpecifiedColumns ::
ResolveInsertInto ::
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
ResolveAlterTableCommands ::
ResolveFieldNameAndPosition ::
AddMetadataColumns ::
DeduplicateRelations ::
ResolveReferences ::
ResolveLateralColumnAliasReference ::
ResolveExpressionsWithNamePlaceholders ::
ResolveDeserializer ::
ResolveNewInstance ::
ResolveUpCast ::
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveUnpivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
@@ -100,19 +97,19 @@ class SeQuiLaAnalyzer(session: SparkSession) extends
ResolveAggregateFunctions ::
TimeWindowing ::
SessionWindowing ::
ResolveWindowTime ::
ResolveDefaultColumns(ResolveRelations.resolveRelationOrTempView) ::
ResolveInlineTables ::
ResolveHigherOrderFunctions(catalogManager) ::
ResolveLambdaVariables ::
ResolveTimeZone ::
ResolveRandomSeed ::
ResolveBinaryArithmetic ::
ResolveUnion ::
RewriteDeleteFromTable ::
typeCoercionRules ++
Seq(ResolveWithCTE) ++
extendedResolutionRules : _*),
extendedResolutionRules: _*),
Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
Batch("Apply Char Padding", Once,
ApplyCharTypePadding),
Batch("Post-Hoc Resolution", Once,
Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
@@ -129,7 +126,10 @@ class SeQuiLaAnalyzer(session: SparkSession) extends
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases),
Batch("HandleAnalysisOnlyCommand", Once,
HandleAnalysisOnlyCommand)
Batch("HandleSpecialCommand", Once,
HandleSpecialCommand),
Batch("Remove watermark for batch query", Once,
EliminateEventTimeWatermark)
)
}
}
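Because SeQuiLaAnalyzer re-lists the analyzer batches wholesale, each Spark upgrade means re-syncing the rule list by hand, which is what most of the hunk above does (and what the FIXME about ResolveSessionCatalog refers to). For comparison, a rule that only needs to be added, not to replace built-ins, can be injected through SparkSessionExtensions; this is an illustrative sketch, not how SeQuiLa wires its analyzer:

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op resolution rule, named only for illustration.
case class MyResolutionRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Registered via extensions, the stock analyzer batches stay untouched across upgrades.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectResolutionRule(session => MyResolutionRule(session))
}

It would be enabled by setting spark.sql.extensions to the fully qualified class name of MyExtensions.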

@@ -133,6 +133,7 @@ case class SequilaSessionState(sparkSession: SparkSession, customAnalyzer: Analy
sparkSession.sessionState.executePlan,
(sparkSession:SparkSession,sessionState: SessionState) => sessionState.clone(sparkSession),
sparkSession.sessionState.columnarRules,
sparkSession.sessionState.queryStagePrepRules
sparkSession.sessionState.adaptiveRulesHolder,
sparkSession.sessionState.planNormalizationRules
){
}
@@ -2,10 +2,11 @@ package org.biodatageeks.sequila.tests.base

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.apache.spark.sql.SequilaSession
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

class BAMBaseTestSuite
extends FunSuite
extends AnyFunSuite
with DataFrameSuiteBase
with SharedSparkContext with BeforeAndAfter{

@@ -1,11 +1,12 @@
package org.biodatageeks.sequila.tests.base

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.{BeforeAndAfter}
import org.scalatest.funsuite.AnyFunSuite

class BEDBaseTestSuite
extends
FunSuite
AnyFunSuite
with DataFrameSuiteBase
with SharedSparkContext with BeforeAndAfter{

@@ -1,11 +1,12 @@
package org.biodatageeks.sequila.tests.base

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.{BeforeAndAfter}
import org.scalatest.funsuite.AnyFunSuite

class FASTQBaseTestSuite
extends
FunSuite
AnyFunSuite
with DataFrameSuiteBase
with SharedSparkContext with BeforeAndAfter{

@@ -4,9 +4,10 @@ import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.biodatageeks.sequila.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.{BeforeAndAfter}
import org.scalatest.funsuite.AnyFunSuite

class IntervalJoinBaseTestSuite extends FunSuite
class IntervalJoinBaseTestSuite extends AnyFunSuite
with DataFrameSuiteBase
with SharedSparkContext
with BeforeAndAfter {
@@ -1,9 +1,9 @@
package org.biodatageeks.sequila.tests.dataquality

import org.biodatageeks.sequila.utils.DataQualityFuncs
import org.scalatest.FunSuite
import org.scalatest.funsuite.AnyFunSuite

class ContigNormalizationTest extends FunSuite{
class ContigNormalizationTest extends AnyFunSuite{

test("Test contig") {
val chrInTest1 = "chr1"
@@ -1,15 +1,15 @@
package org.biodatageeks.sequila.tests.datasources

import java.io.{OutputStreamWriter, PrintWriter}

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.biodatageeks.sequila.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim
import org.biodatageeks.sequila.rangejoins.genApp.IntervalTreeJoinStrategy
import org.biodatageeks.sequila.utils.{Columns, InternalParams}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.BeforeAndAfter
import org.scalatest.funsuite.AnyFunSuite

class ADAMBenchmarkTestSuite
extends FunSuite
extends AnyFunSuite
with DataFrameSuiteBase
with BeforeAndAfter
with SharedSparkContext {
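The remaining test-suite hunks repeat the same ScalaTest migration: the old org.scalatest.FunSuite base class is gone in the ScalaTest version pulled in here, so suites now extend org.scalatest.funsuite.AnyFunSuite. A minimal sketch of the post-migration pattern (the suite name and assertion are placeholders):

import org.scalatest.BeforeAndAfter
import org.scalatest.funsuite.AnyFunSuite

// Placeholder suite showing the new imports and base trait.
class ExampleMigratedTestSuite extends AnyFunSuite with BeforeAndAfter {
  test("targets a Spark 3.4.x build") {
    assert("3.4.3".startsWith("3.4"))
  }
}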