Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIVY-246 Support multiple Spark home in runtime #318

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
# If livy should impersonate the requesting users when creating a new session.
# livy.impersonation.enabled = true

# Livy Spark Home
# livy.server.spark-home-1.5.2=<path>/spark-1.5.2
# livy.server.spark-home-1.6.3=<path>/spark-1.6.3
# livy.server.spark-home-2.0.1=<path>/spark-2.0.1
# livy.server.spark-home-2.1.0=<path>/spark-2.1.0

# Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
# directory every time a session is started. By caching these files in HDFS, for example, startup
# time of sessions on YARN can be reduced.
Expand Down
17 changes: 14 additions & 3 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ object LivyConf {
*/
val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null)

/** Return the spark home version */
def SPARK_HOME_VER(version: String): Entry = Entry(s"livy.server.spark-home-$version", null)

private val HARDCODED_SPARK_FILE_LISTS = Seq(
SPARK_JARS,
SPARK_FILES,
Expand Down Expand Up @@ -246,14 +249,22 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

/** Return the location of the spark home directory */
def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))
def sparkHome(version: Option[String] = None): Option[String] = {
version match {
case Some(value) => Option(get(SPARK_HOME_VER(value)))
.orElse(throw new IllegalArgumentException(
s"Spark version: $value is not supported"))
case None => Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))
}
}

/** Return the spark master Livy sessions should use. */
def sparkMaster(): String = get(LIVY_SPARK_MASTER)

/** Return the path to the spark-submit executable. */
def sparkSubmit(): String = {
sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
def sparkSubmit(version: Option[String] = None): String = {
sparkHome(version).map { _ + File.separator + "bin" + File.separator +
"spark-submit" }.get
}

/** Return the list of superusers. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.cloudera.livy.server.batch

import java.io.File
import java.lang.ProcessBuilder.Redirect

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
Expand Down Expand Up @@ -62,7 +63,9 @@ object BatchSession {
request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
require(request.file != null, "File is required.")

val builder = new SparkProcessBuilder(livyConf)
val builder = new SparkProcessBuilder(livyConf, request.sparkVersion)
val sparkConf = livyConf.sparkHome(request.sparkVersion).map(_ + File.separator + "conf")
sparkConf.map(sc => builder.env("SPARK_CONF_DIR", sc))
builder.conf(conf)

proxyUser.foreach(builder.proxyUser)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ class CreateBatchRequest {
var queue: Option[String] = None
var name: Option[String] = None
var conf: Map[String, String] = Map()
var sparkVersion: Option[String] = None

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import scala.collection.mutable.ArrayBuffer
import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.util.LineBufferedProcess

class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
class SparkProcessBuilder(livyConf: LivyConf, version: Option[String]) extends Logging {

private[this] var _executable: String = livyConf.sparkSubmit()
private[this] var _executable: String = livyConf.sparkSubmit(version)
private[this] var _master: Option[String] = None
private[this] var _deployMode: Option[String] = None
private[this] var _className: Option[String] = None
Expand Down