Skip to content

Commit

Permalink
cleanup; adding metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Mar 28, 2019
1 parent 1564ee8 commit ef31370
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,21 @@ object LoggingMarkers {
LogMarkerToken(containerPool, "resourceError", counter)(MeasurementUnit.none)
val CONTAINER_POOL_RESCHEDULED_ACTIVATION =
LogMarkerToken(containerPool, "rescheduledActivation", counter)(MeasurementUnit.none)
val CONTAINER_POOL_RUNBUFFER_SIZE =
LogMarkerToken(containerPool, "runBufferSize", counter)(MeasurementUnit.none)
val CONTAINER_POOL_ACTIVE_COUNT =
LogMarkerToken(containerPool, "activeCount", counter)(MeasurementUnit.none)
val CONTAINER_POOL_ACTIVE_SIZE =
LogMarkerToken(containerPool, "activeSize", counter)(MeasurementUnit.none)
val CLUSTER_RESOURCES_IDLES_COUNT =
LogMarkerToken(clusterResourceManager, "idlesCount", counter)(MeasurementUnit.none)
val CLUSTER_RESOURCES_IDLES_SIZE =
LogMarkerToken(clusterResourceManager, "idlesSize", counter)(MeasurementUnit.none)
val CLUSTER_RESOURCES_RESERVED_COUNT =
LogMarkerToken(clusterResourceManager, "reservedCount", counter)(MeasurementUnit.none)
val CLUSTER_RESOURCES_RESERVED_SIZE =
LogMarkerToken(clusterResourceManager, "reservedSize", counter)(MeasurementUnit.none)

val CLUSTER_RESOURCES_TOTAL_MEM =
LogMarkerToken(clusterResourceManager, "totalMemory", counter)(MeasurementUnit.none)
val CLUSTER_RESOURCES_MAX_MEM =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
poolActor: ActorRef,
poolConfig: ContainerPoolConfig)(implicit logging: Logging)
extends ContainerResourceManager {
override val autoStartPrewarming: Boolean = false

/** cluster state tracking */
private var localReservations: Map[ActorRef, Reservation] = Map.empty //this pool's own reservations
Expand Down Expand Up @@ -89,16 +90,14 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
//cachedValues
var idMap: immutable.Set[Int] = Set.empty

def activationStartLogMessage(): String =
s"node stats ${clusterActionHostStats} reserved ${localReservations.size} (of max ${poolConfig.clusterManagedResourceMaxStarts}) containers ${reservedSize}MB " +
s"${reservedStartCount} pending starts ${reservedStopCount} pending stops " +
s"${scheduledStartCount} scheduled starts ${scheduledStopCount} scheduled stops"
override def activationStartLogMessage(): String =
s"node stats ${clusterActionHostStats} reserved ${localReservations.size} (of max ${poolConfig.clusterManagedResourceMaxStarts}) containers ${reservedSize}MB"

def rescheduleLogMessage() = {
override def rescheduleLogMessage() = {
s"reservations: ${localReservations.size}"
}

def requestSpace(size: ByteSize) = {
override def requestSpace(size: ByteSize) = {
val bufferedSize = size * 10 //request 10x the required space to allow for failures and additional traffic
//find idles up to this size
val removable = remoteUnused.map(u => u._2.map(u._1 -> _)).flatten.to[ListBuffer]
Expand All @@ -118,22 +117,6 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,

def reservedSize = localReservations.values.map(_.size.toMB).sum
def remoteReservedSize = remoteReservations.values.map(_.map(_.size.toMB).sum).sum
def reservedStartCount = localReservations.values.count {
case p: Reservation => p.size.toMB >= 0
case _ => false
}
def reservedStopCount = localReservations.values.count {
case p: Reservation => p.size.toMB < 0
case _ => false
}
def scheduledStartCount = localReservations.values.count {
case p: Reservation => p.size.toMB >= 0
case _ => false
}
def scheduledStopCount = localReservations.values.count {
case p: Reservation => p.size.toMB < 0
case _ => false
}
private var clusterResourcesAvailable
: Boolean = false //track to log whenever there is a switch from cluster resources being available to not being available

Expand Down Expand Up @@ -178,13 +161,13 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
}

/** reservation adjustments */
def addReservation(ref: ActorRef, size: ByteSize): Unit = {
override def addReservation(ref: ActorRef, size: ByteSize): Unit = {
localReservations = localReservations + (ref -> Reservation(size))
}
def releaseReservation(ref: ActorRef): Unit = {
override def releaseReservation(ref: ActorRef): Unit = {
localReservations = localReservations - ref
}
def allowMoreStarts(config: ContainerPoolConfig): Boolean =
override def allowMoreStarts(config: ContainerPoolConfig): Boolean =
localReservations
.count({ case (_, state) => state.size.toMB > 0 }) < config.clusterManagedResourceMaxStarts //only positive reservations affect ability to start

Expand Down Expand Up @@ -273,6 +256,14 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
stats.values.maxBy(_.mem).mem.toLong)
}
MetricEmitter.emitHistogramMetric(LoggingMarkers.CLUSTER_RESOURCES_NODE_COUNT, stats.size)
MetricEmitter.emitHistogramMetric(LoggingMarkers.CLUSTER_RESOURCES_IDLES_COUNT, localUnused.size)
MetricEmitter.emitHistogramMetric(
LoggingMarkers.CLUSTER_RESOURCES_IDLES_SIZE,
localUnused.map(_._2.memoryLimit.toMB).sum)
MetricEmitter.emitHistogramMetric(LoggingMarkers.CLUSTER_RESOURCES_RESERVED_COUNT, localReservations.size)
MetricEmitter.emitHistogramMetric(
LoggingMarkers.CLUSTER_RESOURCES_RESERVED_SIZE,
localReservations.map(_._2.size.toMB).sum)

if (stats.nonEmpty && !prewarmsInitialized) { //we assume that when stats are received, we should startup prewarm containers
prewarmsInitialized = true
Expand Down Expand Up @@ -350,7 +341,7 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
}
}
}
def updateUnused(newUnused: Map[ActorRef, ContainerData]): Unit = {
override def updateUnused(newUnused: Map[ActorRef, ContainerData]): Unit = {
localUnused = newUnused
}
def remove[A](pool: ListBuffer[(A, RemoteContainerRef)], memory: ByteSize): List[(A, RemoteContainerRef)] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ case class WorkerData(data: ContainerData, state: WorkerState)

case object InitPrewarms
case object ResourceUpdate
case object EmitMetrics

/**
* A pool managing containers to run actions on.
Expand All @@ -56,22 +57,27 @@ case object ResourceUpdate
* @param feed actor to request more work from
* @param prewarmConfig optional settings for container prewarming
* @param poolConfig config for the ContainerPool
* @param resMgr ContainerResourceManager impl
*/
class ContainerPool(instanceId: InvokerInstanceId,
childFactory: ActorRefFactory => ActorRef,
feed: ActorRef,
prewarmConfig: List[PrewarmingConfig] = List.empty,
poolConfig: ContainerPoolConfig)
poolConfig: ContainerPoolConfig,
resMgr: Option[ContainerResourceManager])
extends Actor {
import ContainerPool.memoryConsumptionOf

implicit val logging = new AkkaLogging(context.system.log)
implicit val ec = context.dispatcher

val resourceManager = if (poolConfig.clusterManagedResources) {
val resourceManager = resMgr.getOrElse(if (poolConfig.clusterManagedResources) {
new AkkaClusterContainerResourceManager(context.system, instanceId, self, poolConfig)
} else {
self ! InitPrewarms
new LocalContainerResourceManager()
})
if (resourceManager.autoStartPrewarming) {
self ! InitPrewarms
}
var freePool = immutable.Map.empty[ActorRef, ContainerData]
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
Expand All @@ -80,6 +86,8 @@ class ContainerPool(instanceId: InvokerInstanceId,
// buffered here to keep order of computation.
// Otherwise actions with small memory-limits could block actions with large memory limits.
var runBuffer = immutable.Queue.empty[Run]
//periodically emit metrics (don't need to do this for each message!)
context.system.scheduler.schedule(30.seconds, 2.seconds, self, EmitMetrics)
var resent = immutable.Set.empty[ActivationId]
val logMessageInterval = 10.seconds

Expand All @@ -95,6 +103,7 @@ class ContainerPool(instanceId: InvokerInstanceId,
}
}

def inUse = freePool.filter(_._2.activeActivationCount > 0) ++ busyPool
def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name
val actionName = r.action.name.name
Expand Down Expand Up @@ -334,6 +343,12 @@ class ContainerPool(instanceId: InvokerInstanceId,
case None =>
logging.info(this, s"Requested container removal ${r} failed because it is in use.")
})
case EmitMetrics =>
MetricEmitter.emitHistogramMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE, runBuffer.size)
MetricEmitter.emitHistogramMetric(LoggingMarkers.CLUSTER_RESOURCES_IDLES_COUNT, inUse.size)
MetricEmitter.emitHistogramMetric(
LoggingMarkers.CLUSTER_RESOURCES_IDLES_SIZE,
inUse.map(_._2.memoryLimit.toMB).sum)
}

