Added H2O extension #49

Open · wants to merge 2 commits into master
@@ -92,6 +92,10 @@ object Model {
val hosts = c.getStringList(key)
}

class HostsWithPort(val c: Config, val prefix: String) extends Model {
val hosts = (for (host <- c.getStringList(s"$prefix.slaves").asScala)
yield s"$host:${c.getInt(s"$prefix.cli.dataport")}").asJava
}

def factory[T <: Model](config: Config, prefix: String)(implicit m: Manifest[T]) = {
val constructor = m.runtimeClass.getConstructor(classOf[Config], classOf[String])
constructor.newInstance(config, prefix)
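
The new `HostsWithPort` model joins each slave host with the configured data port. A minimal usage sketch, assuming made-up node names (`node1`, `node2`):

import com.typesafe.config.ConfigFactory
import eu.stratosphere.peel.core.config.Model

// With this config, the model yields ["node1:55555", "node2:55555"].
val c = ConfigFactory.parseString(
  """system.h2o.config {
    |  slaves = ["node1", "node2"]
    |  cli.dataport = 55555
    |}""".stripMargin)
val hosts = new Model.HostsWithPort(c, "system.h2o.config").hosts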
6 changes: 6 additions & 0 deletions peel-extensions/src/main/resources/peel-extensions.xml
@@ -66,4 +66,10 @@
<constructor-arg name="lifespan" value="EXPERIMENT"/>
</bean>

<!-- H2O -->
<bean id="h2o-3.0.0.12" class="eu.stratosphere.peel.extensions.h2o.beans.system.H2O" parent="system">
<constructor-arg name="version" value="3.0.0.12"/>
<constructor-arg name="lifespan" value="EXPERIMENT"/>
</bean>

</beans>
16 changes: 16 additions & 0 deletions peel-extensions/src/main/resources/reference.h2o-3.0.0.12.conf
@@ -0,0 +1,16 @@
# include common h2o configuration
include "reference.h2o.conf"

system {
h2o {
path {
# IMPORTANT: this has to be a version pre-built with Hadoop support
# TODO: zip archives are not supported
archive.url = "http://h2o-release.s3.amazonaws.com/h2o/rel-shannon/12/h2o-3.0.0.12-hdp2.2.zip"
archive.md5 = "297811A13E849D264F6223536D736B19"
archive.src = ${app.path.downloads}"/h2o-3.0.0.12-hdp2.2.zip"
home = ${system.h2o.path.archive.dst}"/h2o-3.0.0.12-hdp2.2"
}

}
}
26 changes: 26 additions & 0 deletions peel-extensions/src/main/resources/reference.h2o.conf
@@ -0,0 +1,26 @@
system {
h2o {
user = ${system.default.user}
group = ${system.default.group}
path {
archive.dst = ${app.path.systems}
}
startup {
max.attempts = ${system.default.startup.max.attempts}
polling {
counter = ${system.default.startup.polling.counter}
interval = ${system.default.startup.polling.interval}
}
}
config {
# put list of slaves
slaves = ${system.default.config.slaves}
cli {
tmp.dir = "/tmp/h2o-"${system.default.user}
dataport = 55555
memory.per-node = 2g
parallelism.per-node = ${system.default.config.parallelism.per-node}
}
}
}
}
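
Values like `"/tmp/h2o-"${system.default.user}` above rely on HOCON value concatenation and resolve once the `system.default.*` keys are supplied by Peel's base configuration. A minimal Typesafe Config sketch (the `peel` user name is made up):

import com.typesafe.config.ConfigFactory

// "/tmp/h2o-"${system.default.user} appends the referenced value on resolve().
val c = ConfigFactory.parseString(
  """system.default.user = peel
    |system.h2o.config.cli.tmp.dir = "/tmp/h2o-"${system.default.user}""".stripMargin
).resolve()
c.getString("system.h2o.config.cli.tmp.dir") // "/tmp/h2o-peel"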
@@ -0,0 +1,5 @@
{{#tmp.dir}}tmp.dir {{tmp.dir}}{{/tmp.dir}}
{{#dataport}}dataport {{dataport}}{{/dataport}}
{{#parallelism.per-node}}parallelism.per-node {{parallelism.per-node}}{{/parallelism.per-node}}
{{#memory.per-node}}memory.per-node {{memory.per-node}}{{/memory.per-node}}
{{#single-precision}}single-precision {{single-precision}}{{/single-precision}}
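
This options template emits one CLI setting per key present in the rendered model; in standard Mustache semantics a `{{#key}}...{{/key}}` section whose key is absent renders nothing. A minimal jmustache sketch with a hypothetical context map:

import com.samskivert.mustache.Mustache
import scala.collection.JavaConverters._

val t = Mustache.compiler().compile("{{#dataport}}dataport {{dataport}}{{/dataport}}")
t.execute(Map("dataport" -> "55555").asJava) // "dataport 55555"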
@@ -0,0 +1,3 @@
{{#hosts}}
{{{.}}}
{{/hosts}}
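
This flatfile template iterates the `hosts` list produced by `Model.HostsWithPort`; `{{{.}}}` is the unescaped implicit iterator, so each entry becomes one `host:port` line. A jmustache sketch with made-up hosts:

import com.samskivert.mustache.Mustache
import scala.collection.JavaConverters._

val t = Mustache.compiler().compile("{{#hosts}}{{{.}}}\n{{/hosts}}")
t.execute(Map("hosts" -> Seq("node1:55555", "node2:55555").asJava).asJava)
// "node1:55555\nnode2:55555\n"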
@@ -0,0 +1,163 @@
package eu.stratosphere.peel.extensions.h2o.beans.experiment

import java.lang.{System => Sys}
import java.io.FileWriter
import java.nio.file.{Paths, Files}

import com.typesafe.config.Config
import eu.stratosphere.peel.core.beans.data.{ExperimentOutput, DataSet}
import eu.stratosphere.peel.core.beans.experiment.Experiment
import eu.stratosphere.peel.core.util.shell
import eu.stratosphere.peel.extensions.h2o.beans.system.H2O
import spray.json._

/** An [[eu.stratosphere.peel.core.beans.experiment.Experiment Experiment]] implementation which handles the execution
* of a single H2O job.
*
* Note:
* H2O currently does not support submitting a Java/Scala job as a jar file.
* There are two ways to submit an H2O job:
* 1. through the interactive web interface, where only the pre-implemented algorithms are supported;
* 2. by executing a Python/R script, which requires the H2O Python/R package to be installed.
*
* So if you do not want to install the H2O Python/R package, you have to write a Java program which simulates all
* the actions of the web interface.
*/
class H2OExperiment(
command: String,
runner : H2O,
runs : Int,
inputs : Set[DataSet],
outputs: Set[ExperimentOutput],
name : String,
config : Config) extends Experiment(command, runner, runs, inputs, outputs, name, config) {

def this(
runs : Int,
runner : H2O,
input : DataSet,
output : ExperimentOutput,
command: String,
name : String,
config : Config) = this(command, runner, runs, Set(input), Set(output), name, config)

def this(
runs : Int,
runner : H2O,
inputs : Set[DataSet],
output : ExperimentOutput,
command: String,
name : String,
config : Config) = this(command, runner, runs, inputs, Set(output), name, config)

override def run(id: Int, force: Boolean): Experiment.Run[H2O] = new H2OExperiment.SingleJobRun(id, this, force)

def copy(name: String = name, config: Config = config) = new H2OExperiment(command, runner, runs, inputs, outputs, name, config)

}

object H2OExperiment {

case class State(
name: String,
suiteName: String,
command: String,
runnerID: String,
runnerName: String,
runnerVersion: String,
var runExitCode: Option[Int] = None,
var runTime: Long = 0) extends Experiment.RunState {}

object StateProtocol extends DefaultJsonProtocol with NullOptions {
implicit val stateFormat = jsonFormat8(State)
}

/** A private inner class encapsulating the logic of a single run. */
class SingleJobRun(val id: Int, val exp: H2OExperiment, val force: Boolean) extends Experiment.SingleJobRun[H2O, State] {

import eu.stratosphere.peel.extensions.h2o.beans.experiment.H2OExperiment.StateProtocol._

val runnerLogPath = s"${exp.config.getString("system.h2o.config.cli.tmp.dir")}/h2ologs"

override def isSuccessful = state.runExitCode.getOrElse(-1) == 0

override protected def logFilePatterns = List(s"$runnerLogPath/h2o_*-3-info.log")

val pollingNode = exp.config.getStringList("system.h2o.config.slaves").get(0)
val user = exp.config.getString("system.h2o.user")
val dataPort = exp.config.getInt("system.h2o.config.cli.dataport")

override protected def loadState(): State = {
if (Files.isRegularFile(Paths.get(s"$home/state.json"))) {
try {
io.Source.fromFile(s"$home/state.json").mkString.parseJson.convertTo[State]
} catch {
case e: Throwable => State(name, Sys.getProperty("app.suite.name"), command, exp.runner.beanName, exp.runner.name, exp.runner.version)
}
} else {
State(name, Sys.getProperty("app.suite.name"), command, exp.runner.beanName, exp.runner.name, exp.runner.version)
}
}

override protected def writeState() = {
val fw = new FileWriter(s"$home/state.json")
fw.write(state.toJson.prettyPrint)
fw.close()
}

override protected def runJob() = {
// try to execute the experiment
val (runExit, t) = Experiment.time(this ! (command, s"$home/run.out", s"$home/run.err"))
state.runTime = t
state.runExitCode = Some(runExit)
}

/** Before the run, collect runner log files and their current line counts */
override protected def beforeRun() = {
val logFiles = for (pattern <- logFilePatterns; f <- (shell !! s""" ssh $user@$pollingNode "ls $pattern" """).split(Sys.lineSeparator).map(_.trim)) yield f
logFileCounts = Map((for (f <- logFiles) yield f -> (shell !! s""" ssh $user@$pollingNode "wc -l $f | cut -d' ' -f1" """).trim.toLong): _*)
}

/** After the run, copy the newly appended log lines */
override protected def afterRun(): Unit = {
shell ! s"rm -Rf $home/logs/*"
for ((file, count) <- logFileCounts) shell ! s""" ssh $user@$pollingNode "tail -n +${count + 1} $file" > $home/logs/${Paths.get(file).getFileName}"""
}

override def cancelJob() = {
// first, retrieve the id of the running job
val s = shell !! s"wget -qO- $user@$pollingNode:$dataPort/3/Jobs"
var jobid = ""
for (job <- s.parseJson.asJsObject.fields("jobs").asInstanceOf[JsArray].elements) {
val fields = job.asJsObject.fields
if (fields.getOrElse("status", "").toString == "\"RUNNING\"" || fields.getOrElse("status", "").toString == "\"CREATED\"") {
val sid = fields("key").asJsObject.fields("name").toString
jobid = sid.substring(1, sid.length - 1) // strip the surrounding quotes
}
}

// send a cancel request to the REST API
shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")}/cancel --post-data="job_id=${jobid}" """

// poll until the job is reported as cancelled
var isCancelled = false
while (!isCancelled) {
Thread.sleep(exp.config.getInt("system.h2o.startup.polling.interval") * 2)
val status = getStatus(jobid)
isCancelled = status == "CANCELLED"
}
state.runTime = exp.config.getLong("experiment.timeout") * 1000
state.runExitCode = Some(-1)
}

private def !(command: String, outFile: String, errFile: String) = {
shell ! s"$command > $outFile 2> $errFile"
}

private def getStatus(jobid: String): String = {
(shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Jobs/${jobid.replace("$", "\\$")} | grep -Eo '"status":"[A-Z]+"' | grep -Eo [A-Z]+ """).stripLineEnd
}

}

}
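
For reference, a hedged sketch of the `GET /3/Jobs` response shape that `cancelJob()` assumes (the job key below is invented; the field names mirror the parsing code above, not an official H2O schema):

import spray.json._

val sample =
  """{"jobs":[{"key":{"name":"$03017f000001ffff$_job1"},"status":"RUNNING"}]}""".parseJson
for (job <- sample.asJsObject.fields("jobs").asInstanceOf[JsArray].elements) {
  val fields = job.asJsObject.fields
  val sid = fields("key").asJsObject.fields("name").toString
  println(sid.substring(1, sid.length - 1)) // $03017f000001ffff$_job1
}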
@@ -0,0 +1,109 @@
package eu.stratosphere.peel.extensions.h2o.beans.system

import com.samskivert.mustache.Mustache
import com.typesafe.config.ConfigException
import eu.stratosphere.peel.core.beans.system.Lifespan._
import eu.stratosphere.peel.core.beans.system.{SetUpTimeoutException, System}
import eu.stratosphere.peel.core.config.{Model, SystemConfig}
import eu.stratosphere.peel.core.util.shell

import scala.collection.JavaConverters._

/** Wrapper for H2O.
*
* Implements H2O as a [[eu.stratosphere.peel.core.beans.system.System System]] class and provides setup and teardown methods.
*
* @param version Version of the system (e.g. "3.0.0.12")
* @param lifespan [[eu.stratosphere.peel.core.beans.system.Lifespan Lifespan]] of the system
* @param dependencies Set of dependencies that this system needs
* @param mc The mustache compiler used to compile the templates that generate the system's property files
*/
class H2O(version: String, lifespan: Lifespan, dependencies: Set[System] = Set(), mc: Mustache.Compiler) extends System("h2o", version, lifespan, dependencies, mc) {

override def configuration() = SystemConfig(config, {
val home = config.getString("system.h2o.path.home")
List(
SystemConfig.Entry[Model.HostsWithPort]("system.h2o.config", s"$home/flatfile", templatePath("conf/hosts"), mc),
SystemConfig.Entry[Model.Yaml]("system.h2o.config.cli", s"$home/conf", templatePath("conf/h2o-conf"), mc)
)
})

override protected def start(): Unit = {
val user = config.getString("system.h2o.user")
val tmpDir = config.getString("system.h2o.config.cli.tmp.dir")
val dataPort = config.getInt("system.h2o.config.cli.dataport")
val memory = config.getString("system.h2o.config.cli.memory.per-node")
val nthreads = config.getInt("system.h2o.config.cli.parallelism.per-node")
val home = config.getString("system.h2o.path.home")

// recreate the tmp directory on every slave node
try {
for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) {
logger.info(s"Initializing tmp directory $tmpDir at taskmanager node $dataNode")
shell ! s""" ssh $user@$dataNode "rm -Rf $tmpDir" """
shell ! s""" ssh $user@$dataNode "mkdir -p $tmpDir" """
}
} catch {
case _: ConfigException => // tmp dir not set explicitly; ignore and use the default
}

var failedStartUpAttempts = 0

while (!isUp) {
try {
val totl = config.getStringList("system.h2o.config.slaves").size()
val init = 0 // no nodes are connected before startup; the cloud size is polled via REST rather than from logs

for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) {
shell ! s""" ssh $user@$dataNode "java -Xmx$memory -cp $home/h2odriver.jar water.H2OApp -flatfile $home/flatfile -nthreads $nthreads -ice_root $tmpDir -port $dataPort > /dev/null &" """
}
logger.info("Waiting for nodes to connect")

var curr = init
var cntr = config.getInt("system.h2o.startup.polling.counter")
val pollingNode = config.getStringList("system.h2o.config.slaves").get(0)
while (curr - init < totl) {
logger.info(s"Connected ${curr - init} from $totl nodes")
// wait a bit
Thread.sleep(config.getInt("system.h2o.startup.polling.interval"))
// get new values
try {
curr = Integer.parseInt((shell !! s""" wget -qO- $user@$pollingNode:$dataPort/3/Cloud.json | grep -Eo 'cloud_size":[0-9]+,' | grep -Eo '[0-9]+' """).stripLineEnd)
} catch {
case _: Throwable => // REST endpoint not reachable yet; retry on the next poll
}
// timeout if counter goes below zero
cntr = cntr - 1
if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout")
}
isUp = true
} catch {
case e: SetUpTimeoutException =>
failedStartUpAttempts = failedStartUpAttempts + 1
if (failedStartUpAttempts < config.getInt("system.h2o.startup.max.attempts")) {
stop()
logger.info(s"Could not bring system '$toString' up in time, trying again...")
} else {
throw e
}
}
}
}

override protected def stop() = {
val user = config.getString("system.h2o.user")
for (dataNode <- config.getStringList("system.h2o.config.slaves").asScala) {
shell ! s""" ssh $user@$dataNode "ps -ef | grep h2odriver.jar | grep -v grep | awk '{print \\$$2}' | xargs kill" """
}
isUp = false
}

def isRunning = {
val pollingNode = config.getStringList("system.h2o.config.slaves").get(0)
val user = config.getString("system.h2o.user")
(shell ! s""" ssh $user@$pollingNode "ps -ef | grep h2odriver.jar | grep -v grep " """) == 0
}
}
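
The readiness check in `start()` reduces to polling the REST API until the reported cloud size reaches the slave count. A minimal sketch of that probe, assuming (as the grep pipeline above does) that `/3/Cloud.json` carries a numeric `cloud_size` field:

import scala.io.Source

def cloudSize(node: String, port: Int): Int = {
  val body = Source.fromURL(s"http://$node:$port/3/Cloud.json").mkString
  """"cloud_size":(\d+)""".r.findFirstMatchIn(body).map(_.group(1).toInt).getOrElse(0)
}
// e.g. loop: while (cloudSize(pollingNode, dataPort) < slaves.size) Thread.sleep(interval)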