Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Mar 5, 2019
1 parent 4f6d265 commit 9d7a1a0
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 39 deletions.
4 changes: 2 additions & 2 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ whisk {
}

container-pool {
user-memory: 9182 m
user-memory: 1024 m
concurrent-peek-factor: 0.5 # Factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
akka-client: false # If true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
cluster-managed-resources: false # If false, container-pool.user-memory is used to determine pool capacity to launch containers, otherwise, ContainerFactory.canLaunch() is used
cluster-managed-resources: false # If false, container-pool.user-memory is used to determine pool capacity to launch containers, otherwise, ContainerPool.clusterCanLaunch() is used
}

kubernetes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.openwhisk.core.containerpool

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import java.time.Instant
import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.entity._
Expand Down Expand Up @@ -112,6 +111,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
val actionName = r.action.name.name
val maxConcurrent = r.action.limits.concurrency.maxConcurrent
val activationId = r.msg.activationId.toString

r.msg.transid.mark(
this,
LoggingMarkers.INVOKER_CONTAINER_START(containerState),
Expand Down Expand Up @@ -416,7 +416,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
def hasPoolSpaceFor[A](poolConfig: ContainerPoolConfig, pool: Map[A, ContainerData], memory: ByteSize)(
implicit tid: TransactionId): Boolean = {
if (poolConfig.clusterManagedResources) {
canLaunch(memory)
clusterCanLaunch(memory)
} else {
memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
}
Expand All @@ -440,14 +440,14 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
case _ => false
}

private var resourcesAvailable
: Boolean = false //track whenever there is a switch from cluster resources being available to not being available
private var clusterResourcesAvailable
: Boolean = false //track to log whenever there is a switch from cluster resources being available to not being available

def canLaunch(memory: ByteSize)(implicit tid: TransactionId): Boolean = {
def clusterCanLaunch(memory: ByteSize)(implicit tid: TransactionId): Boolean = {
//make sure there is at least one node with unreerved mem > memory
val canLaunch = hasPotentialMemoryCapacity(memory.toMB, clusterReservations.values.map(_.size).toList) //consider all reservations blocking till they are removed during NodeStatsUpdate
val canLaunch = clusterHasPotentialMemoryCapacity(memory.toMB, clusterReservations.values.map(_.size).toList) //consider all reservations blocking till they are removed during NodeStatsUpdate
//log only when changing value
if (canLaunch != resourcesAvailable) {
if (canLaunch != clusterResourcesAvailable) {
if (canLaunch) {
logging.info(
this,
Expand All @@ -458,12 +458,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
s"cluster cannot launch action with ${memory.toMB}MB reserved:${reservedSize} freepool: ${memoryConsumptionOf(freePool)}")
}
}
resourcesAvailable = canLaunch
clusterResourcesAvailable = canLaunch
canLaunch
}

/** Return true to indicate there is expectation that there is "room" to launch a task with these memory/cpu/ports specs */
def hasPotentialMemoryCapacity(memory: Double, reserve: List[Long]): Boolean = {
def clusterHasPotentialMemoryCapacity(memory: Double, reserve: List[Long]): Boolean = {
//copy AgentStats, then deduct pending tasks
var availableResources = clusterActionHostStats.toList.sortBy(_._2.mem).toMap //sort by mem to match lowest value
val inNeedReserved = ListBuffer.empty ++ reserve
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ case object RunCompleted
case object ContainerStarted

/**
* A proxy that wraps a C ontainer. It is used to keep track of the lifecycle
* A proxy that wraps a Container. It is used to keep track of the lifecycle
* of a container and to guarantee a contract between the client of the container
* and the container itself.
*
Expand Down Expand Up @@ -309,7 +309,6 @@ class ContainerProxy(
when(Starting) {
// container was successfully obtained
case Event(completed: PreWarmCompleted, _) =>
//context.parent ! ContainerStarted
context.parent ! NeedWork(completed.data)
goto(Started) using completed.data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,8 @@ class InvokerReactive(
"--ulimit" -> Set("nofile=1024:1024"),
"--pids-limit" -> Set("1024")) ++ logsProvider.containerParameters)
containerFactory.init()
//sys.addShutdownHook(containerFactory.cleanup())
//TODO: separate PR for changing to CoordinatedShutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () =>

CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "invokerCleanup") { () =>
containerFactory.cleanup()
Future.successful(Done)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,15 @@ import akka.actor.ActorSystem
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.testkit.TestProbe
import common.StreamLogging
import common.WhiskProperties
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.ExecManifest.RuntimeManifest
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.connector.MessageFeed
import scala.concurrent.Future

/**
* Behavior tests for the ContainerPool
Expand All @@ -58,7 +54,6 @@ class ContainerPoolTests
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with StreamLogging
with MockFactory {

override def afterAll = TestKit.shutdownActorSystem(system)
Expand Down Expand Up @@ -814,21 +809,4 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('second)
}

class TestContainerFactory extends ContainerFactory {

/** create a new Container */
override def createContainer(tid: TransactionId,
name: String,
actionImage: ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] =
???

/** perform any initialization */
override def init(): Unit = ???

/** cleanup any remaining Containers; should block until complete; should ONLY be run at startup/shutdown */
override def cleanup(): Unit = ???
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ class ContainerProxyTests
/** Expect a NeedWork message with warmed data */
def expectWarmed(namespace: String, action: ExecutableWhiskAction) = {
val test = EntityName(namespace)
//expectMsg(ContainerStarted)
expectMsgPF() {
case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) => //matched, otherwise will fail
}
Expand Down

0 comments on commit 9d7a1a0

Please sign in to comment.