Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Mar 28, 2019
1 parent e197f64 commit 1564ee8
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,19 @@ import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
import org.apache.openwhisk.spi.Spi
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

case class ContainerArgsConfig(network: String,
dnsServers: Seq[String] = Seq.empty,
dnsSearch: Seq[String] = Seq.empty,
dnsOptions: Seq[String] = Seq.empty,
extraArgs: Map[String, Set[String]] = Map.empty)

case class ClusterManagedCapacityMonitor(idlePruneUseRatio: Double,
idleTimeout: FiniteDuration,
idleRemoveSize: ByteSize)

case class ContainerPoolConfig(userMemory: ByteSize,
concurrentPeekFactor: Double,
akkaClient: Boolean,
clusterManagedResources: Boolean,
clusterManagedResourceMaxStarts: Int,
clusterManagedCapacityMonitor: ClusterManagedCapacityMonitor) {
useClusterBootstrap: Boolean,
clusterManagedResourceMaxStarts: Int) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
Expand Down
6 changes: 1 addition & 5 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ whisk {
akka-client: false # If true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
cluster-managed-resources: false # If false, container-pool.user-memory is used to determine pool capacity to launch containers, otherwise, ContainerPool.clusterCanLaunch() is used
cluster-managed-resource-max-starts: 20 # In cluster managed resource case, limit the number of concurrent container starts to this value.
cluster-managed-capacity-monitor { //config to cause accelerated pruning of idles once cluster capacity ratio is reached
idle-prune-use-ratio = 0.75 //begin freeing idles when we see less than this ratio of hosts (compared to max number of hosts ever seen)
idle-timeout = 10 seconds //only remove idles > 10s unused
idle-remove-size = 4096 M //remove 4GB at a time
}
useClusterBootstrap: false # if true, use akka boostrap to discover cluster nodes
}

kubernetes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import akka.cluster.ddata.Replicator.WriteLocal
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Put
import akka.cluster.pubsub.DistributedPubSubMediator.Send
import akka.management.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
import java.time.Instant
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.LoggingMarkers
Expand Down Expand Up @@ -187,6 +189,12 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
.count({ case (_, state) => state.size.toMB > 0 }) < config.clusterManagedResourceMaxStarts //only positive reservations affect ability to start

class ContainerPoolClusterData(instanceId: InvokerInstanceId, containerPool: ActorRef) extends Actor {
//it is possible to use cluster managed resources, but not run the invoker in the cluster
//when not using cluster boostrapping, you need to set akka seed node configs
if (poolConfig.useClusterBootstrap) {
AkkaManagement(context.system).start()
ClusterBootstrap(context.system).start()
}
implicit val cluster = Cluster(system)
implicit val ec = context.dispatcher
val mediator = DistributedPubSub(system).mediator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,7 @@ class ContainerPool(instanceId: InvokerInstanceId,
} else {
r.action.limits.memory.megabytes.MB
})
.map { a =>
//decrease the reserved (not yet stopped container) memory tracker
//resourceManager.addReservation(a._1, (-r.action.limits.memory.megabytes).MB)
removeContainer(a._1)
}
.map(a => removeContainer(a._1))
// If the list had at least one entry, enough containers were removed to start the new container. After
// removing the containers, we are not interested anymore in the containers that have been removed.
.headOption
Expand Down Expand Up @@ -287,7 +283,7 @@ class ContainerPool(instanceId: InvokerInstanceId,

// Container got removed
case ContainerRemoved =>
//stop tracking via reserved
//stop tracking via reserved (should already be removed, except in case of failure)
resourceManager.releaseReservation(sender())

// if container was in free pool, it may have been processing (but under capacity),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import scala.concurrent.duration._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.containerpool.ClusterManagedCapacityMonitor
import org.apache.openwhisk.core.containerpool.ContainerArgsConfig
import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
import org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStore
Expand Down Expand Up @@ -83,7 +82,7 @@ class MesosContainerFactoryTest

// 80 slots, each 265MB
val poolConfig =
ContainerPoolConfig(21200.MB, 0.5, false, false, 10, ClusterManagedCapacityMonitor(0.5, 10.seconds, 1024.B))
ContainerPoolConfig(21200.MB, 0.5, false, false, false, 10)
val actionMemory = 265.MB
val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import org.apache.openwhisk.core.entity.ExecManifest.RuntimeManifest
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.containerpool.ClusterManagedCapacityMonitor

/**
* Behavior tests for the ContainerPool
Expand Down Expand Up @@ -127,7 +126,7 @@ class ContainerPoolTests
}

def poolConfig(userMemory: ByteSize) =
ContainerPoolConfig(userMemory, 0.5, false, false, 10, ClusterManagedCapacityMonitor(0.5, 10.seconds, 1024.B))
ContainerPoolConfig(userMemory, 0.5, false, false, false, 10)

val instanceId = InvokerInstanceId(0, userMemory = 1024.MB)
behavior of "ContainerPool"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool.ClusterManagedCapacityMonitor
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
Expand Down Expand Up @@ -202,7 +201,7 @@ class ContainerProxyTests
Future.successful(())
}
val poolConfig =
ContainerPoolConfig(2.MB, 0.5, false, false, 10, ClusterManagedCapacityMonitor(0.5, 10.seconds, 1024.B))
ContainerPoolConfig(2.MB, 0.5, false, false, false, 10)

behavior of "ContainerProxy"

Expand Down

0 comments on commit 1564ee8

Please sign in to comment.