/** Buffer processing in cluster managed resources means to send the first item in runBuffer;
Expand Down Expand Up @@ -536,8 +551,9 @@ object ContainerPool {
factory: ActorRefFactory => ActorRef,
poolConfig: ContainerPoolConfig,
feed: ActorRef,
prewarmConfig: List[PrewarmingConfig] = List.empty) =
Props(new ContainerPool(instanceId, factory, feed, prewarmConfig, poolConfig))
prewarmConfig: List[PrewarmingConfig] = List.empty,
resMgr: Option[ContainerResourceManager] = None) =
Props(new ContainerPool(instanceId, factory, feed, prewarmConfig, poolConfig, resMgr))
}

/** Contains settings needed to perform container prewarming. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import akka.actor.ActorRef
import org.apache.openwhisk.core.entity.ByteSize

trait ContainerResourceManager {
def activationStartLogMessage(): String
def rescheduleLogMessage(): String
def activationStartLogMessage(): String = ""
def rescheduleLogMessage(): String = ""

def updateUnused(unused: Map[ActorRef, ContainerData])
def allowMoreStarts(config: ContainerPoolConfig): Boolean
def addReservation(ref: ActorRef, byteSize: ByteSize): Unit
def releaseReservation(ref: ActorRef): Unit
def requestSpace(size: ByteSize): Unit
val autoStartPrewarming: Boolean = true
def updateUnused(unused: Map[ActorRef, ContainerData]) = {}
def allowMoreStarts(config: ContainerPoolConfig): Boolean = true
def addReservation(ref: ActorRef, byteSize: ByteSize): Unit = {}
def releaseReservation(ref: ActorRef): Unit = {}
def requestSpace(size: ByteSize): Unit = {}
def canLaunch(size: ByteSize, poolMemory: Long, poolConfig: ContainerPoolConfig): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,10 @@
*/

