Skip to content

Commit

Permalink
Use akka async-dns and default to SRV records (#214)
Browse files Browse the repository at this point in the history
* Use akka async-dns for dns service discovery
  • Loading branch information
chbatey authored Jul 18, 2018
1 parent fc50c82 commit b73df70
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ sudo: required
language: scala

scala:
- "2.12.2"
- "2.12.6"
- "2.11.11"

jdk:
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ lazy val docs = project
.disablePlugins(BintrayPlugin)
.settings(
name := "Akka Management",
paradoxGroups := Map("Language" -> Seq("Scala", "Java")),
paradoxTheme := Some(builtinParadoxTheme("generic")),
paradox in Compile := (paradox in Compile).dependsOn(unidocTask).value,
paradoxProperties ++= Map(
Expand All @@ -267,6 +268,8 @@ lazy val docs = project
if (isSnapshot.value) Paths.get((target in paradox in Compile).value.getPath).relativize(Paths.get(unidocTask.value.head.getPath)).toString
else s"http://developer.lightbend.com/docs/api/akka-management/${version.value}"
},
"snip.code.base_dir" -> (sourceDirectory in Test).value.getAbsolutePath,
"snip.management.base_dir" -> (baseDirectory in ThisBuild).value.getAbsolutePath,
"scaladoc.version" -> "2.12.0"
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
*/
package akka.management.cluster.bootstrap.contactpoint

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ object AggregateSimpleServiceDiscoverySpec {
aggregate {
discovery-mechanisms = ["stubbed1", "config"]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,19 @@ class Ec2TagBasedSimpleServiceDiscovery(system: ActorSystem) extends SimpleServi
after(resolveTimeout, using = system.scheduler)(
Future.failed(new TimeoutException(s"Lookup for [$query] timed-out, within [$resolveTimeout]!"))
),
lookup(query.serviceName)
lookup(query)
)
)

def lookup(name: String): Future[Resolved] = {
def lookup(query: Lookup): Future[Resolved] = {

val tagFilter = new Filter("tag:" + tagKey, List(name).asJava)
val tagFilter = new Filter("tag:" + tagKey, List(query.serviceName).asJava)

val allFilters: List[Filter] = runningInstancesFilter :: tagFilter :: otherFilters

Future {
getInstances(ec2Client, allFilters, None).map((ip: String) => ResolvedTarget(host = ip, port = None))
}.map(resoledTargets => Resolved(name, resoledTargets))
}.map(resoledTargets => Resolved(query.serviceName, resoledTargets))

}

Expand Down
16 changes: 9 additions & 7 deletions discovery-dns/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
# Akka Service Discovery DNS Config #
######################################################

akka.discovery {
akka {
discovery {

# Set the following in your application.conf if you want to use this discovery mechanism:
# impl = akka-dns
# Set the following in your application.conf if you want to use this discovery mechanism:
# impl = akka-dns

# configured the akka-dns provider
akka-dns {
# currently no special configuration is present for this module
class = akka.discovery.dns.DnsSimpleServiceDiscovery
# configured the akka-dns provider
akka-dns {
# currently no special configuration is present for this module
class = akka.discovery.dns.DnsSimpleServiceDiscovery
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package akka.discovery.dns

import akka.AkkaVersion
import akka.actor.ActorSystem
import akka.event.Logging
import akka.io.{ Dns, IO }
Expand All @@ -11,33 +12,60 @@ import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.discovery._
import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord }
import akka.io.dns.DnsProtocol.{ Ip, Srv }

/**
* Looks for A records for a given service.
*/
class DnsSimpleServiceDiscovery(system: ActorSystem) extends SimpleServiceDiscovery {

// Required for async dns
AkkaVersion.require("discovery-dns", "2.5.14")
require(system.settings.config.getString("akka.io.dns.resolver") == "async-dns",
"Akka discovery DNS requires akka.io.dns.resolver to be set to async-dns")

import SimpleServiceDiscovery._

private val log = Logging(system, getClass)
private val dns = IO(Dns)(system)

import system.dispatcher

private def cleanIpString(ipString: String): String =
if (ipString.startsWith("/")) ipString.tail else ipString

override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
def cleanIpString(ipString: String): String =
if (ipString.startsWith("/")) ipString.tail else ipString

dns.ask(Dns.Resolve(lookup.serviceName))(resolveTimeout) map {
case resolved: Dns.Resolved =>
log.info("Resolved Dns.Resolved: {}", resolved)
val addresses = resolved.ipv4.map { entry
val address = cleanIpString(entry.getHostAddress)
ResolvedTarget(host = address, port = None)
lookup match {
case Lookup(name, Some(portName), Some(protocol)) =>
val srvRequest = s"_$portName._$protocol.$name"
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map {
case resolved: DnsProtocol.Resolved =>
log.debug("Resolved Dns.Resolved: {}", resolved)
val addresses = resolved.results.collect {
case srv: SRVRecord => ResolvedTarget(srv.target, Some(srv.port))
}
Resolved(srvRequest, addresses)
case resolved
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
Resolved(srvRequest, Nil)
}
Resolved(lookup.serviceName, addresses)
case _ =>
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map {
case resolved: DnsProtocol.Resolved =>
log.debug("Resolved Dns.Resolved: {}", resolved)
val addresses = resolved.results.collect {
case a: ARecord => ResolvedTarget(cleanIpString(a.ip.getHostAddress), None)
case a: AAAARecord => ResolvedTarget(cleanIpString(a.ip.getHostAddress), None)
}
Resolved(lookup.serviceName, addresses)
case resolved
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
Resolved(lookup.serviceName, Nil)

case resolved
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
Resolved(lookup.serviceName, Nil)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,145 @@
*/
package akka.cluster.bootstrap.discovery

import org.scalatest.{ Matchers, WordSpecLike }
import akka.actor.ActorSystem
import akka.discovery.{ Lookup, ServiceDiscovery }
import akka.discovery.SimpleServiceDiscovery.ResolvedTarget
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }

class DnsDiscoverySpec extends WordSpecLike with Matchers {
import scala.concurrent.duration._

"Dns Discovery" should {
"work" in pending
/*
Testing is done via subbing out the DnsClient. To test against a real dns server
install bind and add this to /etc/named.conf
zone "akka.test" IN {
type master;
file "akka.test.zone";
};
zone "akka.test2" IN {
type master;
file "akka.test2.zone";
};
Then add the two zone files to /var/named/ akka.test.zone and akka.test2.zone
akka.test.zone:
=================
$TTL 86400
@ IN SOA akka.test root.akka.test (
2017010302
3600
900
604800
86400
)
@ IN NS test
test IN A 192.168.1.19
a-single IN A 192.168.1.20
a-double IN A 192.168.1.21
a-double IN A 192.168.1.22
aaaa-single IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:1
aaaa-double IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:2
aaaa-double IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:3
a-aaaa IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:4
a-aaaa IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:5
a-aaaa IN A 192.168.1.23
a-aaaa IN A 192.168.1.24
_service._tcp 86400 IN SRV 10 60 5060 a-single
_service._tcp 86400 IN SRV 10 40 5070 a-double
cname-in IN CNAME a-double
cname-ext IN CNAME a-single.akka.test2.
akka.test.zone:
=================
$TTL 86400
@ IN SOA akka.test2 root.akka.test2 (
2017010302
3600
900
604800
86400
)
@ IN NS test2
test2 IN A 192.168.2.19
a-single IN A 192.168.2.20
*/

object DnsDiscoverySpec {

val config = ConfigFactory.parseString("""
akka {
loglevel = DEBUG
discovery {
method = akka-dns
}
io {
dns {
resolver = "async-dns"
async-dns {
nameservers = ["localhost"]
}
}
}
}
""")

}

class DnsDiscoverySpec
extends TestKit(ActorSystem("DnsDiscoverySpec", DnsDiscoverySpec.config))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {

"Dns Discovery" must {

// Requires DNS server, see above
pending

"work with SRV records" in {
val discovery = ServiceDiscovery(system).discovery
val name = "_service._tcp.akka.test."
val result =
discovery
.lookup(Lookup("akka.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 500.milliseconds)
.futureValue
result.addresses.toSet shouldEqual Set(
ResolvedTarget("a-single.akka.test", Some(5060)),
ResolvedTarget("a-double.akka.test", Some(5070))
)
result.serviceName shouldEqual name
}

"work with IP records" in {
val discovery = ServiceDiscovery(system).discovery
val name = "a-single.akka.test"
val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
result.serviceName shouldEqual name
result.addresses.toSet shouldEqual Set(
ResolvedTarget("192.168.1.20", None)
)
}
}

override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function => JFunction }

import akka.actor._
import akka.annotation.InternalApi

final class ServiceDiscovery(implicit system: ExtendedActorSystem) extends Extension {

Expand Down
Loading

0 comments on commit b73df70

Please sign in to comment.