From 1564ee8228e1b6a25c25a5bdbbc889889792fdc2 Mon Sep 17 00:00:00 2001 From: Tyson Norris Date: Thu, 21 Mar 2019 22:08:51 -0700 Subject: [PATCH] cleanup --- .../openwhisk/core/containerpool/ContainerFactory.scala | 9 ++------- core/invoker/src/main/resources/application.conf | 6 +----- .../AkkaClusterContainerResourceManager.scala | 8 ++++++++ .../openwhisk/core/containerpool/ContainerPool.scala | 8 ++------ .../mesos/test/MesosContainerFactoryTest.scala | 3 +-- .../core/containerpool/test/ContainerPoolTests.scala | 3 +-- .../core/containerpool/test/ContainerProxyTests.scala | 3 +-- 7 files changed, 16 insertions(+), 24 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala index ff5198423b0..95303c5619b 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala @@ -23,7 +23,6 @@ import org.apache.openwhisk.core.WhiskConfig import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId} import org.apache.openwhisk.spi.Spi import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration case class ContainerArgsConfig(network: String, dnsServers: Seq[String] = Seq.empty, @@ -31,16 +30,12 @@ case class ContainerArgsConfig(network: String, dnsOptions: Seq[String] = Seq.empty, extraArgs: Map[String, Set[String]] = Map.empty) -case class ClusterManagedCapacityMonitor(idlePruneUseRatio: Double, - idleTimeout: FiniteDuration, - idleRemoveSize: ByteSize) - case class ContainerPoolConfig(userMemory: ByteSize, concurrentPeekFactor: Double, akkaClient: Boolean, clusterManagedResources: Boolean, - clusterManagedResourceMaxStarts: Int, - clusterManagedCapacityMonitor: ClusterManagedCapacityMonitor) { + useClusterBootstrap: Boolean, + clusterManagedResourceMaxStarts: Int) { require( concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0, s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor") diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index b5f431baa58..2f3f7c89eb1 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -43,11 +43,7 @@ whisk { akka-client: false # If true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient) cluster-managed-resources: false # If false, container-pool.user-memory is used to determine pool capacity to launch containers, otherwise, ContainerPool.clusterCanLaunch() is used cluster-managed-resource-max-starts: 20 # In cluster managed resource case, limit the number of concurrent container starts to this value. - cluster-managed-capacity-monitor { //config to cause accelerated pruning of idles once cluster capacity ratio is reached - idle-prune-use-ratio = 0.75 //begin freeing idles when we see less than this ratio of hosts (compared to max number of hosts ever seen) - idle-timeout = 10 seconds //only remove idles > 10s unused - idle-remove-size = 4096 M //remove 4GB at a time - } + useClusterBootstrap: false # if true, use akka boostrap to discover cluster nodes } kubernetes { diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala index d6c92629cb7..442ff13bdea 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala @@ -38,6 +38,8 @@ import akka.cluster.ddata.Replicator.WriteLocal import akka.cluster.pubsub.DistributedPubSub import akka.cluster.pubsub.DistributedPubSubMediator.Put import akka.cluster.pubsub.DistributedPubSubMediator.Send +import akka.management.AkkaManagement +import akka.management.cluster.bootstrap.ClusterBootstrap import java.time.Instant import org.apache.openwhisk.common.Logging import org.apache.openwhisk.common.LoggingMarkers @@ -187,6 +189,12 @@ class AkkaClusterContainerResourceManager(system: ActorSystem, .count({ case (_, state) => state.size.toMB > 0 }) < config.clusterManagedResourceMaxStarts //only positive reservations affect ability to start class ContainerPoolClusterData(instanceId: InvokerInstanceId, containerPool: ActorRef) extends Actor { + //it is possible to use cluster managed resources, but not run the invoker in the cluster + //when not using cluster boostrapping, you need to set akka seed node configs + if (poolConfig.useClusterBootstrap) { + AkkaManagement(context.system).start() + ClusterBootstrap(context.system).start() + } implicit val cluster = Cluster(system) implicit val ec = context.dispatcher val mediator = DistributedPubSub(system).mediator diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 718889aa75e..593ef17a392 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -158,11 +158,7 @@ class ContainerPool(instanceId: InvokerInstanceId, } else { r.action.limits.memory.megabytes.MB }) - .map { a => - //decrease the reserved (not yet stopped container) memory tracker - //resourceManager.addReservation(a._1, (-r.action.limits.memory.megabytes).MB) - removeContainer(a._1) - } + .map(a => removeContainer(a._1)) // If the list had at least one entry, enough containers were removed to start the new container. After // removing the containers, we are not interested anymore in the containers that have been removed. .headOption @@ -287,7 +283,7 @@ class ContainerPool(instanceId: InvokerInstanceId, // Container got removed case ContainerRemoved => - //stop tracking via reserved + //stop tracking via reserved (should already be removed, except in case of failure) resourceManager.releaseReservation(sender()) // if container was in free pool, it may have been processing (but under capacity), diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala index 6ddb7f55dda..3c4ec8df498 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala @@ -50,7 +50,6 @@ import scala.concurrent.duration._ import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.WhiskConfig import org.apache.openwhisk.core.WhiskConfig._ -import org.apache.openwhisk.core.containerpool.ClusterManagedCapacityMonitor import org.apache.openwhisk.core.containerpool.ContainerArgsConfig import org.apache.openwhisk.core.containerpool.ContainerPoolConfig import org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStore @@ -83,7 +82,7 @@ class MesosContainerFactoryTest // 80 slots, each 265MB val poolConfig = - ContainerPoolConfig(21200.MB, 0.5, false, false, 10, ClusterManagedCapacityMonitor(0.5, 10.seconds, 1024.B)) + ContainerPoolConfig(21200.MB, 0.5, false, false, false, 10) val actionMemory = 265.MB val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0 diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala index 72ab92e2d72..c298bea77c6 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala @@ -42,7 +42,6 @@ import org.apache.openwhisk.core.entity.ExecManifest.RuntimeManifest import org.apache.openwhisk.core.entity.ExecManifest.ImageName import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.connector.MessageFeed -import org.apache.openwhisk.core.containerpool.ClusterManagedCapacityMonitor /** * Behavior tests for the ContainerPool @@ -127,7 +126,7 @@ class ContainerPoolTests } def poolConfig(userMemory: ByteSize) = - ContainerPoolConfig(userMemory, 0.5, false, false, 10, ClusterManagedCapacityMonitor(0.5, 10.seconds, 1024.B)) + ContainerPoolConfig(userMemory, 0.5, false, false, false, 10) val instanceId = InvokerInstanceId(0, userMemory = 1024.MB) behavior of "ContainerPool" diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala index 408132fb3c3..3b5f0816039 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala @@ -32,7 +32,6 @@ import spray.json.DefaultJsonProtocol._ import spray.json._ import org.apache.openwhisk.common.{Logging, TransactionId} import org.apache.openwhisk.core.connector.ActivationMessage -import org.apache.openwhisk.core.containerpool.ClusterManagedCapacityMonitor import org.apache.openwhisk.core.containerpool._ import org.apache.openwhisk.core.containerpool.logging.LogCollectingException import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} @@ -202,7 +201,7 @@ class ContainerProxyTests Future.successful(()) } val poolConfig = - ContainerPoolConfig(2.MB, 0.5, false, false, 10, ClusterManagedCapacityMonitor(0.5, 10.seconds, 1024.B)) + ContainerPoolConfig(2.MB, 0.5, false, false, false, 10) behavior of "ContainerProxy"