diff --git a/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala b/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala
index bbcbf39f..ccaf5c0c 100644
--- a/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala
+++ b/peel-core/src/main/scala/eu/stratosphere/peel/core/config/Model.scala
@@ -92,6 +92,10 @@ object Model {
val hosts = c.getStringList(key)
}
+ class HostsWithPort(val c: Config, val prefix: String) extends Model {
+ val hosts = (for (host <- c.getStringList(s"$prefix.slaves").asScala) yield s"$host:${c.getInt(s"$prefix.cli.dataport")}").asJava
+ }
+
def factory[T <: Model](config: Config, prefix: String)(implicit m: Manifest[T]) = {
val constructor = m.runtimeClass.getConstructor(classOf[Config], classOf[String])
constructor.newInstance(config, prefix)
diff --git a/peel-extensions/src/main/resources/peel-extensions.xml b/peel-extensions/src/main/resources/peel-extensions.xml
index bd1b09ef..c9b6f4eb 100644
--- a/peel-extensions/src/main/resources/peel-extensions.xml
+++ b/peel-extensions/src/main/resources/peel-extensions.xml
@@ -66,4 +66,10 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf b/peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf
new file mode 100644
index 00000000..e073785f
--- /dev/null
+++ b/peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf
@@ -0,0 +1,16 @@
+# include common h2o configuration
+include "reference.h2o.conf"
+
+system {
+ h2o {
+ path {
+ #IMPORTANT: it has to be a pre-built version with HADOOP
+ #TODO: zip file is not supported
+ archive.url = "http://h2o-release.s3.amazonaws.com/h2o/rel-shannon/12/h2o-3.0.0.12-hdp2.2.zip"
+ archive.md5 = "297811A13E849D264F6223536D736B19"
+ archive.src = ${app.path.downloads}"/h2o-3.0.0.12-hdp2.2.zip"
+ home = ${system.h2o.path.archive.dst}"/h2o-3.0.0.12-hdp2.2"
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/peel-extensions/src/main/resources/reference.h2o.conf b/peel-extensions/src/main/resources/reference.h2o.conf
new file mode 100644
index 00000000..c4bdef36
--- /dev/null
+++ b/peel-extensions/src/main/resources/reference.h2o.conf
@@ -0,0 +1,26 @@
+system {
+ h2o {
+ user = ${system.default.user}
+ group = ${system.default.group}
+ path {
+ archive.dst = ${app.path.systems}
+ }
+ startup {
+ max.attempts = ${system.default.startup.max.attempts}
+ polling {
+ counter = ${system.default.startup.polling.counter}
+ interval = ${system.default.startup.polling.interval}
+ }
+ }
+ config {
+ # put list of slaves
+ slaves = ${system.default.config.slaves}
+ cli {
+ tmp.dir = "/tmp/h2o-"${system.default.user}
+ dataport = 55555
+ memory.per-node = 2g
+ parallelism.per-node = ${system.default.config.parallelism.per-node}
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/peel-extensions/src/main/resources/templates/h2o/conf/h2o-conf.mustache b/peel-extensions/src/main/resources/templates/h2o/conf/h2o-conf.mustache
new file mode 100644
index 00000000..0069487b
--- /dev/null
+++ b/peel-extensions/src/main/resources/templates/h2o/conf/h2o-conf.mustache
@@ -0,0 +1,5 @@
+{{#tmp.dir}}tmp.dir {{tmp.dir}}{{/tmp.dir}}
+{{#dataport}}dataport {{dataport}}{{/dataport}}
+{{#parallelism.per-node}}parallelism.per-node {{parallelism.per-node}}{{/parallelism.per-node}}
+{{#memory.per-node}}memory.per-node {{memory.per-node}}{{/memory.per-node}}
+{{#single-precision}}single-precision {{single-precision}}{{/single-precision}}
\ No newline at end of file
diff --git a/peel-extensions/src/main/resources/templates/h2o/conf/hosts.mustache b/peel-extensions/src/main/resources/templates/h2o/conf/hosts.mustache
new file mode 100644
index 00000000..f70d5453
--- /dev/null
+++ b/peel-extensions/src/main/resources/templates/h2o/conf/hosts.mustache
@@ -0,0 +1,3 @@
+{{#hosts}}
+{{{.}}}
+{{/hosts}}
diff --git a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala
new file mode 100644
index 00000000..1dbabf57
--- /dev/null
+++ b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/experiment/H2OExperiment.scala
@@ -0,0 +1,163 @@
+package eu.stratosphere.peel.extensions.h2o.beans.experiment
+
+import java.lang.{System => Sys}
+import java.io.FileWriter
+import java.nio.file.{Paths, Files}
+
+import com.typesafe.config.Config
+import eu.stratosphere.peel.core.beans.data.{ExperimentOutput, DataSet}
+import eu.stratosphere.peel.core.beans.experiment.Experiment
+import eu.stratosphere.peel.core.util.shell
+import eu.stratosphere.peel.extensions.h2o.beans.system.H2O
+import spray.json._
+
+/** An [[eu.stratosphere.peel.core.beans.experiment.Experiment Experiment]] implementation which handles the execution
+ * of a single H2O job.
+ *
+ * Note:
+ * H2O currently doesn't support submitting a java/scala job in jar file.
+ * There are 2 ways to submit a h2o job:
+ * 1. through interactive web interface, only the pre-implemented algorithms are supported.
+ * 2. execute a python/R script. precondition: install the h2o python/R package
+ *
+ * So if you don't want to install the h2o python/R package, you have to write a java program which simulate all
+ * the actions on the web interface.
+ */
+class H2OExperiment(
+ command: String,
+ runner : H2O,
+ runs : Int,
+ inputs : Set[DataSet],
+ outputs: Set[ExperimentOutput],
+ name : String,
+ config : Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) {
+
+ def this(
+ runs : Int,
+ runner : H2O,
+ input : DataSet,
+ output : ExperimentOutput,
+ command: String,
+ name : String,
+ config : Config) = this(command, runner, runs, Set(input), Set(output), name, config)
+
+ def this(
+ runs : Int,
+ runner : H2O,
+ inputs : Set[DataSet],
+ output : ExperimentOutput,
+ command: String,
+ name : String,
+ config : Config) = this(command, runner, runs, inputs, Set(output), name, config)
+
+ override def run(id: Int, force: Boolean): Experiment.Run[H2O] = new H2OExperiment.SingleJobRun(id, this, force)
+
+ def copy(name: String = name, config: Config = config) = new H2OExperiment(command, runner, runs, inputs, outputs, name, config)
+
+}
+
+object H2OExperiment {
+
+ case class State(
+ name: String,
+ suiteName: String,
+ command: String,
+ runnerID: String,
+ runnerName: String,
+ runnerVersion: String,
+ var runExitCode: Option[Int] = None,
+ var runTime: Long = 0) extends Experiment.RunState {}
+
+ object StateProtocol extends DefaultJsonProtocol with NullOptions {
+ implicit val stateFormat = jsonFormat8(State)
+ }
+
+ /** A private inner class encapsulating the logic of single run. */
+ class SingleJobRun(val id: Int, val exp: H2OExperiment, val force: Boolean) extends Experiment.SingleJobRun[H2O, State] {
+
+ import eu.stratosphere.peel.extensions.h2o.beans.experiment.H2OExperiment.StateProtocol._
+
+ val runnerLogPath = s"${exp.config.getString("system.h2o.config.cli.tmp.dir")}/h2ologs"
+
+ override def isSuccessful = state.runExitCode.getOrElse(-1) == 0
+
+ override protected def logFilePatterns = List(s"$runnerLogPath/h2o_*-3-info.log")
+
+ val pollingNode = exp.config.getStringList("system.h2o.config.slaves").get(0)
+ val user = exp.config.getString("system.h2o.user")
+ val dataPort = exp.config.getInt("system.h2o.config.cli.dataport")
+
+ override protected def loadState(): State = {
+ if (Files.isRegularFile(Paths.get(s"$home/state.json"))) {
+ try {
+ io.Source.fromFile(s"$home/state.json").mkString.parseJson.convertTo[State]
+ } catch {
+ case e: Throwable => State(name, Sys.getProperty("app.suite.name"), command, exp.runner.beanName, exp.runner.name, exp.runner.version)
+ }
+ } else {
+ State(name, Sys.getProperty("app.suite.name"), command, exp.runner.beanName, exp.runner.name, exp.runner.version)
+ }
+ }
+
+ override protected def writeState() = {
+ val fw = new FileWriter(s"$home/state.json")
+ fw.write(state.toJson.prettyPrint)
+ fw.close()
+ }
+
+ override protected def runJob() = {
+ // try to execute the experiment
+ val (runExit, t) = Experiment.time(this ! (command, s"$home/run.out", s"$home/run.err"))
+ state.runTime = t
+ state.runExitCode = Some(runExit)
+ }
+
+ /** Before the run, collect runner log files and their current line counts */
+ override protected def beforeRun() = {
+ val logFiles = for (pattern <- logFilePatterns; f <- (shell !! s""" ssh $user@$pollingNode "ls $pattern" """).split(Sys.lineSeparator).map(_.trim)) yield f
+ logFileCounts = Map((for (f <- logFiles) yield f -> (shell !! s""" ssh $user@$pollingNode "wc -l $f | cut -d' ' -f1" """).trim.toLong): _*)
+ }
+
+ /** After the run, copy logs */
+ override protected def afterRun(): Unit = {
+ shell ! s"rm -Rf $home/logs/*"
+ for ((file, count) <- logFileCounts) shell ! s""" ssh $user@$pollingNode "tail -n +${count + 1} $file" > $home/logs/${Paths.get(file).getFileName}"""
+ }
+
+ override def cancelJob() = {
+ //firstly, retrieve the jobid of running job
+ val s = shell !! s"wget -qO- $user@$pollingNode:$dataPort/3/Jobs"
+ var jobid = ""
+ for (job <- s.parseJson.asJsObject.fields.get("jobs").get.asInstanceOf[JsArray].elements) {
+ val fields = job.asJsObject.fields
+ if (fields.getOrElse("status", "").toString == "\"RUNNING\"" || fields.getOrElse("status", "").toString == "\"CREATED\"") {
+ val sid = fields.get("key").get.asInstanceOf[JsObject].fields.get("name").get.toString()
+ jobid = sid.substring(1, jobid.length - 1)
+ }
+ }
+
+ //send cancel request to REST API
+ shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")}/cancel --post-data="job_id=${jobid}" """
+
+ //check if the job has been successfully cancelled
+ var isCancelled = false
+ while (!isCancelled) {
+ Thread.sleep(exp.config.getInt("system.h2o.startup.polling.interval") * 2)
+ val status = this getStatus(jobid)
+ isCancelled = status == "CANCELLED"
+ }
+ state.runTime = exp.config.getLong("experiment.timeout") * 1000
+ state.runExitCode = Some(-1)
+ }
+
+ private def !(command: String, outFile: String, errFile: String) = {
+ shell ! s"$command > $outFile 2> $errFile"
+ }
+
+ private def getStatus(jobid: String): String = {
+ (shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")} | grep -Eo '"status":"[A-Z]+"' | grep -Eo [A-Z]+ """).stripLineEnd
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala
new file mode 100644
index 00000000..fad30159
--- /dev/null
+++ b/peel-extensions/src/main/scala/eu/stratosphere/peel/extensions/h2o/beans/system/H2O.scala
@@ -0,0 +1,109 @@
+package eu.stratosphere.peel.extensions.h2o.beans.system
+
+import com.samskivert.mustache.Mustache
+import com.typesafe.config.ConfigException
+import eu.stratosphere.peel.core.beans.system.Lifespan._
+import eu.stratosphere.peel.core.beans.system.{SetUpTimeoutException, System}
+import eu.stratosphere.peel.core.config.{Model, SystemConfig}
+import eu.stratosphere.peel.core.util.shell
+
+import scala.collection.JavaConverters._
+
+/** Wrapper for H2O
+ *
+ * Implements H2O as a [[eu.stratosphere.peel.core.beans.system.System System]] class and provides setup and teardown methods.
+ *
+ * @param version Version of the system (e.g. "2.8.6")
+ * @param lifespan [[eu.stratosphere.peel.core.beans.system.Lifespan Lifespan]] of the system
+ * @param dependencies Set of dependencies that this system needs
+ * @param mc The moustache compiler to compile the templates that are used to generate property files for the system
+ *
+ *
+ */
+class H2O(version: String, lifespan: Lifespan, dependencies: Set[System] = Set(), mc: Mustache.Compiler) extends System("h2o", version, lifespan, dependencies, mc) {
+
+ override def configuration() = SystemConfig(config, {
+ val home = config.getString("system.h2o.path.home")
+ List(
+ SystemConfig.Entry[Model.HostsWithPort]("system.h2o.config", s"$home/flatfile", templatePath("conf/hosts"), mc),
+ SystemConfig.Entry[Model.Yaml]("system.h2o.config.cli", s"$home/conf", templatePath("conf/h2o-conf"), mc)
+ )
+ })
+
+ override protected def start(): Unit = {
+ val user = config.getString("system.h2o.user")
+ val tmpDir =config.getString("system.h2o.config.cli.tmp.dir")
+ val dataPort = config.getInt("system.h2o.config.cli.dataport")
+ val memory = config.getString("system.h2o.config.cli.memory.per-node")
+ val nthreads = config.getInt("system.h2o.config.cli.parallelism.per-node")
+ val home = config.getString("system.h2o.path.home")
+
+ // check if tmp dir exists and create if not
+ try {
+
+ for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) {
+ logger.info(s"Initializing tmp directory $tmpDir at taskmanager node $dataNode")
+ shell ! s""" ssh $user@$dataNode "rm -Rf $tmpDir" """
+ shell ! s""" ssh $user@$dataNode "mkdir -p $tmpDir" """
+ }
+ } catch {
+ case _: ConfigException => // ignore not set explicitly, java default is taken
+ }
+
+ var failedStartUpAttempts = 0
+
+ while (!isUp) {
+ try {
+ val totl = config.getStringList("system.h2o.config.slaves").size()
+ val init = 0 // H2O doesn't reset the log on startup
+
+ for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) {
+ shell ! s""" ssh $user@$dataNode "java -Xmx$memory -cp $home/h2odriver.jar water.H2OApp -flatfile $home/flatfile -nthreads $nthreads -ice_root $tmpDir -port $dataPort > /dev/null &" """
+ }
+ logger.info("Waiting for nodes to connect")
+
+ var curr = init
+ var cntr = config.getInt("system.h2o.startup.polling.counter")
+ val pollingNode = config.getStringList("system.h2o.config.slaves").get(0)
+ while (curr - init < totl) {
+ logger.info(s"Connected ${curr - init} from $totl nodes")
+ // wait a bit
+ Thread.sleep(config.getInt("system.h2o.startup.polling.interval"))
+ // get new values
+ try {
+ curr = Integer.parseInt((shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Cloud.json | grep -Eo 'cloud_size":[0-9]+,' | grep -Eo '[0-9]+' """).stripLineEnd)
+ } catch {
+ case _ : Throwable => ;
+ }
+ // timeout if counter goes below zero
+ cntr = cntr - 1
+ if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ")
+ }
+ isUp = true
+ } catch {
+ case e: SetUpTimeoutException =>
+ failedStartUpAttempts = failedStartUpAttempts + 1
+ if (failedStartUpAttempts < config.getInt("system.h2o.startup.max.attempts")) {
+ stop()
+ logger.info(s"Could not bring system '$toString' up in time, trying again...")
+ } else {
+ throw e
+ }
+ }
+ }
+ }
+
+ override protected def stop() = {
+ val user = config.getString("system.h2o.user")
+ for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) {
+ shell ! s""" ssh $user@$dataNode "ps -ef | grep h2odriver.jar | grep -v grep | awk '{print \\$$2}' | xargs kill" """
+ }
+ isUp = false
+ }
+
+ def isRunning = {
+ val pollingNode = config.getStringList("system.h2o.config.slaves").get(0)
+ val user = config.getString("system.h2o.user")
+ (shell ! s""" ssh $user@$pollingNode "ps -ef | grep h2odriver.jar | grep -v grep " """) == 0
+ }
+}
\ No newline at end of file