diff --git a/build.sbt b/build.sbt index e6d027ea3..1b0665d53 100644 --- a/build.sbt +++ b/build.sbt @@ -12,10 +12,22 @@ lazy val `cluster-http` = project .enablePlugins(AutomateHeaderPlugin) .settings( name := "akka-management-cluster-http", - Dependencies.ClusterHttp + Dependencies.ClusterHttp, + resolvers += Resolver.bintrayRepo("hajile", "maven") // akka-dns ) -val unidocTask = sbtunidoc.Plugin.UnidocKeys.unidoc in (ProjectRef(file("."), "akka-management"), Compile) +lazy val `joining-demo` = project + .in(file("joining-demo")) + .enablePlugins(AutomateHeaderPlugin) + .settings( + name := "akka-management-joining-demo", + skip in publish := true, + sources in (Compile, doc) := Seq.empty, + whitesourceIgnore := true + ).dependsOn(`cluster-http`) + +val unidocTask = sbtunidoc.Plugin.UnidocKeys.unidoc in(ProjectRef(file("."), "akka-management"), Compile) + lazy val docs = project .in(file("docs")) .enablePlugins(ParadoxPlugin, NoPublish) diff --git a/cluster-http/src/main/resources/reference.conf b/cluster-http/src/main/resources/reference.conf index c21958dea..a14963f59 100644 --- a/cluster-http/src/main/resources/reference.conf +++ b/cluster-http/src/main/resources/reference.conf @@ -16,3 +16,91 @@ akka.cluster.http.management { # The value will need to be from 0 to 65535. port = 19999 } + +###################################################### +# Akka Cluster Bootstrap Config # +###################################################### + + +akka.cluster.bootstrap { + + + # Configuration for the first phase of bootstrapping, during which contact points are discovered + # using the configured service discovery mechanism (e.g. DNS records). + contact-point-discovery { + + # Define this name to be looked up in service discovery for "neighboring" nodes + # If undefined, the name will be extracted from the ActorSystem name + service-name = null + + # Added as a suffix to the service-name to build the effective-service name used in the contact-point service lookups + # If undefined, nothing will be appended to the service-name. + # + # For example, set this to: + # "default.svc.cluster.local" or "my-namespace.svc.cluster.local" for kubernetes clusters. + service-namespace = null + + # The effective service name is the exact string that will be used to perform service discovery; + # usually, this means the service-name being suffixed with an additional namespace (e.g. "name.default") + effective-service-name = null + + # Config path of the discovery method to be used to locate the initial contact points. + # It must be a fully qualified config path to the discovery's config section. + discovery-method = akka.discovery.akka-dns + + # Amount of time for which a discovery observation must remain "stable" + # (i.e. not change the list of discovered contact-points) before a join decision can be made. + # This is done to decrease the likelihood of making decisions based on fluctuating observations. + stable-margin = 3 seconds + + # Interval at which service discovery will be polled in search of new contact-points + interval = 1 second + + # The smallest number of contact points that need to be discovered before the bootstrap process can start. + # For optimal safety during cluster formation, you may want to set this value to the number of initial + # nodes that you know will participate in the cluster (e.g. the value of `spec.replicas` as set in your kubernetes config).
+ required-contact-point-nr = 2 + + # Timeout for getting a reply from the service-discovery subsystem + resolve-timeout = 3 seconds + + } + + # Configures how we communicate with the contact point once it is discovered + contact-point { + + # If no port is discovered along with the host/ip of a contact point, this port will be used as fallback + fallback-port = 8558 # port pun, it "complements" 2552 which is often used for Akka remoting + + # If no seed nodes are discovered from a given contact-point by this time, + # it will be assumed that the observation is "stable" (i.e. will not change), + # and a new cluster may need to be formed. In response to this the main bootstrap + # coordinating process may decide to join itself, or keep waiting for the discovery of a seed node. + no-seeds-stable-margin = 5 seconds + + # Interval at which contact points should be polled; + # the effective interval used is this value plus the same value multiplied by the jitter value + probe-interval = 1 second + + # Max amount of jitter to be added on retries + probe-interval-jitter = 0.2 + + # The probe is considered failed if it doesn't return within this amount of time + probe-timeout = 1 second + } +} + +###################################################### +# Akka Service Discovery Bootstrap Config # +###################################################### + +akka.discovery { + + akka-dns { + class = akka.discovery.AkkaDnsServiceDiscovery + + resolve-interval = 1 second + + resolve-timeout = 2 seconds + } +} diff --git a/cluster-http/src/main/scala/akka/cluster/bootstrap/ClusterBootstrap.scala b/cluster-http/src/main/scala/akka/cluster/bootstrap/ClusterBootstrap.scala new file mode 100644 index 000000000..297dbdd3a --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/bootstrap/ClusterBootstrap.scala @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2017 Lightbend Inc.
+ */ +package akka.cluster.bootstrap + +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } +import akka.annotation.InternalApi +import akka.cluster.bootstrap.dns.HeadlessServiceDnsBootstrap +import akka.discovery.ServiceDiscovery +import akka.event.Logging +import akka.http.scaladsl.model.Uri +import akka.pattern.ask +import akka.stream.ActorMaterializer +import akka.util.Timeout + +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration._ +import scala.util.{ Failure, Success } + +final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends Extension { + + import ClusterBootstrap._ + import system.dispatcher + + private implicit val mat = ActorMaterializer()(system) + + private val log = Logging(system, classOf[ClusterBootstrap]) + + private final val bootstrapStep = new AtomicReference[BootstrapStep](NotRunning) + + val settings = ClusterBootstrapSettings(system.settings.config) + + // used for initial discovery of contact points + val discovery: ServiceDiscovery = { + val clazz = settings.contactPointDiscovery.discoveryClass + system.dynamicAccess.createInstanceFor[ServiceDiscovery](clazz, List(classOf[ActorSystem] → system)).get + } + + private[this] val _selfContactPointUri: Promise[Uri] = Promise() + + def start(): Unit = + if (bootstrapStep.compareAndSet(NotRunning, Initializing)) { + log.info("Initiating bootstrap procedure using {} method...", settings.contactPointDiscovery.discoveryMethod) + + // TODO this could be configured as well, depending on how we want to bootstrap + val bootstrapProps = HeadlessServiceDnsBootstrap.props(discovery, settings) + val bootstrap = system.systemActorOf(bootstrapProps, "headlessServiceDnsBootstrap") + + // the boot timeout not really meant to be exceeded + implicit val bootTimeout: Timeout = Timeout(1.day) + val bootstrapCompleted = (bootstrap ? HeadlessServiceDnsBootstrap.Protocol.InitiateBootstraping).mapTo[ + HeadlessServiceDnsBootstrap.Protocol.BootstrapingCompleted] + + bootstrapCompleted.onComplete { + case Success(_) ⇒ // ignore, all's fine + case Failure(_) ⇒ log.warning("Failed to complete bootstrap within {}!", bootTimeout) + } + } else log.warning("Bootstrap already initiated, yet start() method was called again. Ignoring.") + + /** + * INTERNAL API + * + * Must be invoked by whoever starts the HTTP server with the `HttpClusterBootstrapRoutes`. + * This allows us to "reverse lookup" from a lowest-address sorted contact point list, + * that we discover via discovery, if a given contact point corresponds to our remoting address, + * and if so, we may opt to join ourselves using the address. + * + * @return true if successfully set, false otherwise (i.e. 
was set already) + */ + @InternalApi + def setSelfContactPoint(baseUri: Uri): Boolean = + _selfContactPointUri.trySuccess(baseUri) + + /** INTERNAL API */ + private[akka] def selfContactPoint: Future[Uri] = + _selfContactPointUri.future + +} + +object ClusterBootstrap extends ExtensionId[ClusterBootstrap] with ExtensionIdProvider { + override def lookup: ClusterBootstrap.type = ClusterBootstrap + + override def get(system: ActorSystem): ClusterBootstrap = super.get(system) + + override def createExtension(system: ExtendedActorSystem): ClusterBootstrap = new ClusterBootstrap()(system) + + private[bootstrap] sealed trait BootstrapStep + private[bootstrap] case object NotRunning extends BootstrapStep + private[bootstrap] case object Initializing extends BootstrapStep + // TODO get the Initialized state once done + private[bootstrap] case object Initialized extends BootstrapStep + +} diff --git a/cluster-http/src/main/scala/akka/cluster/bootstrap/ClusterBootstrapSettings.scala b/cluster-http/src/main/scala/akka/cluster/bootstrap/ClusterBootstrapSettings.scala new file mode 100644 index 000000000..f90a22069 --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/bootstrap/ClusterBootstrapSettings.scala @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster.bootstrap + +import java.util.Locale +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import com.typesafe.config.Config + +import scala.concurrent.duration.{ FiniteDuration, _ } + +final class ClusterBootstrapSettings(config: Config) { + private val bootConfig = config.getConfig("akka.cluster.bootstrap") + + object contactPointDiscovery { + private val discoveryConfig: Config = bootConfig.getConfig("contact-point-discovery") + + val serviceName: Option[String] = + if (discoveryConfig.hasPath("service-name")) Some(discoveryConfig.getString("service-name")) else None + + val serviceNamespace: Option[String] = + if (discoveryConfig.hasPath("service-namespace")) Some(discoveryConfig.getString("service-namespace")) else None + + def effectiveName(system: ActorSystem): String = { + val service = serviceName match { + case Some(name) ⇒ name + case _ ⇒ system.name.toLowerCase(Locale.ROOT).replaceAll(" ", "-").replace("_", "-") + } + val namespace = serviceNamespace match { + case Some(ns) ⇒ s".$ns" + case _ ⇒ "" + } + if (discoveryConfig.hasPath("effective-name")) discoveryConfig.getString("effective-name") + else service + namespace + } + + val discoveryMethod: String = discoveryConfig.getString("discovery-method") + + private val effectiveDiscoveryConfig: Config = discoveryConfig.withFallback(config.getConfig(discoveryMethod)) + val discoveryClass: String = effectiveDiscoveryConfig.getString("class") + + val stableMargin: FiniteDuration = + effectiveDiscoveryConfig.getDuration("stable-margin", TimeUnit.MILLISECONDS).millis + + val interval: FiniteDuration = + effectiveDiscoveryConfig.getDuration("interval", TimeUnit.MILLISECONDS).millis + + val requiredContactPointsNr: Int = discoveryConfig.getInt("required-contact-point-nr") + require(requiredContactPointsNr >= 2, + "Number of contact points must be greater than 1. 
" + + "For 'single node clusters' simply avoid using the seed bootstraping process, and issue a self-join manually.") + + val resolveTimeout: FiniteDuration = discoveryConfig.getDuration("resolve-timeout", TimeUnit.MILLISECONDS).millis + + } + + object contactPoint { + private val contactPointConfig = bootConfig.getConfig("contact-point") + + // FIXME this has to be the same as the management one, we currently override this value when starting management, any better way? + val fallbackPort = contactPointConfig.getInt("fallback-port") + + val noSeedsStableMargin: FiniteDuration = + contactPointConfig.getDuration("no-seeds-stable-margin", TimeUnit.MILLISECONDS).millis + + val probeInterval: FiniteDuration = + contactPointConfig.getDuration("probe-interval", TimeUnit.MILLISECONDS).millis + + val probeIntervalJitter: Double = + contactPointConfig.getDouble("probe-interval-jitter") + + val probeTimeout: FiniteDuration = + contactPointConfig.getDuration("probe-timeout", TimeUnit.MILLISECONDS).millis + + val httpMaxSeedNodesToExpose: Int = 5 + } + +} + +object ClusterBootstrapSettings { + def apply(config: Config): ClusterBootstrapSettings = + new ClusterBootstrapSettings(config) +} diff --git a/cluster-http/src/main/scala/akka/cluster/bootstrap/contactpoint/HttpBootstrapJsonProtocol.scala b/cluster-http/src/main/scala/akka/cluster/bootstrap/contactpoint/HttpBootstrapJsonProtocol.scala new file mode 100644 index 000000000..71897d914 --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/bootstrap/contactpoint/HttpBootstrapJsonProtocol.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster.bootstrap.contactpoint + +import akka.actor.{ Address, AddressFromURIString } +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json.{ DefaultJsonProtocol, JsString, JsValue, RootJsonFormat } + +trait HttpBootstrapJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol { + import HttpBootstrapJsonProtocol._ + + implicit object AddressFormat extends RootJsonFormat[Address] { + override def read(json: JsValue): Address = json match { + case JsString(s) ⇒ AddressFromURIString.parse(s) + case invalid ⇒ throw new IllegalArgumentException(s"Illegal address value! Was [$invalid]") + } + + override def write(obj: Address): JsValue = JsString(obj.toString) + } + implicit val SeedNodeFormat = jsonFormat1(SeedNode) + implicit val ClusterMemberFormat = jsonFormat4(ClusterMember) + implicit val ClusterMembersFormat = jsonFormat2(SeedNodes) +} + +object HttpBootstrapJsonProtocol extends DefaultJsonProtocol { + + final case class SeedNode(address: Address) + + // we use Address since we want to know which protocol is being used (tcp, artery, artery-tcp etc) + final case class ClusterMember(node: Address, nodeUid: Long, status: String, roles: Set[String]) + + final case class SeedNodes(selfNode: Address, seedNodes: Set[ClusterMember]) + +} diff --git a/cluster-http/src/main/scala/akka/cluster/bootstrap/contactpoint/HttpClusterBootstrapRoutes.scala b/cluster-http/src/main/scala/akka/cluster/bootstrap/contactpoint/HttpClusterBootstrapRoutes.scala new file mode 100644 index 000000000..6986169a4 --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/bootstrap/contactpoint/HttpClusterBootstrapRoutes.scala @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.cluster.bootstrap.contactpoint + +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ ActorSystem, Address } +import akka.annotation.InternalApi +import akka.cluster.{ Cluster, Member, MemberStatus, UniqueAddress } +import akka.cluster.bootstrap.ClusterBootstrapSettings +import akka.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol.{ ClusterMember, SeedNodes } +import akka.event.{ Logging, LoggingAdapter } +import akka.http.scaladsl.model.{ HttpRequest, Uri } +import akka.http.scaladsl.server.Route + +import scala.concurrent.duration._ + +final class HttpClusterBootstrapRoutes(settings: ClusterBootstrapSettings) extends HttpBootstrapJsonProtocol { + + import akka.http.scaladsl.server.Directives._ + + private def routeGetSeedNodes: Route = extractClientIP { clientIp ⇒ + extractActorSystem { implicit system ⇒ + val cluster = Cluster(system) + + def memberToClusterMember(m: Member): ClusterMember = + ClusterMember(m.uniqueAddress.address, m.uniqueAddress.longUid, m.status.toString, m.roles) + + val state = cluster.state + + // TODO shuffle the members so in a big deployment nodes start joining different ones and not all the same? + val members = state.members.take(settings.contactPoint.httpMaxSeedNodesToExpose).map(memberToClusterMember) + + val info = SeedNodes(cluster.selfAddress, members) + log.info("Bootstrap request from {}: Contact Point returning {} seed-nodes ([{}])", clientIp, members.size, + members) + complete(info) + } + } + + // TODO ip whitelist feature? + val routes = { + // TODO basePath, same as akka-management + // val basePath = if (pathPrefixName.isEmpty) rawPathPrefix(pathPrefixName) else pathPrefix(pathPrefixName) + + toStrictEntity(1.second) { // always drain everything + concat( + (get & path("bootstrap" / "seed-nodes"))(routeGetSeedNodes) + ) + } + } + + private def log(implicit sys: ActorSystem): LoggingAdapter = + Logging(sys, classOf[HttpClusterBootstrapRoutes]) + +} + +object ClusterBootstrapRequests { + + import akka.http.scaladsl.client.RequestBuilding._ + + def bootstrapSeedNodes(baseUri: Uri): HttpRequest = + Get(baseUri + "/bootstrap/seed-nodes") + +} diff --git a/cluster-http/src/main/scala/akka/cluster/bootstrap/dns/HeadlessServiceDnsBootstrap.scala b/cluster-http/src/main/scala/akka/cluster/bootstrap/dns/HeadlessServiceDnsBootstrap.scala new file mode 100644 index 000000000..3edceb06c --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/bootstrap/dns/HeadlessServiceDnsBootstrap.scala @@ -0,0 +1,301 @@ +/* + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.cluster.bootstrap.dns + +import java.util.{ Date, Locale } + +import akka.actor.Status.Failure +import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Address, DeadLetterSuppression, Props } +import akka.annotation.InternalApi +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.bootstrap.{ ClusterBootstrap, ClusterBootstrapSettings } +import akka.http.scaladsl.model.Uri +import akka.pattern.{ ask, pipe } +import akka.discovery.ServiceDiscovery +import akka.discovery.ServiceDiscovery.ResolvedTarget +import akka.util.PrettyDuration +import scala.collection.immutable +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + +import akka.actor.Cancellable + +/** INTERNAL API */ +@InternalApi +private[bootstrap] object HeadlessServiceDnsBootstrap { + + def props(discovery: ServiceDiscovery, settings: ClusterBootstrapSettings): Props = + Props(new HeadlessServiceDnsBootstrap(discovery, settings)) + + object Protocol { + final case object InitiateBootstraping + sealed trait BootstrapingCompleted + final case object BootstrapingCompleted extends BootstrapingCompleted + + final case class ObtainedHttpSeedNodesObservation( + seedNodesSourceAddress: Address, + observedSeedNodes: Set[Address] + ) extends DeadLetterSuppression + + final case class NoSeedNodesObtainedWithinDeadline(contactPoint: Uri) extends DeadLetterSuppression + + object Internal { + final case class AttemptResolve(serviceName: String) extends DeadLetterSuppression + case object ScheduledAttemptResolve + } + } + + protected[dns] final case class DnsServiceContactsObservation( + observedAt: Long, + observedContactPoints: List[ResolvedTarget] + ) { + + /** Prepares member addresses for a self-join attempt */ + def selfAddressIfAbleToJoinItself(system: ActorSystem): Option[Address] = { + val cluster = Cluster(system) + val bootstrap = ClusterBootstrap(system) + + // we KNOW this await is safe, since we set the value before we bind the HTTP things even + val selfContactPoint = Await.result(bootstrap.selfContactPoint, 30.seconds) + + // we check if a contact point is "us", by comparing host and port that we've bound to + def lowestContactPointIsSelfManagement(lowest: ResolvedTarget): Boolean = + lowest.host == selfContactPoint.authority.host.toString() && + lowest.port.getOrElse(selfContactPoint.authority.port) == selfContactPoint.authority.port + + lowestAddressContactPoint + .find(lowestContactPointIsSelfManagement) // if the lowest contact-point address is "us" + .map(_ ⇒ cluster.selfAddress) // then we should join our own remoting address + } + + /** + * Contact point with the "lowest" contact point address, + * it is expected to join itself if no other cluster is found in the deployment. 
 */ + def lowestAddressContactPoint: Option[ResolvedTarget] = + observedContactPoints.sortBy(e ⇒ e.host + ":" + e.port.getOrElse(0)).headOption + + def willBeStableAt(settings: ClusterBootstrapSettings): Long = + observedAt + settings.contactPointDiscovery.stableMargin.toMillis + + def isPastStableMargin(settings: ClusterBootstrapSettings, timeNow: Long): Boolean = + willBeStableAt(settings) < timeNow + + def durationSinceObservation(timeNowMillis: Long): Duration = { + val millisSince = timeNowMillis - observedAt + math.max(0, millisSince).millis + } + + def membersChanged(other: DnsServiceContactsObservation): Boolean = { + val these = this.observedContactPoints.toSet + val others = other.observedContactPoints.toSet + others != these + } + + def sameOrChanged(other: DnsServiceContactsObservation): DnsServiceContactsObservation = + if (membersChanged(other)) other + else this + } + +} + +/** + * Looks up members of the same "service" in DNS and initiates an [[HttpContactPointBootstrap]] for each such node. + * If any of the contact-points returns a list of seed nodes, it joins them immediately. + * + * If contact points do not return any seed-nodes for a `contactPointNoSeedsStableMargin` amount of time, + * we decide that apparently there is no cluster formed yet in this deployment and someone has to become the first node + * to join itself (becoming the first node of the cluster, that all other nodes will join). + * + * The decision of joining "self" is made by deterministically sorting the discovered service IPs + * and picking the *lowest* address. + * + * If this node is the one with the lowest address in the deployment, it will join itself and other nodes will notice + * this via the contact-point probing mechanism and join this node. Please note that while the cluster is "growing", + * more nodes become aware of the cluster and start returning the seed-nodes in their contact-points, thus the joining + * process becomes somewhat "epidemic". Other nodes may get to know about this cluster by contacting any other node + * that has joined it already, and they may join any seed-node that they retrieve using this method, as effectively + * this will mean it joins the "right" cluster. + * + * CAVEATS: + * There is a slight timing issue that may theoretically appear in this bootstrap process.
+ * FIXME explain the races + */ +// also known as the "Baron von Bootstrappen" +@InternalApi +final class HeadlessServiceDnsBootstrap(discovery: ServiceDiscovery, settings: ClusterBootstrapSettings) + extends Actor + with ActorLogging { + + import HeadlessServiceDnsBootstrap.Protocol._ + import HeadlessServiceDnsBootstrap._ + import context.dispatcher + + private val cluster = Cluster(context.system) + + private var lastContactsObservation: DnsServiceContactsObservation = + DnsServiceContactsObservation(Long.MaxValue, Nil) + + private var timerTask: Option[Cancellable] = None + + override def postStop(): Unit = { + timerTask.foreach(_.cancel()) + super.postStop() + } + + /** Awaiting initial signal to start the bootstrap process */ + override def receive: Receive = { + case InitiateBootstraping ⇒ + val serviceName = settings.contactPointDiscovery.effectiveName(context.system) + + log.info("Locating service members, via DNS lookup: {}", serviceName) + discovery.lookup(serviceName, settings.contactPointDiscovery.resolveTimeout).pipeTo(self) + + context become bootstraping(serviceName, sender()) + } + + /** In process of searching for seed-nodes */ + def bootstraping(serviceName: String, replyTo: ActorRef): Receive = { + case Internal.AttemptResolve(name) ⇒ + attemptResolve(name) + + case Internal.ScheduledAttemptResolve ⇒ + if (timerTask.isDefined) { + attemptResolve(serviceName) + timerTask = None + } + + case ServiceDiscovery.Resolved(name, contactPoints) ⇒ + onContactPointsResolved(name, contactPoints) + + case ex: Failure ⇒ + log.warning("Resolve attempt failed! Cause: {}", ex.cause) + scheduleNextResolve(serviceName, settings.contactPointDiscovery.interval) + + case ObtainedHttpSeedNodesObservation(infoFromAddress, observedSeedNodes) ⇒ + log.info("Contact point [{}] returned [{}] seed-nodes [{}], initiating cluster joining...", infoFromAddress, + observedSeedNodes.size, observedSeedNodes.mkString(", ")) + + replyTo ! BootstrapingCompleted + + val seedNodesList = observedSeedNodes.toList + cluster.joinSeedNodes(seedNodesList) + + // once we issued a join bootstraping is completed + context.stop(self) + + case NoSeedNodesObtainedWithinDeadline(contactPoint) ⇒ + log.info( + "Contact point [{}] exceeded stable margin with no seed-nodes in sight. 
" + + "Considering weather this node is allowed to JOIN itself to initiate a new cluster.", contactPoint) + + onNoSeedNodesObtainedWithinStableDeadline(contactPoint) + } + + private def attemptResolve(name: String): Unit = + discovery.lookup(name, settings.contactPointDiscovery.resolveTimeout).pipeTo(self) + + private def onContactPointsResolved(serviceName: String, contactPoints: immutable.Seq[ResolvedTarget]): Unit = { + val newObservation = DnsServiceContactsObservation(timeNow(), contactPoints.toList) + lastContactsObservation = lastContactsObservation.sameOrChanged(newObservation) + + if (contactPoints.size < settings.contactPointDiscovery.requiredContactPointsNr) + onInsufficientContactPointsDiscovered(serviceName, lastContactsObservation) + else + onSufficientContactPointsDiscovered(serviceName, lastContactsObservation) + } + + private def onInsufficientContactPointsDiscovered(serviceName: String, + observation: DnsServiceContactsObservation): Unit = { + log.info("Discovered [{}] observation, which is less than the required [{}], retrying (interval: {})", + observation.observedContactPoints.size, settings.contactPointDiscovery.requiredContactPointsNr, + PrettyDuration.format(settings.contactPointDiscovery.interval)) + + scheduleNextResolve(serviceName, settings.contactPointDiscovery.interval) + } + + private def onSufficientContactPointsDiscovered(serviceName: String, + observation: DnsServiceContactsObservation): Unit = { + log.info("Initiating contact-point probing, sufficient contact points: {}", + observation.observedContactPoints.mkString(", ")) + + observation.observedContactPoints.foreach { contactPoint ⇒ + val targetPort = contactPoint.port.getOrElse(settings.contactPoint.fallbackPort) + val baseUri = Uri("http", Uri.Authority(Uri.Host(contactPoint.host), targetPort)) + ensureProbing(baseUri) + } + } + + private def onNoSeedNodesObtainedWithinStableDeadline(contactPoint: Uri): Unit = { + val dnsRecordsAreStable = lastContactsObservation.isPastStableMargin(settings, timeNow()) + if (dnsRecordsAreStable) { + lastContactsObservation.selfAddressIfAbleToJoinItself(context.system) match { + case Some(allowedToJoinSelfAddress) ⇒ + log.info( + "Initiating new cluster, self-joining [{}], as this node has the LOWEST address out of: [{}]! " + + "Other nodes are expected to locate this cluster via continued contact-point probing.", + cluster.selfAddress, lastContactsObservation.observedContactPoints) + + cluster.join(allowedToJoinSelfAddress) + + context.stop(self) // the bootstraping is complete + case None ⇒ + log.info( + "Exceeded stable margins without locating seed-nodes, however this node is NOT the lowest address out " + + "of the discovered IPs in this deployment, thus NOT joining self. Expecting node {} (out of {}) to perform the self-join " + + "and initiate the cluster.", lastContactsObservation.lowestAddressContactPoint, + lastContactsObservation.observedContactPoints) + + // nothing to do anymore, the probing will continue until the lowest addressed node decides to join itself. + // note, that due to DNS changes this may still become this node! We'll then await until the dns stableMargin + // is exceeded and would decide to try joining self again (same code-path), that time successfully though. + } + + } else { + // TODO throttle this logging? It may be caused by any of the probing actors + log.debug( + "DNS observation has changed more recently than the dnsStableMargin({}) allows (at: {}), not considering to join myself. 
" + + "This process will be retried.", settings.contactPointDiscovery.stableMargin, + new Date(lastContactsObservation.observedAt)) + } + } + + private def ensureProbing(baseUri: Uri): Option[ActorRef] = { + val childActorName = s"contactPointProbe-${baseUri.authority.host}-${baseUri.authority.port}" + log.info("Ensuring probing actor: " + childActorName) + + // This should never really happen in well configured env, but it may happen that someone is confused with ports + // and we end up trying to probe (using http for example) a port that actually is our own remoting port. + // We actively bail out of this case and log a warning instead. + val wasAboutToProbeSelfAddress = + baseUri.authority.host.address() == cluster.selfAddress.host.getOrElse("---") && + baseUri.authority.port == cluster.selfAddress.port.getOrElse(-1) + + if (wasAboutToProbeSelfAddress) { + log.warning("Misconfiguration detected! Attempted to start probing a contact-point which address [{}] " + + "matches our local remoting address [{}]. Avoiding probing this address. Consider double checking your service " + + "discovery and port configurations.", baseUri, cluster.selfAddress) + None + } else + context.child(childActorName) match { + case Some(contactPointProbingChild) ⇒ + Some(contactPointProbingChild) + case None ⇒ + val props = HttpContactPointBootstrap.props(settings, self, baseUri) + Some(context.actorOf(props, childActorName)) + } + } + + private def scheduleNextResolve(serviceName: String, interval: FiniteDuration): Unit = { + // this re-scheduling of timer tasks might not be completely safe, e.g. in case of restarts, but should + // be good enough for the Akka 2.4 release. In the Akka 2.5 release we are using Timers. + timerTask.foreach(_.cancel()) + timerTask = Some(context.system.scheduler.scheduleOnce(interval, self, Internal.ScheduledAttemptResolve)) + } + + protected def timeNow(): Long = + System.currentTimeMillis() + +} diff --git a/cluster-http/src/main/scala/akka/cluster/bootstrap/dns/HttpContactPointBootstrap.scala b/cluster-http/src/main/scala/akka/cluster/bootstrap/dns/HttpContactPointBootstrap.scala new file mode 100644 index 000000000..c9c152c8b --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/bootstrap/dns/HttpContactPointBootstrap.scala @@ -0,0 +1,181 @@ +/* + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.cluster.bootstrap.dns + +import java.util.concurrent.ThreadLocalRandom + +import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Props } +import akka.annotation.InternalApi +import akka.cluster.Cluster +import akka.cluster.bootstrap.ClusterBootstrapSettings +import akka.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol.SeedNodes +import akka.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol } +import akka.http.scaladsl.Http +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model.{ HttpMethods, HttpRequest, Uri } +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.pattern.pipe +import akka.stream.ActorMaterializer +import akka.util.PrettyDuration +import akka.actor.Status + +import scala.concurrent.duration._ + +import akka.actor.Cancellable + +@InternalApi +private[dns] object HttpContactPointBootstrap { + + def props(settings: ClusterBootstrapSettings, notifyOnJoinReady: ActorRef, probeAddress: Uri): Props = + Props(new HttpContactPointBootstrap(settings, notifyOnJoinReady, probeAddress)) + + object Protocol { + object Internal { + final case class ProbeNow() + case object ScheduledProbeNow + final case class ContactPointProbeResponse() + } + } +} + +/** + * Intended to be spawned as child actor by a higher-level Bootstrap coordinator that manages obtaining of the URIs. + * + * This additional step may at-first seem superficial -- after all, we already have some addresses of the nodes + * that we'll want to join -- however it is not optional. By communicating with the actual nodes before joining their + * cluster we're able to inquire about their status, double-check if perhaps they are part of an existing cluster already + * that we should join, or even coordinate rolling upgrades or more advanced patterns. + */ +@InternalApi +private[dns] class HttpContactPointBootstrap( + settings: ClusterBootstrapSettings, + notifyActor: ActorRef, + baseUri: Uri +) extends Actor + with ActorLogging + with HttpBootstrapJsonProtocol { + + import HttpContactPointBootstrap._ + import HttpContactPointBootstrap.Protocol._ + + private val cluster = Cluster(context.system) + + if (baseUri.authority.host.address() == cluster.selfAddress.host.getOrElse("---") && + baseUri.authority.port == cluster.selfAddress.port.getOrElse(-1)) { + throw new IllegalArgumentException( + "Requested base Uri to be probed matches local remoting address, bailing out! " + + s"Uri: $baseUri, this node's remoting address: ${cluster.selfAddress}") + } + + private implicit val mat = ActorMaterializer() + private val http = Http()(context.system) + import context.dispatcher + + private val probeInterval = settings.contactPoint.probeInterval + + private val probeRequest = ClusterBootstrapRequests.bootstrapSeedNodes(baseUri) + + /** + * If we don't observe any seed-nodes until the deadline triggers, we notify the parent about it, + * such that it may make the decision to join this node to itself or not (initiating a new cluster). + */ + private val existingClusterNotObservedWithinDeadline: Deadline = settings.contactPoint.noSeedsStableMargin.fromNow + + private var timerTask: Option[Cancellable] = None + + override def preStart(): Unit = + self ! 
Internal.ProbeNow() + + override def postStop(): Unit = { + timerTask.foreach(_.cancel()) + super.postStop() + } + + override def receive = { + case Internal.ProbeNow() ⇒ + probeNow() + + case Internal.ScheduledProbeNow ⇒ + if (timerTask.isDefined) { + probeNow() + timerTask = None + } + + case Status.Failure(cause) => + log.error("Probing {} failed due to {}", probeRequest.uri, cause.getMessage) + // keep probing, hoping the request will eventually succeed + scheduleNextContactPointProbing() + + case response @ SeedNodes(node, members) ⇒ + if (members.isEmpty) { + if (clusterNotObservedWithinDeadline) { + permitParentToFormClusterIfPossible() + + // if we are not the lowest address, we won't join ourselves, + // and then we'll end up observing someone else forming the cluster, so we continue probing + scheduleNextContactPointProbing() + } else { + // we keep probing and looking if maybe a cluster does form after all + // + // (technically could be long polling or web-sockets, but that would need reconnect logic, so this is simpler) + scheduleNextContactPointProbing() + } + } else { + notifyParentNoSeedNodesWereFoundWithinDeadline(response) + // we notified the parent that it may join itself if it is the designated node, + // since we did not observe any existing cluster. However, in case this node + // can't join itself (it's not the lowest address), some other node will -- + // so we continue probing. + // + // Summing up, one of the following will happen: + // A) this node is allowed to join itself, and does so, and stops this probing actor -- our job is done. + // B) some other node triggers the same process and joins itself + // - in which case we'll notice seed-nodes in our probing sooner or later! + scheduleNextContactPointProbing() + } + } + + private def probeNow(): Unit = { + log.debug("Probing {} for seed nodes...", probeRequest.uri) + + http + .singleRequest(probeRequest) + .flatMap(_.entity.toStrict(settings.contactPoint.probeTimeout)) + .flatMap(res ⇒ Unmarshal(res).to[SeedNodes]) + .pipeTo(self) + } + + private def clusterNotObservedWithinDeadline: Boolean = + existingClusterNotObservedWithinDeadline.isOverdue() + + private def permitParentToFormClusterIfPossible(): Unit = { + log.debug("No seed-nodes obtained from {} within stable margin [{}], may want to initiate the cluster myself...", + baseUri, settings.contactPoint.noSeedsStableMargin) + + context.parent ! HeadlessServiceDnsBootstrap.Protocol.NoSeedNodesObtainedWithinDeadline(baseUri) + } + + private def notifyParentNoSeedNodesWereFoundWithinDeadline(members: SeedNodes): Unit = { + log.info("Found existing cluster, {} returned seed-nodes: {}", members.selfNode, members.seedNodes) + + val seedAddresses = members.seedNodes.map(_.node) + context.parent ! HeadlessServiceDnsBootstrap.Protocol.ObtainedHttpSeedNodesObservation(members.selfNode, + seedAddresses) + } + + private def scheduleNextContactPointProbing(): Unit = { + // this re-scheduling of timer tasks might not be completely safe, e.g. in case of restarts, but should + // be good enough for the Akka 2.4 release. In the Akka 2.5 release we are using Timers. 
+ timerTask.foreach(_.cancel()) + timerTask = Some(context.system.scheduler.scheduleOnce(effectiveProbeInterval(), self, Internal.ScheduledProbeNow)) + } + + /** Duration with configured jitter applied */ + private def effectiveProbeInterval(): FiniteDuration = + probeInterval + jitter(probeInterval) + + def jitter(d: FiniteDuration): FiniteDuration = + (d.toMillis * settings.contactPoint.probeIntervalJitter * ThreadLocalRandom.current().nextDouble()).millis + +} diff --git a/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagement.scala b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagement.scala index ddffb0fa5..64d4461d3 100644 --- a/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagement.scala +++ b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagement.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 Lightbend Inc. + * Copyright (C) 2017 Lightbend Inc. */ package akka.cluster.http.management @@ -7,210 +7,25 @@ import java.util.concurrent.atomic.AtomicReference import akka.Done import akka.actor.AddressFromURIString +import akka.cluster.bootstrap.{ ClusterBootstrap, ClusterBootstrapSettings } +import akka.cluster.bootstrap.contactpoint.HttpClusterBootstrapRoutes import akka.cluster.sharding.{ ClusterSharding, ShardRegion } import akka.cluster.{ Cluster, Member, MemberStatus } +import akka.event.Logging import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport -import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.{ StatusCodes, Uri } import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{ Route, RouteResult } import akka.http.scaladsl.{ ConnectionContext, Http } import akka.pattern.{ ask, AskTimeoutException } import akka.stream.ActorMaterializer import akka.util.Timeout +import com.typesafe.config.ConfigFactory import spray.json.DefaultJsonProtocol import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } -final case class ClusterUnreachableMember(node: String, observedBy: Seq[String]) -final case class ClusterMember(node: String, nodeUid: String, status: String, roles: Set[String]) -final case class ClusterMembers(selfNode: String, - members: Set[ClusterMember], - unreachable: Seq[ClusterUnreachableMember], - leader: Option[String], - oldest: Option[String]) -final case class ClusterHttpManagementMessage(message: String) -final case class ShardRegionInfo(shardId: String, numEntities: Int) -final case class ShardDetails(regions: Seq[ShardRegionInfo]) - -private[akka] sealed trait ClusterHttpManagementOperation -private[akka] case object Down extends ClusterHttpManagementOperation -private[akka] case object Leave extends ClusterHttpManagementOperation -private[akka] case object Join extends ClusterHttpManagementOperation - -object ClusterHttpManagementOperation { - def fromString(value: String): Option[ClusterHttpManagementOperation] = - Vector(Down, Leave, Join).find(_.toString.equalsIgnoreCase(value)) -} - -trait ClusterHttpManagementJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol { - implicit val clusterUnreachableMemberFormat = jsonFormat2(ClusterUnreachableMember) - implicit val clusterMemberFormat = jsonFormat4(ClusterMember) - implicit val clusterMembersFormat = jsonFormat5(ClusterMembers) - implicit val clusterMemberMessageFormat = jsonFormat1(ClusterHttpManagementMessage) - implicit val shardRegionInfoFormat = jsonFormat2(ShardRegionInfo) - implicit val shardDetailsFormat = 
jsonFormat1(ShardDetails) -} - -trait ClusterHttpManagementHelper extends ClusterHttpManagementJsonProtocol { - def memberToClusterMember(m: Member): ClusterMember = - ClusterMember(s"${m.uniqueAddress.address}", s"${m.uniqueAddress.longUid}", s"${m.status}", m.roles) -} - -object ClusterHttpManagementRoutes extends ClusterHttpManagementHelper { - - private def routeGetMembers(cluster: Cluster) = - get { - complete { - val members = cluster.readView.state.members.map(memberToClusterMember) - - val unreachable = cluster.readView.reachability.observersGroupedByUnreachable.toSeq.sortBy(_._1).map { - case (subject, observers) ⇒ - ClusterUnreachableMember(s"${subject.address}", observers.toSeq.sorted.map(m ⇒ s"${m.address}")) - } - - val leader = cluster.readView.leader.map(_.toString) - val oldest = cluster.state.members.toSeq - .filter(_.status == MemberStatus.Up) - .sorted(Member.ageOrdering) - .headOption // we are only interested in the oldest one that is still Up - .map(_.address.toString) - - ClusterMembers(s"${cluster.readView.selfAddress}", members, unreachable, leader, oldest) - } - } - - private def routePostMembers(cluster: Cluster) = - post { - formField('address) { addressString ⇒ - complete { - val address = AddressFromURIString(addressString) - cluster.join(address) - ClusterHttpManagementMessage(s"Joining $address") - } - } - } - - private def routeGetMember(cluster: Cluster, member: Member) = - get { - complete { - memberToClusterMember(member) - } - } - - private def routeDeleteMember(cluster: Cluster, member: Member) = - delete { - complete { - cluster.leave(member.uniqueAddress.address) - ClusterHttpManagementMessage(s"Leaving ${member.uniqueAddress.address}") - } - } - - private def routePutMember(cluster: Cluster, member: Member) = - put { - formField('operation) { operation ⇒ - ClusterHttpManagementOperation.fromString(operation) match { - case Some(Down) ⇒ - cluster.down(member.uniqueAddress.address) - complete(ClusterHttpManagementMessage(s"Downing ${member.uniqueAddress.address}")) - case Some(Leave) ⇒ - cluster.leave(member.uniqueAddress.address) - complete(ClusterHttpManagementMessage(s"Leaving ${member.uniqueAddress.address}")) - case _ ⇒ - complete(StatusCodes.BadRequest → ClusterHttpManagementMessage("Operation not supported")) - } - } - } - - private def findMember(cluster: Cluster, memberAddress: String): Option[Member] = - cluster.readView.members.find( - m ⇒ s"${m.uniqueAddress.address}" == memberAddress || m.uniqueAddress.address.hostPort == memberAddress) - - private def routesMember(cluster: Cluster) = - path(Remaining) { memberAddress ⇒ - findMember(cluster, memberAddress) match { - case Some(member) ⇒ - routeGetMember(cluster, member) ~ routeDeleteMember(cluster, member) ~ routePutMember(cluster, member) - case None ⇒ - complete(StatusCodes.NotFound → ClusterHttpManagementMessage(s"Member [$memberAddress] not found")) - } - } - - private def routeGetShardInfo(cluster: Cluster, shardRegionName: String) = - get { - extractExecutionContext { implicit executor => - complete { - implicit val timeout = Timeout(5.seconds) - try { - ClusterSharding(cluster.system) - .shardRegion(shardRegionName) - .ask(ShardRegion.GetShardRegionStats) - .mapTo[ShardRegion.ShardRegionStats] - .map { shardRegionStats => - ShardDetails(shardRegionStats.stats.map(s => ShardRegionInfo(s._1, s._2)).toSeq) - } - } catch { - case _: AskTimeoutException => - StatusCodes.NotFound → ClusterHttpManagementMessage( - s"Shard Region $shardRegionName not responding, may have been terminated") - 
case _: IllegalArgumentException => - StatusCodes.NotFound → ClusterHttpManagementMessage(s"Shard Region $shardRegionName is not started") - } - } - } - } - - /** - * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified - * [[akka.cluster.Cluster]] instance. This version does not provide Basic Authentication. It uses - * the default path "members". - */ - def apply(cluster: Cluster): Route = apply(cluster, "") - - /** - * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified - * [[akka.cluster.Cluster]] instance. This version does not provide Basic Authentication. It uses - * the specified path `pathPrefixName`. - */ - def apply(cluster: Cluster, pathPrefixName: String): Route = { - val basePath = if (pathPrefixName.isEmpty) rawPathPrefix(pathPrefixName) else pathPrefix(pathPrefixName) - - basePath { - pathPrefix("members") { - pathEndOrSingleSlash { - routeGetMembers(cluster) ~ routePostMembers(cluster) - } ~ - routesMember(cluster) - } ~ - pathPrefix("shards" / Remaining) { shardRegionName => - pathEnd { - routeGetShardInfo(cluster, shardRegionName) - } - } - } - } - - /** - * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified - * [[akka.cluster.Cluster]] instance. This version provides Basic Authentication through the specified - * AsyncAuthenticator. It uses the default path "members". - */ - def apply(cluster: Cluster, asyncAuthenticator: AsyncAuthenticator[String]): Route = - authenticateBasicAsync[String](realm = "secured", asyncAuthenticator) { _ ⇒ - apply(cluster) - } - - /** - * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified - * [[akka.cluster.Cluster]] instance. This version provides Basic Authentication through the specified - * AsyncAuthenticator. It uses the specified path `pathPrefixName`. - */ - def apply(cluster: Cluster, pathPrefixName: String, asyncAuthenticator: AsyncAuthenticator[String]): Route = - authenticateBasicAsync[String](realm = "secured", asyncAuthenticator) { _ ⇒ - apply(cluster, pathPrefixName) - } -} - object ClusterHttpManagement { /** @@ -372,6 +187,8 @@ class ClusterHttpManagement( private implicit val materializer = ActorMaterializer() import system.dispatcher + private val log = Logging(system, getClass) + private val bindingFuture = new AtomicReference[Future[Http.ServerBinding]]() def start(): Future[Done] = { @@ -384,34 +201,52 @@ class ClusterHttpManagement( case (None, None) ⇒ ClusterHttpManagementRoutes(cluster) } - val routes = RouteResult.route2HandlerFlow(clusterHttpManagementRoutes) - - val serverFutureBinding = https match { - case Some(context) ⇒ - Http().bindAndHandle( - routes, - settings.ClusterHttpManagementHostname, - settings.ClusterHttpManagementPort, - connectionContext = context - ) - case None ⇒ - Http().bindAndHandle( - routes, - settings.ClusterHttpManagementHostname, - settings.ClusterHttpManagementPort - ) - } + // TODO instead of binding to hardcoded things here, discovery can also be used for this binding! 
+ val hostname = settings.ClusterHttpManagementHostname + val port = settings.ClusterHttpManagementPort + // Basically: "give me the SRV host/port for the port called `akka-bootstrap`" + // discovery.lookup("_akka-bootstrap" + ".effective-name.default").find(myaddress) + // ---- + + // FIXME -- think about the style of how we want to make these available + // I was rather thinking that management extensions should be able to be registered somehow + // and then be included in here + val bootstrapConfig = ConfigFactory.parseString(s""" + # we currently bind to the same port as akka-management and rely this information this way + akka.cluster.bootstrap.contact-point.port-fallback = $port + + """).withFallback(system.settings.config) + + val bootstrapSettings = ClusterBootstrapSettings(bootstrapConfig) + val bootstrapContactPointRoutes = new HttpClusterBootstrapRoutes(bootstrapSettings) + ClusterBootstrap(system).setSelfContactPoint(Uri(s"http://$hostname:$port")) + // FIXME -- end of fixme section + + val routes = RouteResult.route2HandlerFlow( + clusterHttpManagementRoutes ~ + bootstrapContactPointRoutes.routes // FIXME provide a nicer way to extend akka-management routes + ) + + val serverFutureBinding = + Http().bindAndHandle( + routes, + hostname, + port, + connectionContext = https.getOrElse(Http().defaultServerHttpContext) + ) + + log.info("Bound akka-management HTTP endpoint to: {}:{}", hostname, port) serverBindingPromise.completeWith(serverFutureBinding) serverBindingPromise.future.map(_ => Done) } else { - Future(Done) + Future.successful(Done) } } def stop(): Future[Done] = if (bindingFuture.get() == null) { - Future(Done) + Future.successful(Done) } else { val stopFuture = bindingFuture.get().flatMap(_.unbind()).map(_ => Done) bindingFuture.set(null) diff --git a/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementProtocol.scala b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementProtocol.scala new file mode 100644 index 000000000..16c6f57b1 --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementProtocol.scala @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.cluster.http.management + +import akka.cluster.Member +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json.DefaultJsonProtocol + +final case class ClusterUnreachableMember(node: String, observedBy: Seq[String]) +final case class ClusterMember(node: String, nodeUid: String, status: String, roles: Set[String]) +final case class ClusterMembers(selfNode: String, + members: Set[ClusterMember], + unreachable: Seq[ClusterUnreachableMember], + leader: Option[String], + oldest: Option[String]) +final case class ClusterHttpManagementMessage(message: String) +final case class ShardRegionInfo(shardId: String, numEntities: Int) +final case class ShardDetails(regions: Seq[ShardRegionInfo]) + +private[akka] sealed trait ClusterHttpManagementOperation +private[akka] case object Down extends ClusterHttpManagementOperation +private[akka] case object Leave extends ClusterHttpManagementOperation +private[akka] case object Join extends ClusterHttpManagementOperation + +object ClusterHttpManagementOperation { + def fromString(value: String): Option[ClusterHttpManagementOperation] = + Vector(Down, Leave, Join).find(_.toString.equalsIgnoreCase(value)) +} + +trait ClusterHttpManagementJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol { + implicit val clusterUnreachableMemberFormat = jsonFormat2(ClusterUnreachableMember) + implicit val clusterMemberFormat = jsonFormat4(ClusterMember) + implicit val clusterMembersFormat = jsonFormat5(ClusterMembers) + implicit val clusterMemberMessageFormat = jsonFormat1(ClusterHttpManagementMessage) + implicit val shardRegionInfoFormat = jsonFormat2(ShardRegionInfo) + implicit val shardDetailsFormat = jsonFormat1(ShardDetails) +} + +trait ClusterHttpManagementHelper extends ClusterHttpManagementJsonProtocol { + def memberToClusterMember(m: Member): ClusterMember = + ClusterMember(s"${m.uniqueAddress.address}", s"${m.uniqueAddress.longUid}", s"${m.status}", m.roles) +} diff --git a/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementRoutes.scala b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementRoutes.scala new file mode 100644 index 000000000..365f14958 --- /dev/null +++ b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementRoutes.scala @@ -0,0 +1,170 @@ +/* + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.cluster.http.management + +import akka.actor.AddressFromURIString +import akka.cluster.{ Cluster, Member, MemberStatus } +import akka.cluster.sharding.{ ClusterSharding, ShardRegion } +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.server.Route +import akka.pattern.AskTimeoutException +import akka.util.Timeout + +import akka.pattern.ask +import scala.concurrent.duration._ + +object ClusterHttpManagementRoutes extends ClusterHttpManagementHelper { + import akka.http.scaladsl.server.Directives._ + + private def routeGetMembers(cluster: Cluster) = + get { + complete { + val members = cluster.readView.state.members.map(memberToClusterMember) + + val unreachable = cluster.readView.reachability.observersGroupedByUnreachable.toSeq.sortBy(_._1).map { + case (subject, observers) ⇒ + ClusterUnreachableMember(s"${subject.address}", observers.toSeq.sorted.map(m ⇒ s"${m.address}")) + } + + val leader = cluster.readView.leader.map(_.toString) + val oldest = cluster.state.members.toSeq + .filter(node => node.status == MemberStatus.Up) + .sorted(Member.ageOrdering) + .headOption // we are only interested in the oldest one that is still Up + .map(_.address.toString) + + ClusterMembers(s"${cluster.readView.selfAddress}", members, unreachable, leader, oldest) + } + } + + private def routePostMembers(cluster: Cluster) = + post { + formField('address) { addressString ⇒ + complete { + val address = AddressFromURIString(addressString) + cluster.join(address) + ClusterHttpManagementMessage(s"Joining $address") + } + } + } + + private def routeGetMember(cluster: Cluster, member: Member) = + get { + complete { + memberToClusterMember(member) + } + } + + private def routeDeleteMember(cluster: Cluster, member: Member) = + delete { + complete { + cluster.leave(member.uniqueAddress.address) + ClusterHttpManagementMessage(s"Leaving ${member.uniqueAddress.address}") + } + } + + private def routePutMember(cluster: Cluster, member: Member) = + put { + formField('operation) { operation ⇒ + ClusterHttpManagementOperation.fromString(operation) match { + case Some(Down) ⇒ + cluster.down(member.uniqueAddress.address) + complete(ClusterHttpManagementMessage(s"Downing ${member.uniqueAddress.address}")) + case Some(Leave) ⇒ + cluster.leave(member.uniqueAddress.address) + complete(ClusterHttpManagementMessage(s"Leaving ${member.uniqueAddress.address}")) + case _ ⇒ + complete(StatusCodes.BadRequest → ClusterHttpManagementMessage("Operation not supported")) + } + } + } + + private def findMember(cluster: Cluster, memberAddress: String): Option[Member] = + cluster.readView.members.find( + m ⇒ s"${m.uniqueAddress.address}" == memberAddress || m.uniqueAddress.address.hostPort == memberAddress) + + private def routesMember(cluster: Cluster) = + path(Remaining) { memberAddress ⇒ + findMember(cluster, memberAddress) match { + case Some(member) ⇒ + routeGetMember(cluster, member) ~ routeDeleteMember(cluster, member) ~ routePutMember(cluster, member) + case None ⇒ + complete(StatusCodes.NotFound → ClusterHttpManagementMessage(s"Member [$memberAddress] not found")) + } + } + + private def routeGetShardInfo(cluster: Cluster, shardRegionName: String) = + get { + extractExecutionContext { implicit executor => + complete { + implicit val timeout = Timeout(5.seconds) + try { + ClusterSharding(cluster.system) + .shardRegion(shardRegionName) + .ask(ShardRegion.GetShardRegionStats) + .mapTo[ShardRegion.ShardRegionStats] + .map { shardRegionStats => + ShardDetails(shardRegionStats.stats.map(s => 
ShardRegionInfo(s._1, s._2)).toSeq) + } + } catch { + case _: AskTimeoutException => + StatusCodes.NotFound → ClusterHttpManagementMessage( + s"Shard Region $shardRegionName not responding, may have been terminated") + case _: IllegalArgumentException => + StatusCodes.NotFound → ClusterHttpManagementMessage(s"Shard Region $shardRegionName is not started") + } + } + } + } + + /** + * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified + * [[akka.cluster.Cluster]] instance. This version does not provide Basic Authentication. It uses + * the default path "members". + */ + def apply(cluster: Cluster): Route = apply(cluster, "") + + /** + * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified + * [[akka.cluster.Cluster]] instance. This version does not provide Basic Authentication. It uses + * the specified path `pathPrefixName`. + */ + def apply(cluster: Cluster, pathPrefixName: String): Route = { + val basePath = if (pathPrefixName.isEmpty) rawPathPrefix(pathPrefixName) else pathPrefix(pathPrefixName) + + basePath { + pathPrefix("members") { + pathEndOrSingleSlash { + routeGetMembers(cluster) ~ routePostMembers(cluster) + } ~ + routesMember(cluster) + } ~ + pathPrefix("shards" / Remaining) { shardRegionName => + pathEnd { + routeGetShardInfo(cluster, shardRegionName) + } + } + } + } + + /** + * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified + * [[akka.cluster.Cluster]] instance. This version provides Basic Authentication through the specified + * AsyncAuthenticator. It uses the default path "members". + */ + def apply(cluster: Cluster, asyncAuthenticator: AsyncAuthenticator[String]): Route = + authenticateBasicAsync[String](realm = "secured", asyncAuthenticator) { _ ⇒ + apply(cluster) + } + + /** + * Creates an instance of [[akka.cluster.http.management.ClusterHttpManagementRoutes]] to manage the specified + * [[akka.cluster.Cluster]] instance. This version provides Basic Authentication through the specified + * AsyncAuthenticator. It uses the specified path `pathPrefixName`. + */ + def apply(cluster: Cluster, pathPrefixName: String, asyncAuthenticator: AsyncAuthenticator[String]): Route = + authenticateBasicAsync[String](realm = "secured", asyncAuthenticator) { _ ⇒ + apply(cluster, pathPrefixName) + } +} diff --git a/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementSettings.scala b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementSettings.scala index 3d02b6540..c5d71d36e 100644 --- a/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementSettings.scala +++ b/cluster-http/src/main/scala/akka/cluster/http/management/ClusterHttpManagementSettings.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 Lightbend Inc. + * Copyright (C) 2017 Lightbend Inc. */ package akka.cluster.http.management diff --git a/cluster-http/src/main/scala/akka/discovery/AkkaDnsServiceDiscovery.scala b/cluster-http/src/main/scala/akka/discovery/AkkaDnsServiceDiscovery.scala new file mode 100644 index 000000000..437b55cb9 --- /dev/null +++ b/cluster-http/src/main/scala/akka/discovery/AkkaDnsServiceDiscovery.scala @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2017 Lightbend Inc. 
+ */ +package akka.discovery + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.io.{ Dns, IO } +import akka.pattern.ask + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +/** + * Looks for A records for a given service. + */ +class AkkaDnsServiceDiscovery(system: ActorSystem) extends ServiceDiscovery { + private val log = Logging(system, getClass) + private val dns = IO(Dns)(system) + import system.dispatcher + + override def lookup(name: String, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = { + def cleanIpString(ipString: String): String = + if (ipString.startsWith("/")) ipString.tail else ipString + + dns.ask(Dns.Resolve(name))(resolveTimeout) map { + case resolved: Dns.Resolved => + log.info("Resolved Dns.Resolved: {}", resolved) + val addresses = resolved.ipv4.map { entry ⇒ + ServiceDiscovery.ResolvedTarget(cleanIpString(entry.getHostAddress), None) + } + ServiceDiscovery.Resolved(name, addresses) + + case resolved ⇒ + log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass) + ServiceDiscovery.Resolved(name, Nil) + } + } +} diff --git a/cluster-http/src/main/scala/akka/discovery/DnsSrvServiceDiscovery.scala b/cluster-http/src/main/scala/akka/discovery/DnsSrvServiceDiscovery.scala new file mode 100644 index 000000000..a4f39c723 --- /dev/null +++ b/cluster-http/src/main/scala/akka/discovery/DnsSrvServiceDiscovery.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.discovery + +/* FIXME if we want to use SRV records, we need to get the code from the ru.smslv.akka codebase or implement it in Akka + +import akka.actor.ActorSystem +import akka.event.Logging +//import akka.io.AsyncDnsResolver.SrvResolved +import akka.io.{ Dns, IO } +import akka.pattern.ask +import com.typesafe.config.Config + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +/** + * Looks for A records for a given service. 
+ */ +class DnsSrvServiceDiscovery(system: ActorSystem) extends ServiceDiscovery { + private val log = Logging(system, getClass) + private val dns = IO(Dns)(system) + import system.dispatcher + + val config = new DnsSrvServiceDiscoverySettings(system.settings.config) + + override def lookup(name: String, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = { + def cleanIpString(ipString: String): String = + if (ipString.startsWith("/")) ipString.tail else ipString + + dns.ask(Dns.Resolve(name))(resolveTimeout) map { +// This is the SRV specific code part +// case srv: SrvResolved => +// log.info("Resolved Srv.Resolved: {}", srv) +// val addresses = srv.srv.map { entry ⇒ +// ServiceDiscovery.ResolvedTarget(cleanIpString(entry.target), Some(entry.port)) +// } +// ServiceDiscovery.Resolved(name, addresses) + + case resolved: Dns.Resolved => + log.info("Resolved Dns.Resolved: {}", resolved) + val addresses = resolved.ipv4.map { entry ⇒ + ServiceDiscovery.ResolvedTarget(cleanIpString(entry.getHostAddress), None) + } + ServiceDiscovery.Resolved(name, addresses) + + case resolved ⇒ + log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass) + ServiceDiscovery.Resolved(name, Nil) + } + } +} + +class DnsSrvServiceDiscoverySettings(config: Config) {} + + */ diff --git a/cluster-http/src/main/scala/akka/discovery/ServiceDiscovery.scala b/cluster-http/src/main/scala/akka/discovery/ServiceDiscovery.scala new file mode 100644 index 000000000..308e1a4a2 --- /dev/null +++ b/cluster-http/src/main/scala/akka/discovery/ServiceDiscovery.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.discovery + +import akka.actor.DeadLetterSuppression +import akka.annotation.ApiMayChange + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +@ApiMayChange +object ServiceDiscovery { + final case class Resolved(serviceName: String, addresses: immutable.Seq[ResolvedTarget]) + extends DeadLetterSuppression + final case class ResolvedTarget(host: String, port: Option[Int]) +} + +@ApiMayChange +trait ServiceDiscovery { + import ServiceDiscovery._ + def lookup(name: String, resolveTimeout: FiniteDuration): Future[Resolved] +} diff --git a/cluster-http/src/test/java/akka/cluster/http/management/ClusterHttpManagementJavaCompileTest.java b/cluster-http/src/test/java/akka/cluster/http/management/ClusterHttpManagementJavaCompileTest.java index 2ebd8ea71..229f72572 100644 --- a/cluster-http/src/test/java/akka/cluster/http/management/ClusterHttpManagementJavaCompileTest.java +++ b/cluster-http/src/test/java/akka/cluster/http/management/ClusterHttpManagementJavaCompileTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 Lightbend Inc. + * Copyright (C) 2017 Lightbend Inc. 
*/ package akka.cluster.http.management; diff --git a/cluster-http/src/test/resources/reference.conf b/cluster-http/src/test/resources/reference.conf new file mode 100644 index 000000000..c1889065c --- /dev/null +++ b/cluster-http/src/test/resources/reference.conf @@ -0,0 +1,16 @@ +akka { + actor { + provider = "cluster" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" + port = 2552 + } + } + + # this can be refered to in tests to use the mock discovery implementation + mock-dns.class = "akka.discovery.MockDiscovery" + +} diff --git a/cluster-http/src/test/scala/akka/cluster/bootstrap/contactpoint/ClusterBootstrapDnsHttpIntegrationSpec.scala b/cluster-http/src/test/scala/akka/cluster/bootstrap/contactpoint/ClusterBootstrapDnsHttpIntegrationSpec.scala new file mode 100644 index 000000000..5dc503157 --- /dev/null +++ b/cluster-http/src/test/scala/akka/cluster/bootstrap/contactpoint/ClusterBootstrapDnsHttpIntegrationSpec.scala @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster.bootstrap.contactpoint + +import akka.actor.ActorSystem +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.{ ClusterDomainEvent, CurrentClusterState, MemberUp } +import akka.cluster.bootstrap.ClusterBootstrap +import akka.cluster.http.management.ClusterHttpManagement +import akka.discovery.MockDiscovery +import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import akka.testkit.{ SocketUtil, TestKit, TestProbe } +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ + +class ClusterBootstrapDnsHttpIntegrationSpec extends WordSpecLike with Matchers { + + "Cluster Bootstrap" should { + + var remotingPorts = Map.empty[String, Int] + var contactPointPorts = Map.empty[String, Int] + + def config(id: String): Config = { + val managementPort = SocketUtil.temporaryServerAddress("127.0.0.1").getPort + val remotingPort = SocketUtil.temporaryServerAddress("127.0.0.1").getPort + + info(s"System [$id]: management port: $managementPort") + info(s"System [$id]: remoting port: $remotingPort") + + contactPointPorts = contactPointPorts.updated(id, managementPort) + remotingPorts = remotingPorts.updated(id, remotingPort) + + ConfigFactory.parseString(s""" + akka { + loglevel = INFO + + cluster.jmx.multi-mbeans-in-same-jvm = on + + cluster.http.management.port = ${managementPort} + remote.netty.tcp.port = ${remotingPort} + + cluster.bootstrap { + contact-point-discovery { + discovery-method = akka.mock-dns + + service-namespace = "svc.cluster.local" + } + + contact-point { + no-seeds-stable-margin = 4 seconds + } + } + } + """.stripMargin).withFallback(ConfigFactory.load()) + } + + val systemA = ActorSystem("System", config("A")) + val systemB = ActorSystem("System", config("B")) + val systemC = ActorSystem("System", config("C")) + + val clusterA = Cluster(systemA) + val clusterB = Cluster(systemB) + val clusterC = Cluster(systemC) + + val bootstrapA = ClusterBootstrap(systemA) + val bootstrapB = ClusterBootstrap(systemB) + val bootstrapC = ClusterBootstrap(systemC) + + // prepare the "mock DNS" + val name = "system.svc.cluster.local" + MockDiscovery.set(name, + Resolved(name, + List( + ResolvedTarget(clusterA.selfAddress.host.get, contactPointPorts.get("A")), + ResolvedTarget(clusterB.selfAddress.host.get, contactPointPorts.get("B")), + ResolvedTarget(clusterC.selfAddress.host.get, contactPointPorts.get("C")) + ))) + + "start listening with 
the http contact-points on 3 systems" in { + ClusterHttpManagement(Cluster(systemA)).start() + ClusterHttpManagement(Cluster(systemB)).start() + ClusterHttpManagement(Cluster(systemC)).start() + } + + "join three DNS discovered nodes by forming new cluster (happy path)" in { + bootstrapA.discovery.getClass should ===(classOf[MockDiscovery]) + + bootstrapA.start() + bootstrapB.start() + bootstrapC.start() + + val pA = TestProbe()(systemA) + clusterA.subscribe(pA.ref, classOf[MemberUp]) + + pA.expectMsgType[CurrentClusterState] + val up1 = pA.expectMsgType[MemberUp](30.seconds) + info("" + up1) + } + + "terminate all systems" in { + try TestKit.shutdownActorSystem(systemA, 3.seconds) + finally try TestKit.shutdownActorSystem(systemB, 3.seconds) + finally TestKit.shutdownActorSystem(systemC, 3.seconds) + } + + } + +} diff --git a/cluster-http/src/test/scala/akka/cluster/bootstrap/contactpoint/HttpContactPointRoutesSpec.scala b/cluster-http/src/test/scala/akka/cluster/bootstrap/contactpoint/HttpContactPointRoutesSpec.scala new file mode 100644 index 000000000..25e1aeafb --- /dev/null +++ b/cluster-http/src/test/scala/akka/cluster/bootstrap/contactpoint/HttpContactPointRoutesSpec.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.cluster.bootstrap.contactpoint + +import akka.cluster.{ Cluster, ClusterEvent } +import akka.cluster.ClusterEvent.ClusterDomainEvent +import akka.cluster.bootstrap.ClusterBootstrapSettings +import akka.cluster.http.management.ClusterHttpManagementJsonProtocol +import akka.http.scaladsl.testkit.ScalatestRouteTest +import akka.testkit.{ SocketUtil, TestProbe } +import org.scalatest.{ Matchers, WordSpecLike } + +class HttpContactPointRoutesSpec + extends WordSpecLike + with Matchers + with ScalatestRouteTest + with HttpBootstrapJsonProtocol { + + override def testConfigSource = + s""" + akka { + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" + port = ${SocketUtil.temporaryServerAddress("127.0.0.1").getPort} + } + } + } + """.stripMargin + + "Http Bootstrap routes" should { + + val settings = ClusterBootstrapSettings(system.settings.config) + val httpBootstrap = new HttpClusterBootstrapRoutes(settings) + + "empty list if node is not part of a cluster" in { + ClusterBootstrapRequests.bootstrapSeedNodes("") ~> httpBootstrap.routes ~> check { + responseAs[String] should include(""""seedNodes":[]""") + } + } + + "include seed nodes when part of a cluster" in { + val cluster = Cluster(system) + cluster.join(cluster.selfAddress) + + val p = TestProbe() + cluster.subscribe(p.ref, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.MemberUp]) + val up = p.expectMsgType[ClusterEvent.MemberUp] + up.member.uniqueAddress should ===(cluster.selfUniqueAddress) + + ClusterBootstrapRequests.bootstrapSeedNodes("") ~> httpBootstrap.routes ~> check { + val response = responseAs[HttpBootstrapJsonProtocol.SeedNodes] + response.seedNodes should !==(Nil) + response.seedNodes.map(_.node) should contain(cluster.selfAddress) + } + } + } + +} diff --git a/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementRoutesSpec.scala b/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementRoutesSpec.scala index 1a7e18b7a..fc695376b 100644 --- a/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementRoutesSpec.scala +++ b/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementRoutesSpec.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 
2016 Lightbend Inc.
+ * Copyright (C) 2017 Lightbend Inc.
 */
package akka.cluster.http.management
diff --git a/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementSpec.scala b/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementSpec.scala
index 760e43cce..52e980a60 100644
--- a/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementSpec.scala
+++ b/cluster-http/src/test/scala/akka/cluster/http/management/ClusterHttpManagementSpec.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2016 Lightbend Inc.
+ * Copyright (C) 2017 Lightbend Inc.
 */
package akka.cluster.http.management
diff --git a/cluster-http/src/test/scala/akka/discovery/MockDiscovery.scala b/cluster-http/src/test/scala/akka/discovery/MockDiscovery.scala
new file mode 100644
index 000000000..5ee62eb88
--- /dev/null
+++ b/cluster-http/src/test/scala/akka/discovery/MockDiscovery.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2017 Lightbend Inc.
+ */
+package akka.discovery
+
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.actor.{ ActorSystem, ExtendedActorSystem }
+import akka.annotation.InternalApi
+import akka.discovery.ServiceDiscovery.Resolved
+import akka.event.Logging
+import akka.io.{ Dns, IO }
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+@InternalApi
+object MockDiscovery {
+  private val data = new AtomicReference[Map[String, Resolved]](Map.empty)
+
+  def set(name: String, to: Resolved): Unit = {
+    val d = data.get()
+    if (data.compareAndSet(d, d.updated(name, to))) ()
+    else set(name, to) // retry
+  }
+
+  def remove(name: String): Unit = {
+    val d = data.get()
+    if (data.compareAndSet(d, d - name)) ()
+    else remove(name) // retry
+  }
+}
+
+@InternalApi
+final class MockDiscovery(system: ActorSystem) extends ServiceDiscovery {
+
+  private val log = Logging(system, getClass)
+
+  override def lookup(name: String, resolveTimeout: FiniteDuration): Future[Resolved] =
+    MockDiscovery.data.get().get(name) match {
+      case Some(res) ⇒
+        log.info("Mock-resolved [{}] to [{}]", name, res)
+        Future.successful(res)
+      case None ⇒
+        log.info("No mock-data for [{}], resolving as 'Nil'", name)
+        Future.successful(Resolved(name, Nil))
+    }
+}
diff --git a/docs/src/main/paradox/bootstrap.md b/docs/src/main/paradox/bootstrap.md
new file mode 100644
index 000000000..20fe6c10f
--- /dev/null
+++ b/docs/src/main/paradox/bootstrap.md
@@ -0,0 +1,169 @@
+
+# Akka Cluster Bootstrap
+
+The bootstrap module / extension allows an Akka Cluster to (semi) automatically discover its neighbouring nodes,
+and join the current node to them if a cluster already exists, or safely form a new cluster for all those nodes
+if no cluster exists there yet.
+
+While bootstrap processes may be configured to use various implementations, the preferred, default (and currently only)
+implementation utilises DNS records and so-called Contact Points on the target nodes to form the cluster. This works
+particularly well in environments like Kubernetes or Mesos, where DNS records are managed for Services automatically.
+Please note that unlike many solutions proposed by the wider community, this solution does NOT require any additional
+system like etcd/zookeeper/consul to be run alongside the Akka cluster in order to discover the seed-nodes.
+
+## The Akka DNS Bootstrap
+
+The Akka DNS Bootstrap process is composed of two phases. First, a minimum number of Contact Points (by default at least
+2) needs to be gathered.
Currently this is done by looking up DNS A records for
+`akka.cluster.bootstrap.contact-point-discovery.service-name`, suffixed with
+`akka.cluster.bootstrap.contact-point-discovery.service-namespace` (if present). For example, the service-name
+`appka-service` combined with the namespace `default.svc.cluster.local` results in a lookup of
+`appka-service.default.svc.cluster.local`. In Kubernetes-managed systems these records are available by default and
+list all `Pods` in a given `Service`. Those addresses are then contacted, in what is referred to as the Contact Point
+Probing procedure. Note that at this point the node has not joined any cluster yet. The Contact Points are contacted
+using an alternative protocol which does not require cluster membership, HTTP by default.
+
+At this point, multiple nodes are probing each other's Contact Points. If a contact point responds with
+a known set of seed nodes, the probing node will join those. This can be seen as "epidemic" joining: once that node
+has completed joining, it also starts advertising those seed nodes using its own Contact Point, so any other node that
+has not yet joined, but is probing this node, will get this information and join the existing cluster.
+
+In the case no cluster exists yet (the initial bootstrap of a cluster), nodes will keep probing one another for a while
+(`akka.cluster.bootstrap.contact-point.no-seeds-stable-margin`) and once that time margin passes, they will decide that
+no cluster exists, and one of the seen nodes should join *itself* to become the first node of a new cluster. It is of utmost
+importance that only one node joins itself, so this decision has to be made deterministically. Since we know the addresses
+of all Contact Points, and the contact points relate 1:1 to an Akka Remoting (Akka Cluster) address of the given node,
+we're able to use this information to make a deterministic decision, without coordination across these nodes, as to which
+of them should perform this join. We make this decision by sorting the known addresses from lowest to highest, and the
+*lowest* address joins itself (a simplified sketch of this rule is shown after the summary below). It will then start
+advertising itself as a seed node in its Contact Point, which other nodes will notice and start joining. From here the
+"epidemic joining" process explained in the previous paragraph continues until all nodes have joined the cluster.
+
+In summary, the process is as follows:
+
+* find Contact Points using DNS
+* start probing each of the Contact Points for their known seed-nodes
+* a) Cluster already exists:
+    - if any of the Contact Points returns known seed-nodes, join them immediately
+    - the node is now part of the cluster, mission complete
+    - each time a node is added to the cluster, it is included in the seed-nodes (with a maximum limit of a few nodes),
+      which causes the joining to be spread out to the nodes which are already part of the cluster. They also start
+      advertising seed-nodes, so if a new node contacts any of those freshly joined nodes, it joins the same cluster.
+* b) No cluster exists:
+    - if none of the contact points returns any seed nodes, nodes will, after a timeout, realise they should form
+      a new cluster, and will decide that the "lowest" address should join itself.
+    - this deterministic decision causes one of the nodes to join itself, and to start advertising itself in its
+      Contact Point as a seed-node
+    - other nodes notice this via contact point probing and join this node
+    - from here the "epidemic joining" process described under "Cluster already exists" continues,
+      until all nodes have joined the cluster.
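+
+The deterministic self-join decision can be sketched as follows. This is a simplified illustration only, not the
+actual `ClusterBootstrap` implementation: the `shouldJoinSelf` helper is hypothetical, and it assumes that ordering
+the resolved targets by host (and then port) matches the address ordering used by the bootstrap coordinator.
+
+```
+import akka.discovery.ServiceDiscovery.ResolvedTarget
+
+object JoinDecision {
+  // every node evaluates the same rule over the same stable set of discovered
+  // contact points, so at most one of them decides to join itself
+  def shouldJoinSelf(self: ResolvedTarget, discovered: Seq[ResolvedTarget]): Boolean = {
+    val all = (discovered :+ self).distinct
+    val lowest = all.sortBy(t => (t.host, t.port.getOrElse(0))).head
+    lowest == self // only the node owning the lowest address joins itself
+  }
+}
+```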
+TODO: We aim to use SRV records here, but A records work fine as well. The implementation should try SRV and fall
+back to A records if none are available.
+
+### Specific edge-cases explained
+
+TODO: There are specific, very hard to trigger, edge cases in the self-join. It is important to realise that using a
+consistent store would not help with these either. The race exists regardless of how we obtain the list of nodes.
+
+### Discussion: Rationale for avoidance of external (consistent) data-store for seed-nodes
+
+TODO: explain that those only provide the illusion of safety, as races can still happen in the "self join",
+even if a consistent data-store is used. It is only about the probability of the issue happening, and with our solution
+we are effectively just as safe, without the need for external stores other than DNS.
+
+TODO: explain that forcing users to run a consensus cluster in order to even join members to another cluster adds a lot
+of operational overhead, and is not actually required.
+
+TODO: explain that a consistency-favouring datastore is NOT optimal for systems which need to contact such a store to JOIN
+a cluster. After all, you may want to join new nodes precisely when the system is under heavy load, and the consensus
+system itself could then be overloaded, causing you to be unable to join new nodes! By embracing a truly peer-to-peer
+joining model, we can join nodes (yes, safely) even during intense traffic and avoid having one more system that could break.
+
+## Examples
+
+### Kubernetes example
+
+In Kubernetes, one would deploy an Akka Cluster as a single Headless Service, which can be configured as follows:
+
+TODO: explain why headless
+
+TODO: explain all the ports we expose
+
+TODO: Don't paste the example but link to sources.
+
+
+```
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  labels:
+    app: appka
+  name: appka
+spec:
+  replicas: 4
+  selector:
+    matchLabels:
+      app: appka
+  template:
+    metadata:
+      labels:
+        app: appka
+    spec:
+      containers:
+      - name: appka
+        image: ktoso/akka-management-joining-demo:1.3.3.7
+        imagePullPolicy: Never
+        env:
+        - name: HOST_NAME
+          valueFrom:
+            fieldRef:
+              apiVersion: v1
+              fieldPath: status.podIP
+        livenessProbe:
+          tcpSocket:
+# port: 2551
+            port: 19999
+        ports:
+        # akka remoting
+        - name: remoting
+          containerPort: 2551
+          protocol: TCP
+        # akka-management bootstrap
+        - name: bootstrap
+          containerPort: 8558
+          protocol: TCP
+        # external http
+        - name: http
+          containerPort: 8080
+          protocol: TCP
+---
+kind: Service
+apiVersion: v1
+metadata:
+  name: appka-service
+spec:
+  # by setting the clusterIP to None we are a "headless service"
+  # and thus the svc ("service") DNS record does not resolve to a single IP, but to the IPs of all nodes that we select
+  #
+  # In other words:
+  # $ kubectl exec -it $POD -- nslookup appka-service.default.svc.cluster.local
+  # Server:    10.0.0.10
+  # Address:   10.0.0.10#53
+  #
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.7
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.8
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.9
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.6
+  clusterIP: None
+  selector:
+    app: appka
+  ports:
+  - protocol: TCP
+    port: 8558
+    targetPort: 8558
+
+```
+
+TODO: complete this example with a demo sample output?
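+
+### Enabling bootstrap in the application
+
+On the application side, bootstrap only requires starting the cluster HTTP management endpoint (which also serves the
+bootstrap Contact Point, since the contact-point fallback-port is configured to the management port) and the bootstrap
+process itself, in addition to the `akka.cluster.bootstrap.contact-point-discovery` settings (service-name /
+service-namespace) in `application.conf`. The following is a minimal sketch mirroring this PR's `joining-demo`
+`DemoApp`; it omits the demo's HTTP route and cluster-event logging, and the object name `Boot` and the system name
+`Appka` are illustrative only:
+
+```
+import akka.actor.ActorSystem
+import akka.cluster.Cluster
+import akka.cluster.bootstrap.ClusterBootstrap
+import akka.cluster.http.management.ClusterHttpManagement
+
+object Boot extends App {
+  implicit val system = ActorSystem("Appka")
+  val cluster = Cluster(system)
+
+  // start the HTTP management endpoint, which bootstrap uses as its Contact Point
+  ClusterHttpManagement(cluster).start()
+
+  // start contact-point discovery and the joining process
+  ClusterBootstrap(system).start()
+}
+```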
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md index c63871856..71c6c25d1 100644 --- a/docs/src/main/paradox/index.md +++ b/docs/src/main/paradox/index.md @@ -10,5 +10,6 @@ Support for some protocols is provided out of the box and others are provided as * Akka Cluster - [HTTP](cluster-http-management.md) - [Built in](cluster-builtin-management.md) + - [Bootstrap](bootstrap.md) @@@ diff --git a/joining-demo/build.sbt b/joining-demo/build.sbt new file mode 100644 index 000000000..ae2283bc4 --- /dev/null +++ b/joining-demo/build.sbt @@ -0,0 +1,34 @@ +import com.typesafe.sbt.packager.docker._ + +enablePlugins(JavaServerAppPackaging) + +dockerEntrypoint ++= Seq( + """-Dakka.remote.netty.tcp.hostname="$(eval "echo $AKKA_REMOTING_BIND_HOST")"""", + """-Dakka.cluster.http.management.hostname="$(eval "echo $AKKA_REMOTING_BIND_HOST")"""" +// THIS WOULD ONLY BE NEEDED IF WE USED THE ASYNC-DNS CODE +//, +// "-Dakka.io.dns.resolver=async-dns", +// "-Dakka.io.dns.async-dns.resolve-srv=true", +// "-Dakka.io.dns.async-dns.resolv-conf=on" +) + +dockerCommands := + dockerCommands.value.flatMap { + case ExecCmd("ENTRYPOINT", args @ _*) => Seq(Cmd("ENTRYPOINT", args.mkString(" "))) + case v => Seq(v) + } + +version := "1.3.3.7" // we hard-code the version here, it could be anything really + +dockerUsername := Some("ktoso") + +// use += to add an item to a Sequence +dockerCommands += Cmd("USER", "root") + +// use ++= to merge a sequence with an existing sequence +// +// ENABLE THESE IF YOU WANT TO MANUALLY DO DNSLOOKUPS IN THE CONTAINER (FOR DEBUGGING) +//dockerCommands ++= Seq( +// ExecCmd("RUN", "apt-get", "update"), +// ExecCmd("RUN", "apt-get", "install", "-y", "dnsutils") +//) diff --git a/joining-demo/kube-create.sh b/joining-demo/kube-create.sh new file mode 100755 index 000000000..a7eccc98f --- /dev/null +++ b/joining-demo/kube-create.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +kubectl create -f kubernetes/akka-cluster.yml diff --git a/joining-demo/kube-delete.sh b/joining-demo/kube-delete.sh new file mode 100755 index 000000000..05696cfa5 --- /dev/null +++ b/joining-demo/kube-delete.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +kubectl delete services,pods,deployment -l app=appka +kubectl delete services,pods,deployment appka-service diff --git a/joining-demo/kube-logs.sh b/joining-demo/kube-logs.sh new file mode 100755 index 000000000..7c7b808dc --- /dev/null +++ b/joining-demo/kube-logs.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +# this is for getting a specific log, not just by the label +POD=$(kubectl get pods | grep appka | grep Running | head -n1 | awk '{ print $1 }') + +# this checks that DNS works +kubectl exec -it $POD -- nslookup kubernetes.default + +# checks if kube dns is running: +kubectl get pods --namespace=kube-system -l k8s-app=kube-dns +# checks if the svc service is up +kubectl get svc --namespace=kube-system + +#ktoso @ 三日月~/code/akka-management/joining [wip-joining*] +#$ kubectl exec -it $POD -- dig +short NS +#a.root-servers.net. +#b.root-servers.net. +#c.root-servers.net. +#d.root-servers.net. +#e.root-servers.net. +#f.root-servers.net. +#g.root-servers.net. +#h.root-servers.net. +#i.root-servers.net. +#j.root-servers.net. +#k.root-servers.net. +#l.root-servers.net. +#m.root-servers.net. 
+
+# in general, read this: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#srv-records
diff --git a/joining-demo/kubernetes/akka-cluster.yml b/joining-demo/kubernetes/akka-cluster.yml
new file mode 100644
index 000000000..43af09a75
--- /dev/null
+++ b/joining-demo/kubernetes/akka-cluster.yml
@@ -0,0 +1,72 @@
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  labels:
+    app: appka
+  name: appka
+spec:
+  replicas: 4
+  selector:
+    matchLabels:
+      app: appka
+  template:
+    metadata:
+      labels:
+        app: appka
+    spec:
+      containers:
+      - name: appka
+        image: ktoso/akka-management-joining-demo:1.3.3.7
+        imagePullPolicy: Never
+        env:
+        - name: HOST_NAME
+          valueFrom:
+            fieldRef:
+              apiVersion: v1
+              fieldPath: status.podIP
+        livenessProbe:
+          tcpSocket:
+# port: 2551
+            port: 19999
+        ports:
+        # akka remoting
+        - name: remoting
+          containerPort: 2551
+          protocol: TCP
+        # akka-management bootstrap
+        - name: bootstrap
+          containerPort: 8558
+          protocol: TCP
+        # external http
+        - name: http
+          containerPort: 8080
+          protocol: TCP
+---
+kind: Service
+apiVersion: v1
+metadata:
+  name: appka-service
+spec:
+  # by setting the clusterIP to None we are a "headless service"
+  # and thus the svc ("service") DNS record does not resolve to a single IP, but to the IPs of all nodes that we select
+  #
+  # In other words:
+  # $ kubectl exec -it $POD -- nslookup appka-service.default.svc.cluster.local
+  # Server:    10.0.0.10
+  # Address:   10.0.0.10#53
+  #
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.7
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.8
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.9
+  # Name:      appka-service.default.svc.cluster.local
+  # Address:   172.17.0.6
+  clusterIP: None
+  selector:
+    app: appka
+  ports:
+  - protocol: TCP
+    port: 8558
+    targetPort: 8558
diff --git a/joining-demo/src/main/resources/application.conf b/joining-demo/src/main/resources/application.conf
new file mode 100644
index 000000000..d541fa901
--- /dev/null
+++ b/joining-demo/src/main/resources/application.conf
@@ -0,0 +1,40 @@
+akka {
+  actor {
+    provider = "cluster"
+  }
+  remote {
+    enabled-transports = ["akka.remote.netty.tcp"]
+  }
+
+// # <<<< ru.smslv.akka-dns library >>>>
+// # need to force that library to be used:
+// akka.io.dns.resolver = async-dns
+// # we do want to resolve SRV records if available
+// akka.io.dns.async-dns.resolve-srv = on
+// # do use resolv.conf -- no reason not to do so really
+// akka.io.dns.async-dns.resolv-conf = on
+// # <<<< end of smslv.akka-dns library >>>>
+
+}
+
+
+akka.cluster.http.management {
+  port = 19999
+}
+
+akka.cluster.bootstrap {
+
+  contact-point-discovery {
+    service-name = "appka-service"
+    service-namespace = "default.svc.cluster.local"
+  }
+
+  contact-point {
+    # currently this port HAS TO be the same as the `akka.cluster.http.management.port`
+    # it would not have to be once we implement the SRV record watching, since then we could potentially
+    # get the ports from the DNS records.
+    fallback-port = 19999
+
+    no-seeds-stable-margin = 3 seconds
+  }
+}
diff --git a/joining-demo/src/main/scala/akka/cluster/bootstrap/DemoApp.scala b/joining-demo/src/main/scala/akka/cluster/bootstrap/DemoApp.scala
new file mode 100644
index 000000000..542199158
--- /dev/null
+++ b/joining-demo/src/main/scala/akka/cluster/bootstrap/DemoApp.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2017 Lightbend Inc.
+ */ +package akka.cluster.bootstrap + +import java.util.concurrent.CompletionStage + +import akka.{ Done, NotUsed } +import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } +import akka.cluster.ClusterEvent.ClusterDomainEvent +import akka.cluster.{ Cluster, ClusterEvent } +import akka.http.scaladsl.Http +import akka.io.{ Dns, IO } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Sink +import akka.cluster.http.management._ +import akka.stream.scaladsl.Source +import com.typesafe.config.ConfigFactory + +object DemoApp extends App { + + implicit val system = ActorSystem("Appka", ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = cluster + """).withFallback(ConfigFactory.load())) + + import system.log + import system.dispatcher + implicit val mat = ActorMaterializer() + implicit val cluster = Cluster(system) + + log.info(s"Started [$system], cluster.selfAddress = ${cluster.selfAddress}") + + ClusterHttpManagement(cluster).start() + ClusterBootstrap(system).start() + + cluster + .subscribe(system.actorOf(Props[ClusterWatcher]), ClusterEvent.InitialStateAsEvents, classOf[ClusterDomainEvent]) + + import akka.http.scaladsl.server.Directives._ + Http().bindAndHandle(complete("Hello world"), "0.0.0.0", 8080) + +} + +class ClusterWatcher extends Actor with ActorLogging { + implicit val cluster = Cluster(context.system) + + override def receive = { + case msg ⇒ log.info(s"Cluster ${cluster.selfAddress} >>> " + msg) + } +} diff --git a/project/Common.scala b/project/Common.scala index 0e26082fb..ed7c8ab8d 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -9,7 +9,7 @@ object Common extends AutoPlugin { val FileHeader = (HeaderPattern.cStyleBlockComment, """|/* - | * Copyright (C) 2016 Lightbend Inc. + | * Copyright (C) 2017 Lightbend Inc. | */ |""".stripMargin) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b8061a2f5..ed2e2e9b8 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,7 +4,9 @@ object Dependencies { val AkkaVersion = "2.4.20" val AkkaHttpVersion = "10.0.10" - val junitVersion = "4.12" + val JUnitVersion = "4.12" + val SprayJsonVersion = "1.3.3" + val akkaDns = "2.4.2" val Common = Seq( libraryDependencies ++= Seq( @@ -18,11 +20,16 @@ object Dependencies { "com.typesafe.akka" %% "akka-cluster-sharding" % AkkaVersion, "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, - "io.spray" %% "spray-json" % "1.3.3", // ApacheV2 + "io.spray" %% "spray-json" % SprayJsonVersion, // ApacheV2 + + // TODO this would be needed to use the SRV records, but more likely we want to implement it ourselves + // "ru.smslv.akka" %% "akka-dns" % akkaDns, // ApacheV2 + "com.typesafe.akka" %% "akka-distributed-data-experimental" % AkkaVersion % "test", "com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion % "test", - "junit" % "junit" % junitVersion % "test", + "junit" % "junit" % JUnitVersion % "test", "org.mockito" % "mockito-all" % "1.10.19" % "test" // Common Public License 1.0 ) ) + } diff --git a/project/plugins.sbt b/project/plugins.sbt index 0e5a0e222..5aa0990ec 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,5 @@ +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.1") + addSbtPlugin("de.heikoseeberger" % "sbt-header" % "1.6.0") addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "0.4.10") addSbtPlugin("com.dwijnand" % "sbt-dynver" % "1.1.1")