Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds opentracing to the collector routes #77

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.config,
Dependencies.Libraries.prometheus,
Dependencies.Libraries.prometheusCommon,
Dependencies.Libraries.opentracingApi,
Dependencies.Libraries.opentracingNoop,
Dependencies.Libraries.jaeger,
Dependencies.Libraries.jaegerZipkin,
Dependencies.Libraries.zipkin,
Dependencies.Libraries.zipkinSender,
// Scala
Dependencies.Libraries.scopt,
Dependencies.Libraries.scalaz7,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ trait Collector {
lazy val log = LoggerFactory.getLogger(getClass())

implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))
implicit val _ = new FieldCoproductHint[SinkConfig]("enabled")
implicit val sinkHint = new FieldCoproductHint[SinkConfig]("enabled")
implicit val tracerHint = new FieldCoproductHint[TracerConfig]("enabled")

def parseConfig(args: Array[String]): (CollectorConfig, Config) = {
case class FileConfig(config: File = new File("."))
Expand Down Expand Up @@ -73,8 +74,11 @@ trait Collector {
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

val sharedTracer = Tracing.tracer(collectorConf.tracer)

val collectorRoute = new CollectorRoute {
override def collectorService = new CollectorService(collectorConf, sinks)
override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer)
override def tracer = sharedTracer
}

val prometheusMetricsService = new PrometheusMetricsService(collectorConf.prometheusMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package com.snowplowanalytics.snowplow.collectors.scalastream

import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.HttpCookiePair
import akka.http.scaladsl.server.{Directive1, Route}
import akka.http.scaladsl.server.{Directive1, Route, RouteResult}
import akka.http.scaladsl.server.Directives._

import io.opentracing.{Span, Tracer}

import model.DntCookieMatcher
import monitoring.BeanRegistry
import scala.collection.JavaConverters._
import scala.util.{Success, Failure}

trait CollectorRoute {
def collectorService: Service
def tracer: Tracer

private val headers = optionalHeaderValueByName("User-Agent") &
optionalHeaderValueByName("Referer") &
Expand All @@ -42,70 +47,111 @@ trait CollectorRoute {
complete(StatusCodes.NotFound -> "redirects disabled")
}

def traceRoute(inner: Span => Route): Route =
requestContext => {
val span = tracer.buildSpan("handle-request").start
span.setTag("http.method", requestContext.request.method.name)
span.setTag("http.url", requestContext.request.uri.toString)

val fut = inner(span)(requestContext)
fut.onComplete { result =>
result match {
case Success(RouteResult.Complete(response)) =>
span.setTag("http.status", response.status.intValue)
case Success(RouteResult.Rejected(rejections)) =>
span.setTag("error", result.isFailure)
span.log(Map("event" -> "error", "error.object" -> rejections).asJava)
case Failure(e) =>
span.setTag("error", result.isFailure)
span.log(Map("event" -> "error", "error.object" -> e).asJava)
}
span.finish
}(requestContext.executionContext)
fut
}

// Activates the span only for the local thread
def withActiveSpan[T](span: Span)(f: => T): T = {
val scope = tracer.activateSpan(span)
try {
f
} finally {
scope.close
}
}

def routes: Route =
doNotTrack(collectorService.doNotTrackCookie) { dnt =>
cookieIfWanted(collectorService.cookieName) { reqCookie =>
val cookie = reqCookie.map(_.toCookie)
headers { (userAgent, refererURI, rawRequestURI) =>
val qs = queryString(rawRequestURI)
extractors { (host, ip, request) =>
// get the adapter vendor and version from the path
path(Segment / Segment) { (vendor, version) =>
val path = collectorService.determinePath(vendor, version)
post {
extractContentType { ct =>
entity(as[String]) { body =>
traceRoute { span =>
cookieIfWanted(collectorService.cookieName) { reqCookie =>
val cookie = reqCookie.map(_.toCookie)
headers { (userAgent, refererURI, rawRequestURI) =>
val qs = queryString(rawRequestURI)
extractors { (host, ip, request) =>
// get the adapter vendor and version from the path
path(Segment / Segment) { (vendor, version) =>
val path = collectorService.determinePath(vendor, version)
post {
extractContentType { ct =>
entity(as[String]) { body =>
withActiveSpan(span) {
val (r, _) = collectorService.cookie(
qs,
Some(body),
path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = false,
doNotTrack = dnt,
Some(ct))
incrementRequests(r.status)
complete(r)
}
}
}
} ~
(get | head) {
withActiveSpan(span) {
val (r, _) = collectorService.cookie(
qs,
Some(body),
None,
path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = false,
doNotTrack = dnt,
Some(ct))
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
complete(r)
}
}
} ~
(get | head) {
val (r, _) = collectorService.cookie(
qs,
None,
path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
complete(r)
}
} ~
path("""ice\.png""".r | "i".r) { path =>
(get | head) {
val (r, _) = collectorService.cookie(
qs,
None,
"/" + path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
complete(r)
path("""ice\.png""".r | "i".r) { path =>
(get | head) {
withActiveSpan(span) {
val (r, _) = collectorService.cookie(
qs,
None,
"/" + path,
cookie,
userAgent,
refererURI,
host,
ip,
request,
pixelExpected = true,
doNotTrack = dnt)
incrementRequests(r.status)
complete(r)
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.headers.CacheDirectives._

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload
import io.opentracing.Tracer
import io.opentracing.propagation.{Format, TextMapAdapter}
import org.apache.commons.codec.binary.Base64
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -66,7 +68,8 @@ object CollectorService {

class CollectorService(
config: CollectorConfig,
sinks: CollectorSinks
sinks: CollectorSinks,
tracer: Tracer
) extends Service {

private val logger = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -101,6 +104,7 @@ class CollectorService(
doNotTrack: Boolean,
contentType: Option[ContentType] = None
): (HttpResponse, List[Array[Byte]]) = {
Option(tracer.activeSpan).map(_.log(Map("message" -> "cookie handler").asJava))
val queryParams = Uri.Query(queryString).toMap

val (ipAddress, partitionKey) = ipAndPartitionKey(ip, config.streams.useIpAddressAsPartitionKey)
Expand Down Expand Up @@ -199,6 +203,7 @@ class CollectorService(
networkUserId: String,
contentType: Option[String]
): CollectorPayload = {

val e = new CollectorPayload(
"iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0",
ipAddress,
Expand All @@ -213,7 +218,7 @@ class CollectorService(
refererUri.foreach(e.refererUri = _)
e.hostname = hostname
e.networkUserId = networkUserId
e.headers = (headers(request) ++ contentType).asJava
e.headers = (headers(request) ++ contentType ++ tracerHeaders).asJava
contentType.foreach(e.contentType = _)
e
}
Expand All @@ -225,11 +230,17 @@ class CollectorService(
): List[Array[Byte]] = {
// Split events into Good and Bad
val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)
// Send events to respective sinks
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
val span = tracer.buildSpan("sink-raw-events").start()
span.setTag("component", sinks.good.getClass.getSimpleName)
try {
// Send events to respective sinks
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
} finally {
span.finish
}
}

/** Builds the final http response from */
Expand Down Expand Up @@ -345,6 +356,17 @@ class CollectorService(
case other => Some(other.toString)
}

def tracerHeaders: Iterable[String] =
Option(tracer.activeSpan) match {
case Some(span) =>
val m = scala.collection.mutable.Map.empty[String, String]
val adapter = new TextMapAdapter(m.asJava)
tracer.inject(span.context, Format.Builtin.HTTP_HEADERS, adapter)
m.map { case (k, v) => s"$k: $v" }
case None =>
Iterable()
}

/** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */
def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] =
if (pixelExpected) List(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import io.jaegertracing.{Configuration => JaegerConfiguration}
import io.jaegertracing.internal.propagation.B3TextMapCodec
import io.jaegertracing.zipkin.ZipkinV2Reporter
import io.opentracing.Tracer
import io.opentracing.noop.NoopTracerFactory
import io.opentracing.propagation.Format
import org.slf4j.LoggerFactory
import zipkin2.reporter.okhttp3.OkHttpSender
import zipkin2.reporter.AsyncReporter

import scala.collection.JavaConverters._

import com.snowplowanalytics.snowplow.collectors.scalastream.model.TracerConfig


object Tracing {

lazy val log = LoggerFactory.getLogger(getClass())

def tracer(config: TracerConfig): Tracer =
config match {
case TracerConfig.Noop =>
log.debug("Using noop tracer")
NoopTracerFactory.create
case j: TracerConfig.Jaeger =>
log.debug("Using jaeger tracer")
new JaegerConfiguration(j.serviceName)
.withReporter {
val rc = new JaegerConfiguration.ReporterConfiguration
rc.withSender {
val sender = new JaegerConfiguration.SenderConfiguration
j.agentHost.foreach(sender.withAgentHost(_))
j.agentPort.foreach(sender.withAgentPort(_))
sender
}
rc
}
.withSampler {
val sampler = new JaegerConfiguration.SamplerConfiguration
j.samplerType.foreach(sampler.withType(_))
j.samplerParam.foreach(sampler.withParam(_))
j.managerHostPort.foreach(sampler.withManagerHostPort(_))
sampler
}
.withTracerTags(j.tracerTags.asJava)
.getTracer
case z: TracerConfig.Zipkin =>
log.debug("Using zipkin tracer")
val b3Codec = new B3TextMapCodec.Builder().build;

new JaegerConfiguration(z.serviceName)
.withSampler {
(new JaegerConfiguration.SamplerConfiguration)
.withType(z.samplerType)
.withParam(z.samplerParam)
}
.withTracerTags(z.tracerTags.asJava)
.getTracerBuilder
.withReporter {
new ZipkinV2Reporter(AsyncReporter.create(OkHttpSender.create(z.endpoint)))
}
.registerInjector(Format.Builtin.HTTP_HEADERS, b3Codec)
.registerExtractor(Format.Builtin.HTTP_HEADERS, b3Codec)
.build
}
}
Loading