From 6926390588ae40756a17480f432ee5e32e3377f3 Mon Sep 17 00:00:00 2001 From: Agarwal Date: Tue, 25 Apr 2017 16:24:39 -0700 Subject: [PATCH 1/6] LIVY-246 Support multiple Spark home in runtime --- conf/livy.conf.template | 6 ++++++ .../src/main/scala/com/cloudera/livy/LivyConf.scala | 12 +++++++++--- .../cloudera/livy/server/batch/BatchSession.scala | 7 ++++++- .../livy/server/batch/CreateBatchRequest.scala | 1 + .../cloudera/livy/utils/SparkProcessBuilder.scala | 4 ++-- 5 files changed, 24 insertions(+), 6 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index dd2f054bd..ca982e86f 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -31,6 +31,12 @@ # If livy should impersonate the requesting users when creating a new session. # livy.impersonation.enabled = true +# Livy Spark Home +# livy.server.spark-home_1.5.2=/spark-1.5.2 +# livy.server.spark-home_1.6.3=/spark-1.6.3 +# livy.server.spark-home_2.0.1=/spark-2.0.1 +# livy.server.spark-home_2.1.0=/spark-2.1.0 + # Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation # directory every time a session is started. By caching these files in HDFS, for example, startup # time of sessions on YARN can be reduced. 
diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala index 8fc4777aa..904a130c2 100644 --- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala +++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala @@ -246,14 +246,20 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) /** Return the location of the spark home directory */ - def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME")) + def sparkHome(version: Option[String] = None): Option[String] = { + version.map {version => Option(get(s"livy.server.spark-home_$version")) + .orElse(throw new IllegalArgumentException( + s"Spark version: $version is not supported")) + }.getOrElse(Option(get(s"livy.server.spark-home")).orElse(sys.env.get("SPARK_HOME"))) + } /** Return the spark master Livy sessions should use. */ def sparkMaster(): String = get(LIVY_SPARK_MASTER) /** Return the path to the spark-submit executable. */ - def sparkSubmit(): String = { - sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get + def sparkSubmit(version: Option[String] = None): String = { + sparkHome(version).map { _ + File.separator + "bin" + File.separator + + "spark-submit" }.get } /** Return the list of superusers. 
*/ diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala index 452a9d869..56e01d0dc 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala @@ -18,6 +18,7 @@ package com.cloudera.livy.server.batch +import java.io.File import java.lang.ProcessBuilder.Redirect import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} @@ -62,7 +63,11 @@ object BatchSession { request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf)) require(request.file != null, "File is required.") - val builder = new SparkProcessBuilder(livyConf) + val builder = new SparkProcessBuilder(livyConf, request.sparkVersion) + request.sparkVersion.map({ value => + builder.env("SPARK_CONF_DIR", livyConf.sparkHome(request.sparkVersion) + + File.separator + "conf") + }) builder.conf(conf) proxyUser.foreach(builder.proxyUser) diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala b/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala index 99459d57c..732cc536c 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala @@ -36,5 +36,6 @@ class CreateBatchRequest { var queue: Option[String] = None var name: Option[String] = None var conf: Map[String, String] = Map() + var sparkVersion: Option[String] = None } diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala index a38a76177..decbec105 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala @@ -25,9 +25,9 @@ import 
scala.collection.mutable.ArrayBuffer import com.cloudera.livy.{LivyConf, Logging} import com.cloudera.livy.util.LineBufferedProcess -class SparkProcessBuilder(livyConf: LivyConf) extends Logging { +class SparkProcessBuilder(livyConf: LivyConf, version: Option[String]) extends Logging { - private[this] var _executable: String = livyConf.sparkSubmit() + private[this] var _executable: String = livyConf.sparkSubmit(version) private[this] var _master: Option[String] = None private[this] var _deployMode: Option[String] = None private[this] var _className: Option[String] = None From 04e5241ddc026ad29c50ca22373b707f80277d3d Mon Sep 17 00:00:00 2001 From: Agarwal Date: Fri, 28 Apr 2017 14:12:22 -0700 Subject: [PATCH 2/6] Fixed LIVY-246 Support multiple Spark home in runtime --- conf/livy.conf.template | 8 ++++---- .../src/main/scala/com/cloudera/livy/LivyConf.scala | 12 ++++++++---- .../cloudera/livy/server/batch/BatchSession.scala | 4 ---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/conf/livy.conf.template b/conf/livy.conf.template index ca982e86f..dbce01336 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -32,10 +32,10 @@ # livy.impersonation.enabled = true # Livy Spark Home -# livy.server.spark-home_1.5.2=/spark-1.5.2 -# livy.server.spark-home_1.6.3=/spark-1.6.3 -# livy.server.spark-home_2.0.1=/spark-2.0.1 -# livy.server.spark-home_2.1.0=/spark-2.1.0 +# livy.server.spark-home-1.5.2=/spark-1.5.2 +# livy.server.spark-home-1.6.3=/spark-1.6.3 +# livy.server.spark-home-2.0.1=/spark-2.0.1 +# livy.server.spark-home-2.1.0=/spark-2.1.0 # Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation # directory every time a session is started. 
By caching these files in HDFS, for example, startup diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala index 904a130c2..fd9576a36 100644 --- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala +++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala @@ -245,12 +245,16 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return the spark deploy mode Livy sessions should use. */ def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) - /** Return the location of the spark home directory */ - def sparkHome(version: Option[String] = None): Option[String] = { - version.map {version => Option(get(s"livy.server.spark-home_$version")) + /** Return the spark home version */ + def SPARK_HOME_VER(version: String): Option[String] = { + Option(get(s"livy.server.spark-home-$version")) .orElse(throw new IllegalArgumentException( s"Spark version: $version is not supported")) - }.getOrElse(Option(get(s"livy.server.spark-home")).orElse(sys.env.get("SPARK_HOME"))) + } + /** Return the location of the spark home directory */ + def sparkHome(version: Option[String] = None): Option[String] = { + version.map {version => SPARK_HOME_VER(version) + }.getOrElse(Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))) } /** Return the spark master Livy sessions should use. 
*/ diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala index 56e01d0dc..100f981fa 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala @@ -64,10 +64,6 @@ object BatchSession { require(request.file != null, "File is required.") val builder = new SparkProcessBuilder(livyConf, request.sparkVersion) - request.sparkVersion.map({ value => - builder.env("SPARK_CONF_DIR", livyConf.sparkHome(request.sparkVersion) - + File.separator + "conf") - }) builder.conf(conf) proxyUser.foreach(builder.proxyUser) From 1c6592e06ce278b357dceb31f968b3fa6d499c68 Mon Sep 17 00:00:00 2001 From: Agarwal Date: Fri, 28 Apr 2017 17:49:12 -0700 Subject: [PATCH 3/6] Fixed LIVY-246 Support multiple spark versions in runtime --- .../main/scala/com/cloudera/livy/LivyConf.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala index fd9576a36..f68a61fc6 100644 --- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala +++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala @@ -246,15 +246,16 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) /** Return the spark home version */ - def SPARK_HOME_VER(version: String): Option[String] = { - Option(get(s"livy.server.spark-home-$version")) - .orElse(throw new IllegalArgumentException( - s"Spark version: $version is not supported")) - } + def SPARK_HOME_VER(version: String): Entry = Entry(s"livy.server.spark-home-$version", null) + /** Return the location of the spark home directory */ def sparkHome(version: Option[String] = None): Option[String] = { - version.map {version => 
SPARK_HOME_VER(version) - }.getOrElse(Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))) + version match { + case Some(value) => Option(get(SPARK_HOME_VER(value))) + .orElse(throw new IllegalArgumentException( + s"Spark version: $value is not supported")) + case None => Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME")) + } } /** Return the spark master Livy sessions should use. */ From cc736d40a20e75949e3477d96bcf7fa30d9fa8f5 Mon Sep 17 00:00:00 2001 From: Agarwal Date: Mon, 1 May 2017 16:19:21 -0700 Subject: [PATCH 4/6] LIVY-246 Fixed Livy multiple version support --- conf/livy-env.sh.template | 2 +- server/src/main/scala/com/cloudera/livy/LivyConf.scala | 6 +++--- .../scala/com/cloudera/livy/server/batch/BatchSession.scala | 4 ++++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/conf/livy-env.sh.template b/conf/livy-env.sh.template index b34893567..e1d4f3d51 100644 --- a/conf/livy-env.sh.template +++ b/conf/livy-env.sh.template @@ -1,4 +1,4 @@ -#!/usr/bin/env bash + #!/usr/bin/env bash # # Licensed to Cloudera, Inc. under one # or more contributor license agreements. See the NOTICE file diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala index f68a61fc6..2fe038e0d 100644 --- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala +++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala @@ -158,6 +158,9 @@ object LivyConf { */ val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null) + /** Return the spark home version */ + def SPARK_HOME_VER(version: String): Entry = Entry(s"livy.server.spark-home-$version", null) + private val HARDCODED_SPARK_FILE_LISTS = Seq( SPARK_JARS, SPARK_FILES, @@ -245,9 +248,6 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return the spark deploy mode Livy sessions should use. 
*/ def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) - /** Return the spark home version */ - def SPARK_HOME_VER(version: String): Entry = Entry(s"livy.server.spark-home-$version", null) - /** Return the location of the spark home directory */ def sparkHome(version: Option[String] = None): Option[String] = { version match { diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala index 100f981fa..6c9f105aa 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala @@ -64,6 +64,10 @@ object BatchSession { require(request.file != null, "File is required.") val builder = new SparkProcessBuilder(livyConf, request.sparkVersion) + request.sparkVersion.map({ value => + val sparkConf = livyConf.sparkHome(request.sparkVersion).map(_ + File.separator + "conf") + sparkConf.map(sc => builder.env("SPARK_CONF_DIR", sc)) + }) builder.conf(conf) proxyUser.foreach(builder.proxyUser) From 598d13edc5540d92da3e01f9beb696a057ddedac Mon Sep 17 00:00:00 2001 From: Agarwal Date: Mon, 1 May 2017 20:21:52 -0700 Subject: [PATCH 5/6] LIVY-246 Fixed Livy multiple version support --- conf/livy-env.sh.template | 2 +- .../scala/com/cloudera/livy/server/batch/BatchSession.scala | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/conf/livy-env.sh.template b/conf/livy-env.sh.template index e1d4f3d51..b34893567 100644 --- a/conf/livy-env.sh.template +++ b/conf/livy-env.sh.template @@ -1,4 +1,4 @@ - #!/usr/bin/env bash +#!/usr/bin/env bash # # Licensed to Cloudera, Inc. under one # or more contributor license agreements. 
See the NOTICE file diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala index 6c9f105aa..d214e4fed 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala @@ -64,10 +64,8 @@ object BatchSession { require(request.file != null, "File is required.") val builder = new SparkProcessBuilder(livyConf, request.sparkVersion) - request.sparkVersion.map({ value => - val sparkConf = livyConf.sparkHome(request.sparkVersion).map(_ + File.separator + "conf") - sparkConf.map(sc => builder.env("SPARK_CONF_DIR", sc)) - }) + val sparkConf = livyConf.sparkHome(request.sparkVersion).map(_ + File.separator + "conf") + sparkConf.map(sc => builder.env("SPARK_CONF_DIR", sc)) builder.conf(conf) proxyUser.foreach(builder.proxyUser) From da6edffd89609c0d13569f76b24d63026b1edb55 Mon Sep 17 00:00:00 2001 From: Agarwal Date: Thu, 11 May 2017 14:32:23 -0700 Subject: [PATCH 6/6] Fixed LIVY-246 Support multiple Spark home in runtime --- .../scala/com/cloudera/livy/server/batch/BatchSession.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala index d214e4fed..a46818692 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala @@ -65,9 +65,10 @@ object BatchSession { val builder = new SparkProcessBuilder(livyConf, request.sparkVersion) val sparkConf = livyConf.sparkHome(request.sparkVersion).map(_ + File.separator + "conf") + val sparkHome = livyConf.sparkHome(request.sparkVersion) sparkConf.map(sc => builder.env("SPARK_CONF_DIR", sc)) + sparkHome.map(sh => builder.env("SPARK_HOME", sh)) builder.conf(conf) - 
proxyUser.foreach(builder.proxyUser) request.className.foreach(builder.className) request.driverMemory.foreach(builder.driverMemory)