Skip to content

Commit

Permalink
Merge pull request #59 from akka/wip-joining-2.4-patriknw
Browse files Browse the repository at this point in the history
backport joining to Akka 2.4
  • Loading branch information
patriknw authored Nov 30, 2017
2 parents b6f1094 + 79f7391 commit 8d811e0
Show file tree
Hide file tree
Showing 34 changed files with 1,900 additions and 220 deletions.
16 changes: 14 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,22 @@ lazy val `cluster-http` = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-management-cluster-http",
Dependencies.ClusterHttp
Dependencies.ClusterHttp,
resolvers += Resolver.bintrayRepo("hajile", "maven") // akka-dns
)

val unidocTask = sbtunidoc.Plugin.UnidocKeys.unidoc in (ProjectRef(file("."), "akka-management"), Compile)
lazy val `joining-demo` = project
.in(file("joining-demo"))
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-management-joining-demo",
skip in publish := true,
sources in (Compile, doc) := Seq.empty,
whitesourceIgnore := true
).dependsOn(`cluster-http`)

val unidocTask = sbtunidoc.Plugin.UnidocKeys.unidoc in(ProjectRef(file("."), "akka-management"), Compile)

lazy val docs = project
.in(file("docs"))
.enablePlugins(ParadoxPlugin, NoPublish)
Expand Down
88 changes: 88 additions & 0 deletions cluster-http/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,91 @@ akka.cluster.http.management {
# The value will need to be from 0 to 65535.
port = 19999
}

######################################################
# Akka Cluster Bootstrap Config #
######################################################


akka.cluster.bootstrap {


# Configuration for the first phase of bootstraping, during which contact points are discovered
# using the configured service discovery mechanism (e.g. DNS records).
contact-point-discovery {

# Define this name to be looked up in service discovery for "neighboring" nodes
# If undefined, the name will be extracted from the ActorSystem name
service-name = null

# Added as suffix to the service-name to build the effective-service name used in the contact-point service lookups
# If undefined, nothing will be appended to the service-name.
#
# Examples, set this to:
# "default.svc.cluster.local" or "my-namespace.svc.cluster.local" for kubernetes clusters.
service-namespace = null

# The effective service name is the exact string that will be used to perform service discovery
# usually, this means the service-name being suffixed with an additional namespace (e.g. "name.default"
effective-service-name = null

# Config path of discovery method to be used to locate the initial contact points.
# It must be a fully qualified config path to the discovery's config section.
discovery-method = akka.discovery.akka-dns

# Amount of time for which a discovery observation must remain "stable"
# (i.e. not change list of discovered contact-points) before a join decision can be made.
# This is done to decrease the likelyhood of performing decisions on fluctuating observations.
stable-margin = 3 seconds

# Interval at which service discovery will be polled in search for new contact-points
interval = 1 second

# The smallest number of contact points that need to be discovered before the bootstrap process can start.
# For optimal safety during cluster formation, you may want to set these value to the number of initial
# nodes that you know will participate in the cluster (e.g. the value of `spec.replicas` as set in your kubernetes config.
required-contact-point-nr = 2

# Timeout for getting a reply from the service-discovery subsystem
resolve-timeout = 3 seconds

}

# Configured how we communicate with the contact point once it is discovered
contact-point {

# If no port is discovered along with the host/ip of a contact point this port will be used as fallback
fallback-port = 8558 # port pun, it "complements" 2552 which is often used for Akka remoting

# If no seed nodes are discovered from a given contact-point by this time
# it will assume that the the observation is "stable" (i.e. will not change),
# and a new cluster may need to be formed. In response to this the main bootstrap
# coordinating process may decide to join itself, or keep waiting for discovering of a seed node.
no-seeds-stable-margin = 5 seconds

# Interval at which contact points should be polled
# the effective interval used is this value plus the same value multiplied by the jitter value
probe-interval = 1 second

# Max amount of jitter to be added on retries
probe-interval-jitter = 0.2

# Probe will be failed if it doesn't return in this amount of time
probe-timeout = 1 second
}
}

