Skip to content

Commit

Permalink
No back pressure in NuProcess
Browse files Browse the repository at this point in the history
I had previously thought that NuProcess handled back-pressure. As per brettwooldridge/NuProcess#77, it doesn't appear so. Hence, I'm reverting the code to use a Source.queue and managing a 64KiB buffer directly.

If NuProcess ever does handle back-pressure then the PublishIfAvailable source could make a come-back. It was working beautifully. :-)
  • Loading branch information
huntc committed Aug 21, 2017
1 parent 912d2e2 commit 550c28e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 96 deletions.
7 changes: 7 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ akka {
non-blocking-process {
# The process will be inspected at this interval to ensure it is still alive.
inspection-interval = 1 second

# The following settings equate to the size of the stdout/err buffer when
# consuming from the underlying process manager.
stdout-max-bytes-per-line = 1024
stdout-buffer-max-lines = 64
stderr-max-bytes-per-line = 1024
stderr-buffer-max-lines = 64
}

blocking-process {
Expand Down
131 changes: 35 additions & 96 deletions src/main/scala/akka/contrib/process/NonBlockingProcess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
package akka.contrib.process

import akka.actor.{ Actor, ActorLogging, NoSerializationVerificationNeeded, Props }
import akka.stream.scaladsl.{ BroadcastHub, FileIO, Keep, Sink, Source }
import akka.stream.scaladsl.{ BroadcastHub, FileIO, Keep, Sink, Source, SourceQueueWithComplete }
import akka.util.ByteString
import java.nio.ByteBuffer
import java.nio.file.{ Files, Path, Paths }
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

import akka.stream.stage.{ AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, OutHandler }
import akka.stream._
import akka.{ Done, NotUsed }
import com.zaxxer.nuprocess.{ NuAbstractProcessHandler, NuProcess, NuProcessBuilder }
Expand All @@ -25,18 +22,13 @@ import scala.concurrent.{ ExecutionContext, Future }

object NonBlockingProcess {

/**
* The configuration key to use for the inspection interval.
*/
final val InspectionInterval = "akka.process.non-blocking-process.inspection-interval"

/**
* Sent to the receiver on startup - specifies the streams used for managing input, output and error respectively.
* This message should only be received by the parent of the NonBlockingProcess and should not be passed across the
* JVM boundary (the publishers are not serializable).
*
* @param pid the process id
* @param stdin a `akka.stream.scaladsl.Sink[ByteString]` for the standard input stream of the process
* @param pid the process id
* @param stdin a `akka.stream.scaladsl.Sink[ByteString]` for the standard input stream of the process
* @param stdout a `akka.stream.scaladsl.Source[ByteString]` for the standard output stream of the process
* @param stderr a `akka.stream.scaladsl.Source[ByteString]` for the standard error stream of the process
*/
Expand Down Expand Up @@ -68,8 +60,8 @@ object NonBlockingProcess {
/**
* Create Props for a [[NonBlockingProcess]] actor.
*
* @param command signifies the program to be executed and its optional arguments
* @param workingDir the working directory for the process; default is the current working directory
* @param command signifies the program to be executed and its optional arguments
* @param workingDir the working directory for the process; default is the current working directory
* @param environment the environment for the process; default is `Map.emtpy`
* @return Props for a [[NonBlockingProcess]] actor
*/
Expand All @@ -79,16 +71,12 @@ object NonBlockingProcess {
environment: Map[String, String] = Map.empty) =
Props(new NonBlockingProcess(command, workingDir, environment))

private[process] object PublishIfAvailableSideChannel {
sealed trait AsyncEvents
case class Publish[T](e: T) extends AsyncEvents
case class Complete[T](e: Option[T]) extends AsyncEvents
}
// For additional process detection for platforms that support "/proc"
private[process] val procDir = Paths.get("/proc")
private[process] val hasProcDir = Files.exists(procDir)

private[process] abstract class PublishIfAvailableSideChannel[T] {
def publishIfAvailable(e: () => T): Unit
def complete(e: Option[T]): Unit
}
// For additional checking on whether a process is alive
private[process] case object Inspect

/**
* Determines if a process is still alive according to its entry in /proc/{pid}/stat. A process is considered
Expand All @@ -109,70 +97,16 @@ object NonBlockingProcess {
case _ => false
}

/*
* The motivation for this type of source is to publish *only* if any downstream
* is ready to receive. The assumption is that only one thread will be calling
* the publishIfAvailable function of the side channel at any one time.
*/
private[process] class PublishIfAvailable[T]
extends GraphStageWithMaterializedValue[SourceShape[T], PublishIfAvailableSideChannel[T]] {

private val out = Outlet[T]("PublishIfAvailable.out")
override def shape = SourceShape(out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, PublishIfAvailableSideChannel[T]) = {
val asyncCallback = new AtomicReference[AsyncCallback[PublishIfAvailableSideChannel.AsyncEvents]]
var downstreamReady = false
val downstreamReadyLock = new ReentrantLock
val logic = new GraphStageLogic(shape) {

override def preStart(): Unit =
asyncCallback.set(getAsyncCallback[PublishIfAvailableSideChannel.AsyncEvents] {
case PublishIfAvailableSideChannel.Publish(e: T @unchecked) =>
push(out, e)
case PublishIfAvailableSideChannel.Complete(e: Option[T] @unchecked) =>
e.foreach(emit(out, _))
completeStage()
})

setHandler(out, new OutHandler {
override def onPull(): Unit = {
downstreamReadyLock.lock()
try {
downstreamReady = true
} finally {
downstreamReadyLock.unlock()
}
}
})

}

logic -> new PublishIfAvailableSideChannel[T] {
override def publishIfAvailable(e: () => T): Unit =
Option(asyncCallback.get).foreach { ac =>
downstreamReadyLock.lock()
try {
if (downstreamReady) {
ac.invoke(PublishIfAvailableSideChannel.Publish(e()))
downstreamReady = false
}
} finally {
downstreamReadyLock.unlock()
}
}

override def complete(e: Option[T]): Unit =
Option(asyncCallback.get).foreach(_.invoke(PublishIfAvailableSideChannel.Complete(e)))
}
private[process] def enqueue(queue: SourceQueueWithComplete[ByteString], buffer: ByteBuffer, maxBytes: Int, closed: Boolean): Unit = {
while (buffer.hasRemaining) {
val len = Math.min(buffer.remaining(), maxBytes)
val arr = new Array[Byte](len)
buffer.get(arr, 0, len)
val byteString = ByteString.ByteString1C(arr)
queue.offer(byteString)
}
if (closed) queue.complete()
}

// For additional process detection for platforms that support "/proc"
private[process] val procDir = Paths.get("/proc")
private[process] val hasProcDir = Files.exists(procDir)
// For additional checking on whether a process is alive
private[process] case object Inspect
}

/**
Expand All @@ -195,8 +129,20 @@ class NonBlockingProcess(
import NonBlockingProcess._
import context.dispatcher

private val stdoutMaxBytesPerLine =
context.system.settings.config.getInt("akka.process.non-blocking-process.stdout-max-bytes-per-line")
private val stdoutBufferMaxLines =
context.system.settings.config.getInt("akka.process.non-blocking-process.stdout-buffer-max-lines")
private val stderrMaxBytesPerLine =
context.system.settings.config.getInt("akka.process.non-blocking-process.stderr-max-bytes-per-line")
private val stderrBufferMaxLines =
context.system.settings.config.getInt("akka.process.non-blocking-process.stderr-buffer-max-lines")

private val inspectionInterval =
Duration(context.system.settings.config.getDuration(InspectionInterval).toMillis, TimeUnit.MILLISECONDS)
Duration(
context.system.settings.config.getDuration("akka.process.non-blocking-process.inspection-interval").toMillis,
TimeUnit.MILLISECONDS
)

private val inspectionTick =
context.system.scheduler.schedule(inspectionInterval, inspectionInterval, self, Inspect)
Expand All @@ -223,34 +169,27 @@ class NonBlockingProcess(
.foreach[ByteString](bytes => nuProcess.writeStdin(bytes.toByteBuffer))
val (out, stdout) =
Source
.fromGraph(new PublishIfAvailable[ByteString])
.queue[ByteString](stdoutBufferMaxLines, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run
val (err, stderr) =
Source
.fromGraph(new PublishIfAvailable[ByteString])
.queue[ByteString](stderrBufferMaxLines, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run

// FIXME: if we don't consume from stdout/stderr then we know that NuProcess will spin the CPU - see https://github.com/brettwooldridge/NuProcess/issues/53
nuProcess.setProcessHandler(new NuAbstractProcessHandler {
override def onStart(nuProcess: NuProcess): Unit =
self ! Started(nuProcess.getPID, stdin, stdout, stderr)

override def onStderr(buffer: ByteBuffer, closed: Boolean): Unit =
if (!closed)
err.publishIfAvailable(() => ByteString.fromByteBuffer(buffer))
else
err.complete(if (buffer.hasRemaining) Some(ByteString.fromByteBuffer(buffer)) else None)
enqueue(err, buffer, stderrMaxBytesPerLine, closed)

override def onExit(exitCode: Int): Unit =
self ! Exited(exitCode)

override def onStdout(buffer: ByteBuffer, closed: Boolean): Unit =
if (!closed)
out.publishIfAvailable(() => ByteString.fromByteBuffer(buffer))
else
out.complete(if (buffer.hasRemaining) Some(ByteString.fromByteBuffer(buffer)) else None)
enqueue(out, buffer, stdoutMaxBytesPerLine, closed)
})
}
})
Expand Down

0 comments on commit 550c28e

Please sign in to comment.