Throttle http requests when sinks cannot flush events (close #190)
istreeter committed Dec 6, 2021
1 parent 8079b55 commit 91b8ee7
Showing 27 changed files with 228 additions and 111 deletions.
@@ -144,7 +144,7 @@ class CollectorService(
spAnonymous
)
// we don't store events in case we're bouncing
if (!bounce && !doNotTrack) sinkEvent(event, partitionKey)
val sinkOk = if (!bounce && !doNotTrack) sinkEvent(event, partitionKey) else true

val headers = bounceLocationHeader(params, request, config.cookieBounce, bounce) ++
cookieHeader(request, config.cookieConfig, nuid, doNotTrack, spAnonymous) ++
@@ -155,7 +155,10 @@ class CollectorService(
`Access-Control-Allow-Credentials`(true)
)

buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro)
if (sinkOk)
buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro)
else
HttpResponse(StatusCodes.ServiceUnavailable)

case Left(error) =>
val badRow = BadRow.GenericError(
@@ -165,7 +168,7 @@
)

if (sinks.bad.isHealthy) {
sinkBad(badRow, partitionKey)
sinkBad(badRow, partitionKey) // ignore the result, which tells us if bad event was sunk successfully.
HttpResponse(StatusCodes.OK)
} else HttpResponse(StatusCodes.OK) // if bad sink is unhealthy, we don't want to try storing the bad rows
}
@@ -256,20 +259,24 @@ class CollectorService(
e
}

/** Produces the event to the configured sink. */
/** Produces the event to the configured sink.
* @return whether the events were stored successfully
*/
def sinkEvent(
event: CollectorPayload,
partitionKey: String
): Unit = {
): Boolean = {
// Split events into Good and Bad
val eventSplit = splitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)
// Send events to respective sinks
sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) &&
sinks.good.storeRawEvents(eventSplit.good, partitionKey)
sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
}

/** Sinks a bad row generated by an illegal querystring. */
def sinkBad(badRow: BadRow, partitionKey: String): Unit = {
/** Sinks a bad row generated by an illegal querystring.
* @return whether the events were stored successfully
*/
def sinkBad(badRow: BadRow, partitionKey: String): Boolean = {
val toSink = List(badRow.compact.getBytes(UTF_8))
sinks.bad.storeRawEvents(toSink, partitionKey)
}
@@ -125,7 +125,13 @@ package model {
) extends SinkConfig
final case class Nsq(host: String, port: Int) extends SinkConfig
case object Stdout extends SinkConfig
final case class BufferConfig(byteLimit: Long, recordLimit: Long, timeLimit: Long)
final case class BufferConfig(
byteLimit: Long,
recordLimit: Long,
timeLimit: Long,
hardByteLimit: Option[Long],
enqueueTimeout: FiniteDuration
)
final case class StreamsConfig(
good: String,
bad: String,
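The two new BufferConfig fields drive the throttling introduced by this commit: hardByteLimit caps how many event bytes may be queued in the sinks at any one time (when unset it falls back to a quarter of the JVM max heap, see Sink.throttled below), and enqueueTimeout is how long a request may wait for buffer space before the collector gives up and responds with 503. A minimal construction sketch, with illustrative values rather than shipped defaults:

import scala.concurrent.duration._

val buffer = BufferConfig(
  byteLimit      = 3145728L,                // flush once about 3 MB are buffered...
  recordLimit    = 500L,                    // ...or 500 records...
  timeLimit      = 5000L,                   // ...or 5 seconds have passed
  hardByteLimit  = Some(64L * 1024 * 1024), // at most 64 MB of events in flight
  enqueueTimeout = 10.seconds               // wait up to 10 s for space, then fail (503)
)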
@@ -19,8 +19,15 @@
package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.util.concurrent.{Semaphore, TimeUnit}

import cats.Monad
import cats.implicits._

import org.slf4j.LoggerFactory

import com.snowplowanalytics.snowplow.collectors.scalastream.model.{BufferConfig, CollectorSinks}

// Define an interface for all sinks to use to store events.
trait Sink {

@@ -30,5 +37,52 @@ trait Sink {
lazy val log = LoggerFactory.getLogger(getClass())

def isHealthy: Boolean = true
def storeRawEvents(events: List[Array[Byte]], key: String): Unit

/** Store the raw events in the output sink
* @param events The events to store
* @return whether the events were stored successfully
*/
def storeRawEvents(events: List[Array[Byte]], key: String): Boolean
}

object Sink {
abstract class Throttled(throttler: Throttler) extends Sink {

def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit

protected def onComplete(sunkBytes: Long): Unit =
throttler.release(sunkBytes)

override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = {
val bytes = events.foldLeft(0L)(_ + _.size.toLong)
if (throttler.tryAcquire(bytes)) {
storeRawEventsThrottled(events, key)
true
} else
false
}
}

case class Throttler(tryAcquire: Long => Boolean, release: Long => Unit)

def throttled[F[_]: Monad](
config: BufferConfig,
buildGood: Throttler => F[Sink],
buildBad: Throttler => F[Sink]
): F[CollectorSinks] = {
val semaphore = new Semaphore(bytesToPermits(config.hardByteLimit.getOrElse(Runtime.getRuntime.maxMemory / 4)))
val throttler = Throttler(
b => semaphore.tryAcquire(bytesToPermits(b), config.enqueueTimeout.toMillis, TimeUnit.MILLISECONDS),
b => semaphore.release(bytesToPermits(b))
)
for {
good <- buildGood(throttler)
bad <- buildBad(throttler)
} yield CollectorSinks(good, bad)
}

// 1 permit corresponds to 64 bytes.
// Int.MaxValue permits corresponds to 127 GB, so we can accommodate any reasonable heap using a Semaphore.
private def bytesToPermits(bytes: Long): Int =
(bytes >> 6).toInt
}
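A rough sketch (not part of the commit; InMemorySink and buildSinks are invented names) of how a backend plugs into this machinery: Sink.Throttled.storeRawEvents acquires semaphore permits for the batch's bytes up front, at 64 bytes per permit, and the concrete sink must hand those bytes back through onComplete once they have really been flushed, otherwise permits leak and every later request ends up throttled.

import cats.Id
import com.snowplowanalytics.snowplow.collectors.scalastream.model.{BufferConfig, CollectorSinks}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink

// Hypothetical sink that keeps events in memory and acknowledges them immediately.
class InMemorySink(throttler: Sink.Throttler) extends Sink.Throttled(throttler) {
  override val MaxBytes = Int.MaxValue
  val buf = scala.collection.mutable.ListBuffer.empty[Array[Byte]]

  override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = {
    buf ++= events
    // Release exactly the bytes that storeRawEvents acquired for this batch.
    onComplete(events.foldLeft(0L)(_ + _.size.toLong))
  }
}

// Wiring, in the same shape the Kafka collector uses further down in this commit:
def buildSinks(config: BufferConfig): CollectorSinks =
  Sink.throttled[Id](config, t => new InMemorySink(t), t => new InMemorySink(t))

Note that the good and bad sinks share a single Throttler, so the back-pressure applies to the total bytes in flight across both streams.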
@@ -32,6 +32,8 @@ class TestSink extends Sink {
// Effectively no limit to the record size
override val MaxBytes = Int.MaxValue

override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = {
buf ++= events
true
}
}
@@ -56,7 +56,7 @@ object TestUtils {
sqsGoodBuffer = Some("good-buffer"),
sqsBadBuffer = Some("bad-buffer")
),
buffer = BufferConfig(4000000L, 500L, 60000L)
buffer = BufferConfig(4000000L, 500L, 60000L, Some(8000000L), 10.seconds)
),
telemetry = None,
prometheusMetrics = PrometheusMetricsConfig(false, None),
@@ -88,15 +88,19 @@ abstract class ConfigSpec extends Specification {
buffer =
if (app == "pubsub")
BufferConfig(
byteLimit = 100000,
recordLimit = 40,
timeLimit = 1000
byteLimit = 100000,
recordLimit = 40,
timeLimit = 1000,
hardByteLimit = None,
enqueueTimeout = 10.seconds
)
else
BufferConfig(
byteLimit = 3145728,
recordLimit = 500,
timeLimit = 5000
byteLimit = 3145728,
recordLimit = 500,
timeLimit = 5000,
hardByteLimit = None,
enqueueTimeout = 10.seconds
),
sink = sinkConfigRefFactory(app)
)
examples/config.kafka.extended.hocon (3 additions, 2 deletions)
@@ -204,8 +204,7 @@ collector {

}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
Expand All @@ -214,6 +213,8 @@ collector {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
hardByteLimit = null
enqueueTimeout = 10 seconds
}
}
# Telemetry sends heartbeat events to external pipeline.
examples/config.kinesis.extended.hocon (3 additions, 2 deletions)
@@ -222,8 +222,7 @@ collector {
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to Kinesis
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
Expand All @@ -232,6 +231,8 @@ collector {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
hardByteLimit = null
enqueueTimeout = 10 seconds
}
}

examples/config.pubsub.extended.hocon (3 additions, 2 deletions)
@@ -199,8 +199,7 @@ collector {
}


# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to PubSub
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
Expand All @@ -209,6 +208,8 @@ collector {
byteLimit = 100000
recordLimit = 40
timeLimit = 1000
hardByteLimit = null
enqueueTimeout = 10 seconds
}
}
# Telemetry sends heartbeat events to external pipeline.
examples/config.sqs.extended.hocon (3 additions, 2 deletions)
@@ -212,8 +212,7 @@ collector {
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to SQS
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
Expand All @@ -222,6 +221,8 @@ collector {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
hardByteLimit = null
enqueueTimeout = 10 seconds
}
}
# Telemetry sends heartbeat events to external pipeline.
examples/config.stdout.extended.hocon (0 additions, 12 deletions)
@@ -187,18 +187,6 @@ collector {
enabled = stdout
enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}
# Telemetry sends heartbeat events to external pipeline.
# Unless disable parameter set to true, this feature will be enabled. Deleting whole section will not disable it.
kafka/src/main/resources/application.conf (1 addition, 0 deletions)
@@ -10,6 +10,7 @@ collector {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
enqueueTimeout = 10 seconds
}
}
}
@@ -14,8 +14,9 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.Id
import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.{KafkaSink, Sink}
import com.snowplowanalytics.snowplow.collectors.scalastream.telemetry.TelemetryAkkaService
import com.snowplowanalytics.snowplow.collectors.scalastream.generated.BuildInfo

@@ -31,12 +32,15 @@ object KafkaCollector extends Collector {
val goodStream = collectorConf.streams.good
val badStream = collectorConf.streams.bad
val bufferConf = collectorConf.streams.buffer
val (good, bad) = collectorConf.streams.sink match {
collectorConf.streams.sink match {
case kc: Kafka =>
(new KafkaSink(kc, bufferConf, goodStream), new KafkaSink(kc, bufferConf, badStream))
Sink.throttled[Id](
collectorConf.streams.buffer,
new KafkaSink(kc, bufferConf, goodStream, _),
new KafkaSink(kc, bufferConf, badStream, _)
)
case _ => throw new IllegalArgumentException("Configured sink is not Kafka")
}
CollectorSinks(good, bad)
}
run(collectorConf, akkaConf, sinks, telemetry)
}
@@ -25,8 +25,9 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model._
class KafkaSink(
kafkaConfig: Kafka,
bufferConfig: BufferConfig,
topicName: String
) extends Sink {
topicName: String,
throttler: Sink.Throttler
) extends Sink.Throttled(throttler) {

// Records must not exceed MaxBytes - 1MB
override val MaxBytes = 1000000
@@ -64,14 +65,15 @@ class KafkaSink(
* @param events The list of events to send
* @param key The partition key to use
*/
override def storeRawEvents(events: List[Array[Byte]], key: String): Unit = {
override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = {
log.debug(s"Writing ${events.size} Thrift records to Kafka topic $topicName at key $key")
events.foreach { event =>
kafkaProducer.send(
new ProducerRecord(topicName, key, event),
new Callback {
override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
if (e != null) log.error(s"Sending event failed: ${e.getMessage}")
onComplete(events.size.toLong)
}
)
}
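For comparison, a self-contained sketch of the same acquire-on-enqueue / release-on-acknowledge shape against a generic Future-based client (AsyncSink and sendToBackend are invented, not part of the commit). Permits are taken for the whole batch before storeRawEventsThrottled runs, and each event's byte size is given back once its write settles, so the accounting stays balanced even when sends complete out of order.

import scala.concurrent.{ExecutionContext, Future}
import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink

class AsyncSink(throttler: Sink.Throttler)(implicit ec: ExecutionContext)
    extends Sink.Throttled(throttler) {
  override val MaxBytes = 1000000

  // Stand-in for a real client call that completes once the event is durably written.
  private def sendToBackend(event: Array[Byte]): Future[Unit] = Future.unit

  override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit =
    events.foreach { event =>
      sendToBackend(event)
        .recover { case e => log.error(s"Sending event failed: ${e.getMessage}") }
        .foreach(_ => onComplete(event.size.toLong)) // give back the bytes this event held
    }
}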
kinesis/src/main/resources/application.conf (1 addition, 0 deletions)
@@ -19,6 +19,7 @@ collector {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
enqueueTimeout = 10 seconds
}
}
}