Skip to content

Commit

Permalink
Adds jaeger as a tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Streeter committed Nov 5, 2020
1 parent 1d3a560 commit fcdb2c8
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 72 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.prometheusCommon,
Dependencies.Libraries.opentracingApi,
Dependencies.Libraries.opentracingNoop,
Dependencies.Libraries.jaeger,
// Scala
Dependencies.Libraries.scopt,
Dependencies.Libraries.scalaz7,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import io.jaegertracing.{Configuration => JaegerConfiguration}
import io.opentracing.Tracer
import io.opentracing.noop.NoopTracerFactory
import org.slf4j.LoggerFactory
import pureconfig._
Expand All @@ -40,7 +42,8 @@ trait Collector {
lazy val log = LoggerFactory.getLogger(getClass())

implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))
implicit val _ = new FieldCoproductHint[SinkConfig]("enabled")
implicit val sinkHint = new FieldCoproductHint[SinkConfig]("enabled")
implicit val tracerHint = new FieldCoproductHint[TracerConfig]("enabled")

def parseConfig(args: Array[String]): (CollectorConfig, Config) = {
case class FileConfig(config: File = new File("."))
Expand Down Expand Up @@ -69,13 +72,23 @@ trait Collector {
(loadConfigOrThrow[CollectorConfig](conf.getConfig("collector")), conf)
}

def tracer(config: TracerConfig): Tracer =
config match {
case TracerConfig.Noop =>
log.debug("Using noop tracer")
NoopTracerFactory.create
case TracerConfig.Jaeger =>
log.debug("Using jaeger tracer")
JaegerConfiguration.fromEnv.getTracer
}

def run(collectorConf: CollectorConfig, akkaConf: Config, sinks: CollectorSinks): Unit = {

implicit val system = ActorSystem.create("scala-stream-collector", akkaConf)
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

val sharedTracer = NoopTracerFactory.create
val sharedTracer = tracer(collectorConf.tracer)

val collectorRoute = new CollectorRoute {
override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ package com.snowplowanalytics.snowplow.collectors.scalastream

import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.HttpCookiePair
import akka.http.scaladsl.server.{Directive1, Route, StandardRoute}
import akka.http.scaladsl.server.{Directive1, Route}
import akka.http.scaladsl.server.Directives._

import io.opentracing.{Span, Tracer}

import model.DntCookieMatcher
import monitoring.BeanRegistry

import scala.concurrent.ExecutionContext

trait CollectorRoute {
def collectorService: Service
def tracer: Tracer
Expand All @@ -47,80 +45,98 @@ trait CollectorRoute {
complete(StatusCodes.NotFound -> "redirects disabled")
}

def completeWithSpan(r: HttpResponse, span: Span): StandardRoute =
def traceRoute(inner: Span => Route): Route =
requestContext => {
val fut = complete(r)(requestContext)
val span = tracer.buildSpan("HandleRequest").start
val fut = inner(span)(requestContext)
fut.onComplete { _ =>
span.finish
}(ExecutionContext.global)
}(requestContext.executionContext)
fut
}

// Activates the span only for the local thread
def withActiveSpan[T](span: Span)(f: => T): T = {
val scope = tracer.activateSpan(span)
try {
f
} finally {
scope.close
}
}

def routes: Route =
doNotTrack(collectorService.doNotTrackCookie) { dnt =>
val span = tracer.buildSpan("CollectorRequest").start
cookieIfWanted(collectorService.cookieName) { reqCookie =>
val cookie = reqCookie.map(_.toCookie)
headers { (userAgent, refererURI, rawRequestURI) =>
val qs = queryString(rawRequestURI)
extractors { (host, ip, request) =>
// get the adapter vendor and version from the path
path(Segment / Segment) { (vendor, version) =>
val path = collectorService.determinePath(vendor, version)
post {
extractContentType { ct =>
entity(as[String]) { body =>
traceRoute { span =>
cookieIfWanted(collectorService.cookieName) { reqCookie =>
val cookie = reqCookie.map(_.toCookie)
headers { (userAgent, refererURI, rawRequestURI) =>
val qs = queryString(rawRequestURI)
extractors { (host, ip, request) =>
// get the adapter vendor and version from the path
path(Segment / Segment) { (vendor, version) =>
val path = collectorService.determinePath(vendor, version)
post {
extractContentType { ct =>
entity(as[String]) { body =>
withActiveSpan(span) {
val (r, _) = collectorService.cookie(
qs,
Some(body),
path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = false,
doNotTrack = dnt,
Some(ct))
incrementRequests(r.status)
complete(r)
}
}
}
} ~
(get | head) {
withActiveSpan(span) {
val (r, _) = collectorService.cookie(
qs,
Some(body),
None,
path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = false,
doNotTrack = dnt,
Some(ct))
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
completeWithSpan(r, span)
complete(r)
}
}
} ~
(get | head) {
val (r, _) = collectorService.cookie(
qs,
None,
path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
completeWithSpan(r, span)
}
} ~
path("""ice\.png""".r | "i".r) { path =>
(get | head) {
val (r, _) = collectorService.cookie(
qs,
None,
"/" + path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
completeWithSpan(r, span)
path("""ice\.png""".r | "i".r) { path =>
(get | head) {
withActiveSpan(span) {
val (r, _) = collectorService.cookie(
qs,
None,
"/" + path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
complete(r)
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,16 @@ class CollectorService(
): List[Array[Byte]] = {
// Split events into Good and Bad
val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)
// Send events to respective sinks
val span = tracer.buildSpan("SinkRawEvents").start
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
span.finish
// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
val span = tracer.buildSpan("SinkRawEvents").start()
try {
// Send events to respective sinks
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
} finally {
span.finish
}
}

/** Builds the final http response from */
Expand Down Expand Up @@ -351,12 +354,16 @@ class CollectorService(
case other => Some(other.toString)
}

def tracerHeaders: Iterable[String] = {
val m = scala.collection.mutable.Map.empty[String, String]
val adapter = new TextMapAdapter(m.asJava)
tracer.inject(tracer.activeSpan.context, Format.Builtin.HTTP_HEADERS, adapter)
m.map { case (k, v) => s"$k: $v" }
}
def tracerHeaders: Iterable[String] =
Option(tracer.activeSpan) match {
case Some(span) =>
val m = scala.collection.mutable.Map.empty[String, String]
val adapter = new TextMapAdapter(m.asJava)
tracer.inject(span.context, Format.Builtin.HTTP_HEADERS, adapter)
m.map { case (k, v) => s"$k: $v" }
case None =>
Iterable()
}

/** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */
def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ package model {
redirect: Boolean = false,
port: Int = 443
)

sealed trait TracerConfig
object TracerConfig {
case object Noop extends TracerConfig
case object Jaeger extends TracerConfig
}

final case class CollectorConfig(
interface: String,
port: Int,
Expand All @@ -157,7 +164,8 @@ package model {
streams: StreamsConfig,
prometheusMetrics: PrometheusMetricsConfig,
enableDefaultRedirect: Boolean = false,
ssl: SSLConfig = SSLConfig()
ssl: SSLConfig = SSLConfig(),
tracer: TracerConfig = TracerConfig.Noop
) {
val cookieConfig = if (cookie.enabled) Some(cookie) else None
val doNotTrackHttpCookie =
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object Dependencies {
val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries
val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries
val opentracing = "0.33.0"
val jaeger = "1.4.0"
// Scala
val collectorPayload = "0.0.0"
val scalaz7 = "7.0.9"
Expand Down Expand Up @@ -74,6 +75,7 @@ object Dependencies {
val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor
val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing
val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing
val jaeger = "io.jaegertracing" % "jaeger-client" % V.jaeger
val retry = "com.softwaremill.retry" %% "retry" % V.retry

// Scala
Expand Down

0 comments on commit fcdb2c8

Please sign in to comment.