######################################################
# Akka Service Discovery Bootstrap Config #
######################################################

akka.discovery {

akka-dns {
class = akka.discovery.AkkaDnsServiceDiscovery

resolve-interval = 1 second

resolve-timeout = 2 second
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.bootstrap

import java.util.concurrent.atomic.AtomicReference

import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.cluster.bootstrap.dns.HeadlessServiceDnsBootstrap
import akka.discovery.ServiceDiscovery
import akka.event.Logging
import akka.http.scaladsl.model.Uri
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout

import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }

final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends Extension {

import ClusterBootstrap._
import system.dispatcher

private implicit val mat = ActorMaterializer()(system)

private val log = Logging(system, classOf[ClusterBootstrap])

private final val bootstrapStep = new AtomicReference[BootstrapStep](NotRunning)

val settings = ClusterBootstrapSettings(system.settings.config)

// used for initial discovery of contact points
val discovery: ServiceDiscovery = {
val clazz = settings.contactPointDiscovery.discoveryClass
system.dynamicAccess.createInstanceFor[ServiceDiscovery](clazz, List(classOf[ActorSystem] system)).get
}

private[this] val _selfContactPointUri: Promise[Uri] = Promise()

def start(): Unit =
if (bootstrapStep.compareAndSet(NotRunning, Initializing)) {
log.info("Initiating bootstrap procedure using {} method...", settings.contactPointDiscovery.discoveryMethod)

// TODO this could be configured as well, depending on how we want to bootstrap
val bootstrapProps = HeadlessServiceDnsBootstrap.props(discovery, settings)
val bootstrap = system.systemActorOf(bootstrapProps, "headlessServiceDnsBootstrap")

// the boot timeout not really meant to be exceeded
implicit val bootTimeout: Timeout = Timeout(1.day)
val bootstrapCompleted = (bootstrap ? HeadlessServiceDnsBootstrap.Protocol.InitiateBootstraping).mapTo[
HeadlessServiceDnsBootstrap.Protocol.BootstrapingCompleted]

bootstrapCompleted.onComplete {
case Success(_) // ignore, all's fine
case Failure(_) log.warning("Failed to complete bootstrap within {}!", bootTimeout)
}
} else log.warning("Bootstrap already initiated, yet start() method was called again. Ignoring.")

/**
* INTERNAL API
*
* Must be invoked by whoever starts the HTTP server with the `HttpClusterBootstrapRoutes`.
* This allows us to "reverse lookup" from a lowest-address sorted contact point list,
* that we discover via discovery, if a given contact point corresponds to our remoting address,
* and if so, we may opt to join ourselves using the address.
*
* @return true if successfully set, false otherwise (i.e. was set already)
*/
@InternalApi
def setSelfContactPoint(baseUri: Uri): Boolean =
_selfContactPointUri.trySuccess(baseUri)

/** INTERNAL API */
private[akka] def selfContactPoint: Future[Uri] =
_selfContactPointUri.future

}

object ClusterBootstrap extends ExtensionId[ClusterBootstrap] with ExtensionIdProvider {
override def lookup: ClusterBootstrap.type = ClusterBootstrap

override def get(system: ActorSystem): ClusterBootstrap = super.get(system)

override def createExtension(system: ExtendedActorSystem): ClusterBootstrap = new ClusterBootstrap()(system)

private[bootstrap] sealed trait BootstrapStep
private[bootstrap] case object NotRunning extends BootstrapStep
private[bootstrap] case object Initializing extends BootstrapStep
// TODO get the Initialized state once done
private[bootstrap] case object Initialized extends BootstrapStep

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.bootstrap

import java.util.Locale
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import com.typesafe.config.Config

import scala.concurrent.duration.{ FiniteDuration, _ }

final class ClusterBootstrapSettings(config: Config) {
private val bootConfig = config.getConfig("akka.cluster.bootstrap")

object contactPointDiscovery {
private val discoveryConfig: Config = bootConfig.getConfig("contact-point-discovery")

val serviceName: Option[String] =
if (discoveryConfig.hasPath("service-name")) Some(discoveryConfig.getString("service-name")) else None

val serviceNamespace: Option[String] =
if (discoveryConfig.hasPath("service-namespace")) Some(discoveryConfig.getString("service-namespace")) else None

def effectiveName(system: ActorSystem): String = {
val service = serviceName match {
case Some(name) name
case _ system.name.toLowerCase(Locale.ROOT).replaceAll(" ", "-").replace("_", "-")
}
val namespace = serviceNamespace match {
case Some(ns) s".$ns"
case _ ""
}
if (discoveryConfig.hasPath("effective-name")) discoveryConfig.getString("effective-name")
else service + namespace
}

val discoveryMethod: String = discoveryConfig.getString("discovery-method")

private val effectiveDiscoveryConfig: Config = discoveryConfig.withFallback(config.getConfig(discoveryMethod))
val discoveryClass: String = effectiveDiscoveryConfig.getString("class")

val stableMargin: FiniteDuration =
effectiveDiscoveryConfig.getDuration("stable-margin", TimeUnit.MILLISECONDS).millis

val interval: FiniteDuration =
effectiveDiscoveryConfig.getDuration("interval", TimeUnit.MILLISECONDS).millis

val requiredContactPointsNr: Int = discoveryConfig.getInt("required-contact-point-nr")
require(requiredContactPointsNr >= 2,
"Number of contact points must be greater than 1. " +
"For 'single node clusters' simply avoid using the seed bootstraping process, and issue a self-join manually.")

val resolveTimeout: FiniteDuration = discoveryConfig.getDuration("resolve-timeout", TimeUnit.MILLISECONDS).millis

}

object contactPoint {
private val contactPointConfig = bootConfig.getConfig("contact-point")

// FIXME this has to be the same as the management one, we currently override this value when starting management, any better way?
val fallbackPort = contactPointConfig.getInt("fallback-port")

val noSeedsStableMargin: FiniteDuration =
contactPointConfig.getDuration("no-seeds-stable-margin", TimeUnit.MILLISECONDS).millis

val probeInterval: FiniteDuration =
contactPointConfig.getDuration("probe-interval", TimeUnit.MILLISECONDS).millis

val probeIntervalJitter: Double =
contactPointConfig.getDouble("probe-interval-jitter")

val probeTimeout: FiniteDuration =
contactPointConfig.getDuration("probe-timeout", TimeUnit.MILLISECONDS).millis

val httpMaxSeedNodesToExpose: Int = 5
}

}

object ClusterBootstrapSettings {
def apply(config: Config): ClusterBootstrapSettings =
new ClusterBootstrapSettings(config)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.bootstrap.contactpoint

import akka.actor.{ Address, AddressFromURIString }
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.{ DefaultJsonProtocol, JsString, JsValue, RootJsonFormat }

trait HttpBootstrapJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
import HttpBootstrapJsonProtocol._

implicit object AddressFormat extends RootJsonFormat[Address] {
override def read(json: JsValue): Address = json match {
case JsString(s) AddressFromURIString.parse(s)
case invalid throw new IllegalArgumentException(s"Illegal address value! Was [$invalid]")
}

override def write(obj: Address): JsValue = JsString(obj.toString)
}
implicit val SeedNodeFormat = jsonFormat1(SeedNode)
implicit val ClusterMemberFormat = jsonFormat4(ClusterMember)
implicit val ClusterMembersFormat = jsonFormat2(SeedNodes)
}

object HttpBootstrapJsonProtocol extends DefaultJsonProtocol {

final case class SeedNode(address: Address)

// we use Address since we want to know which protocol is being used (tcp, artery, artery-tcp etc)
final case class ClusterMember(node: Address, nodeUid: Long, status: String, roles: Set[String])

final case class SeedNodes(selfNode: Address, seedNodes: Set[ClusterMember])

}
Loading

0 comments on commit 8d811e0

Please sign in to comment.