package org.apache.openwhisk.core.containerpool
import akka.actor.ActorRef
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.entity.ByteSize

class LocalContainerResourceManager(implicit logging: Logging) extends ContainerResourceManager {

override def activationStartLogMessage(): String = ""
override def rescheduleLogMessage(): String = ""
override def updateUnused(unused: Map[ActorRef, ContainerData]): Unit = {}
override def allowMoreStarts(config: ContainerPoolConfig): Boolean = true
override def addReservation(ref: ActorRef, byteSize: ByteSize): Unit = {}
override def releaseReservation(ref: ActorRef): Unit = {}
override def requestSpace(size: ByteSize): Unit = {}
override def canLaunch(size: ByteSize, poolMemory: Long, poolConfig: ContainerPoolConfig): Boolean =
poolMemory + size.toMB <= poolConfig.userMemory.toMB
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class ContainerPoolTests
(containers, factory)
}

def poolConfig(userMemory: ByteSize) =
ContainerPoolConfig(userMemory, 0.5, false, false, false, 10)
def poolConfig(userMemory: ByteSize, clusterMangedResources: Boolean = false) =
ContainerPoolConfig(userMemory, 0.5, false, clusterMangedResources, false, 10)

val instanceId = InvokerInstanceId(0, userMemory = 1024.MB)
behavior of "ContainerPool"
Expand Down Expand Up @@ -571,6 +571,83 @@ class ContainerPoolTests
pool ! runMessageConcurrent
containers(0).expectMsg(runMessageConcurrent)
}

it should "init prewarms only when InitPrewarms message is sent, when ContainerResourceManager.autoStartPrewarming is false" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val resMgr = new ContainerResourceManager {
override val autoStartPrewarming: Boolean = false
override def canLaunch(size: ByteSize, poolMemory: Long, poolConfig: ContainerPoolConfig): Boolean = true
}

val pool = system.actorOf(
ContainerPool
.props(
instanceId,
factory,
poolConfig(MemoryLimit.stdMemory),
feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit)),
Some(resMgr)))
//prewarms are not started immediately
containers(0).expectNoMessage
//prewarms must be started explicitly (e.g. by the ContainerResourceManager)
pool ! InitPrewarms

containers(0).expectMsg(Start(exec, memoryLimit)) // container0 was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
pool ! runMessage
containers(0).expectMsg(runMessage)

}

it should "limit the number of container cold/prewarm starts" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val resMgr = new ContainerResourceManager {
override val autoStartPrewarming: Boolean = false
override def canLaunch(size: ByteSize, poolMemory: Long, poolConfig: ContainerPoolConfig): Boolean = true
}

val pool = system.actorOf(
ContainerPool
.props(
instanceId,
factory,
poolConfig(MemoryLimit.stdMemory),
feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit)),
Some(resMgr)))
//prewarms are not started immediately
containers(0).expectNoMessage
//prewarms must be started explicitly (e.g. by the ContainerResourceManager)
pool ! InitPrewarms

containers(0).expectMsg(Start(exec, memoryLimit)) // container0 was prewarmed

}

it should "request space from cluster if no resources available" in {}

it should "update unused when container reaches capacity" in {}

it should "track resent messages to avoid duplicated resends" in {}

it should "update unused on NeedWork" in {}

it should "process runbuffer when container is removed" in {}
it should "process runbuffer on ResourceUpdate" in {}
it should "release reservation on ContainerStarted" in {}
it should "request space from cluster on NeedResources" in {}
it should "remove unused on ReleaseFree" in {}
it should "process runbuffer instead of requesting new messages" in {}
it should "add reservation in createContainer" in {}
it should "add reservation in prewarmContainer" in {}

it should "delegate to resourceManager to determine whether there is space to launch" in {}

it should "remove any amount of space when freeing unused resources in cluster managed case" in {}

}

/**
Expand Down

0 comments on commit ef31370

Please sign in to comment.