Throttle http requests when sinks cannot flush events (close #190)
istreeter committed Dec 6, 2021
1 parent 8079b55 commit 631b8b5
Showing 28 changed files with 342 additions and 111 deletions.
@@ -144,7 +144,7 @@ class CollectorService(
spAnonymous
)
// we don't store events in case we're bouncing
if (!bounce && !doNotTrack) sinkEvent(event, partitionKey)
val sinkOk = if (!bounce && !doNotTrack) sinkEvent(event, partitionKey) else true

val headers = bounceLocationHeader(params, request, config.cookieBounce, bounce) ++
cookieHeader(request, config.cookieConfig, nuid, doNotTrack, spAnonymous) ++
@@ -155,7 +155,10 @@ class CollectorService(
`Access-Control-Allow-Credentials`(true)
)

buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro)
if (sinkOk || redirect)
buildHttpResponse(event, params, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro)
else
HttpResponse(StatusCodes.ServiceUnavailable)

case Left(error) =>
val badRow = BadRow.GenericError(
@@ -165,7 +168,7 @@
)

if (sinks.bad.isHealthy) {
sinkBad(badRow, partitionKey)
sinkBad(badRow, partitionKey) // ignore the result, which tells us whether the bad event was sunk successfully.
HttpResponse(StatusCodes.OK)
} else HttpResponse(StatusCodes.OK) // if the bad sink is unhealthy, we don't want to try storing the bad rows
}
@@ -256,20 +259,24 @@ class CollectorService(
e
}

/** Produces the event to the configured sink. */
/** Produces the event to the configured sink.
* @return whether the events were stored successfully
*/
def sinkEvent(
event: CollectorPayload,
partitionKey: String
): Unit = {
): Boolean = {
// Split events into Good and Bad
val eventSplit = splitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)
// Send events to respective sinks
sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) &&
sinks.good.storeRawEvents(eventSplit.good, partitionKey)
sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
}
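
An observation about sinkEvent (not text from the commit): the && above short-circuits, so when the bad sink cannot accept the bad portion of the split, the good sink is never attempted and the whole payload counts as not sunk. A minimal sketch with hypothetical stand-in functions:

// Hypothetical stand-ins for the two storeRawEvents calls in sinkEvent.
def storeBadBatch(): Boolean  = false // bad sink is saturated and refuses the batch
def storeGoodBatch(): Boolean = true  // never evaluated below

val sinkOk = storeBadBatch() && storeGoodBatch() // short-circuits to false; storeGoodBatch() is not called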

/** Sinks a bad row generated by an illegal querystring. */
def sinkBad(badRow: BadRow, partitionKey: String): Unit = {
/** Sinks a bad row generated by an illegal querystring.
 * @return whether the bad row was stored successfully
 */
def sinkBad(badRow: BadRow, partitionKey: String): Boolean = {
val toSink = List(badRow.compact.getBytes(UTF_8))
sinks.bad.storeRawEvents(toSink, partitionKey)
}
@@ -125,7 +125,13 @@ package model {
) extends SinkConfig
final case class Nsq(host: String, port: Int) extends SinkConfig
case object Stdout extends SinkConfig
final case class BufferConfig(byteLimit: Long, recordLimit: Long, timeLimit: Long)
final case class BufferConfig(
byteLimit: Long,
recordLimit: Long,
timeLimit: Long,
hardByteLimit: Option[Long],
enqueueTimeout: Long
)
final case class StreamsConfig(
good: String,
bad: String,
@@ -19,8 +19,15 @@
package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.util.concurrent.{Semaphore, TimeUnit}

import cats.Monad
import cats.implicits._

import org.slf4j.LoggerFactory

import com.snowplowanalytics.snowplow.collectors.scalastream.model.{BufferConfig, CollectorSinks}

// Define an interface for all sinks to use to store events.
trait Sink {

@@ -30,5 +37,52 @@ trait Sink {
lazy val log = LoggerFactory.getLogger(getClass())

def isHealthy: Boolean = true
def storeRawEvents(events: List[Array[Byte]], key: String): Unit

/** Store the raw events in the output sink
* @param events The events to store
* @return whether the events were stored successfully
*/
def storeRawEvents(events: List[Array[Byte]], key: String): Boolean
}

object Sink {
abstract class Throttled(throttler: Throttler) extends Sink {

def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit

protected def onComplete(sunkBytes: Long): Unit =
throttler.release(sunkBytes)

override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = {
val bytes = events.foldLeft(0L)(_ + _.size.toLong)
if (throttler.tryAcquire(bytes)) {
storeRawEventsThrottled(events, key)
true
} else
false
}
}

case class Throttler(tryAcquire: Long => Boolean, release: Long => Unit)

def throttled[F[_]: Monad](
config: BufferConfig,
buildGood: Throttler => F[Sink],
buildBad: Throttler => F[Sink]
): F[CollectorSinks] = {
val semaphore = new Semaphore(bytesToPermits(config.hardByteLimit.getOrElse(Runtime.getRuntime.maxMemory / 4)))
val throttler = Throttler(
b => semaphore.tryAcquire(bytesToPermits(b), config.enqueueTimeout, TimeUnit.MILLISECONDS),
b => semaphore.release(bytesToPermits(b))
)
for {
good <- buildGood(throttler)
bad <- buildBad(throttler)
} yield CollectorSinks(good, bad)
}

// 1 permit corresponds to 64 bytes.
// Int.MaxValue permits correspond to roughly 128 GiB, so we can accommodate any reasonable heap using a Semaphore.
private def bytesToPermits(bytes: Long): Int =
(bytes >> 6).toInt
}
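
For orientation, a sketch of the permit arithmetic that throttled relies on (illustrative values only, not part of the diff; an 8 GiB heap is assumed):

import java.util.concurrent.{Semaphore, TimeUnit}

object ThrottlerArithmetic extends App {
  // Same conversion as bytesToPermits above: one permit per 64 bytes.
  def bytesToPermits(bytes: Long): Int = (bytes >> 6).toInt

  val maxMemory     = 8L * 1024 * 1024 * 1024                       // assumed 8 GiB heap
  val hardByteLimit = maxMemory / 4                                 // the default when hardByteLimit is null
  val semaphore     = new Semaphore(bytesToPermits(hardByteLimit))  // 2 GiB -> 33,554,432 permits

  val batchBytes           = 640L                                   // e.g. ten 64-byte events -> 10 permits
  val enqueueTimeoutMillis = 10000L                                 // mirrors the enqueueTimeout default
  val acquired = semaphore.tryAcquire(bytesToPermits(batchBytes), enqueueTimeoutMillis, TimeUnit.MILLISECONDS)
  if (acquired) {
    // The sink takes the batch; once its flush completes it gives the permits back.
    semaphore.release(bytesToPermits(batchBytes))
  } else {
    // The hard limit was held for the whole timeout: the collector would answer 503.
    println("503 Service Unavailable")
  }
}

So with the null default an 8 GiB heap starts throttling once about 2 GiB of serialized events are in flight, and a request arriving while the limit is held waits up to enqueueTimeout (10 seconds here) before receiving the 503.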
@@ -32,6 +32,8 @@ class TestSink extends Sink {
// Effectively no limit to the record size
override val MaxBytes = Int.MaxValue

override def storeRawEvents(events: List[Array[Byte]], key: String): Unit =
override def storeRawEvents(events: List[Array[Byte]], key: String): Boolean = {
buf ++= events
true
}
}
@@ -56,7 +56,7 @@ object TestUtils {
sqsGoodBuffer = Some("good-buffer"),
sqsBadBuffer = Some("bad-buffer")
),
buffer = BufferConfig(4000000L, 500L, 60000L)
buffer = BufferConfig(4000000L, 500L, 60000L, Some(8000000L), 10000L)
),
telemetry = None,
prometheusMetrics = PrometheusMetricsConfig(false, None),
@@ -88,15 +88,19 @@ abstract class ConfigSpec extends Specification {
buffer =
if (app == "pubsub")
BufferConfig(
byteLimit = 100000,
recordLimit = 40,
timeLimit = 1000
byteLimit = 100000,
recordLimit = 40,
timeLimit = 1000,
hardByteLimit = None,
enqueueTimeout = 10000
)
else
BufferConfig(
byteLimit = 3145728,
recordLimit = 500,
timeLimit = 5000
byteLimit = 3145728,
recordLimit = 500,
timeLimit = 5000,
hardByteLimit = None,
enqueueTimeout = 10000
),
sink = sinkConfigRefFactory(app)
)
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2013-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream.sinks

import cats.Id

import com.snowplowanalytics.snowplow.collectors.scalastream.model.BufferConfig

import scala.collection.mutable.ListBuffer

import org.specs2.mutable.Specification

class SinkSpec extends Specification {

/** A sink that immediately calls the onComplete callback */
class HealthySink(buf: ListBuffer[Array[Byte]], throttler: Sink.Throttler) extends Sink.Throttled(throttler) {
val MaxBytes = Int.MaxValue
override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = {
buf ++= events
onComplete(events.foldLeft(0L)(_ + _.size))
}
}

/** A sink that buffers, but never calls the onComplete callback */
class UnhealthySink(buf: ListBuffer[Array[Byte]], throttler: Sink.Throttler) extends Sink.Throttled(throttler) {
val MaxBytes = Int.MaxValue
override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit =
buf ++= events
}

// A 64 byte event.
val testEvent = (1 to 64).map(_ => 'a'.toByte).toArray

// A config whose hard byte limit (128 bytes = 2 permits) allows only two 64-byte testEvents in flight
val config = BufferConfig(
byteLimit = 1000,
recordLimit = 1000,
timeLimit = 1000,
hardByteLimit = Some(128),
enqueueTimeout = 2
)

"The throttled sink" should {
"immediately sink events to a healthy sink" in {
val buf = ListBuffer[Array[Byte]]()
val sink = Sink.throttled[Id](config, new HealthySink(buf, _), new HealthySink(buf, _))

val results = (1 to 10).toList.map { _ =>
sink.good.storeRawEvents(List(testEvent), "key")
}

results must contain(beTrue).foreach

buf must have size 10
}

"something else" in {
val buf = ListBuffer[Array[Byte]]()
val sink = Sink.throttled[Id](config, new UnhealthySink(buf, _), new UnhealthySink(buf, _))

val results = (1 to 4).toList.map { _ =>
sink.good.storeRawEvents(List(testEvent), "key")
}

results must containTheSameElementsAs(Seq(true, true, false, false))

buf must have size 2
}
}
}
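
To show how a production sink is expected to use the new hooks, here is a hedged sketch (the AsyncClient and putBatch names are invented for illustration and are not part of the commit): storeRawEvents acquires permits up front, and onComplete hands them back only once the write has actually finished, which is what lets a slow sink push back on incoming HTTP requests.

import scala.concurrent.{ExecutionContext, Future}

import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.Sink

// Hypothetical asynchronous client standing in for a real Kinesis/Kafka/PubSub producer.
trait AsyncClient {
  def putBatch(events: List[Array[Byte]], key: String): Future[Unit]
}

class AsyncThrottledSink(client: AsyncClient, throttler: Sink.Throttler)(implicit ec: ExecutionContext)
    extends Sink.Throttled(throttler) {

  override val MaxBytes = 1000000

  override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = {
    val bytes = events.foldLeft(0L)(_ + _.size.toLong)
    client.putBatch(events, key).onComplete { _ =>
      // Permits were acquired by storeRawEvents before this method was called;
      // release them only once the batch has actually left the collector.
      onComplete(bytes)
    }
  }
}

The HealthySink and UnhealthySink test doubles above exercise exactly this contract: one releases its permits immediately, the other never does, so the throttler eventually refuses new batches.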
13 changes: 11 additions & 2 deletions examples/config.kafka.extended.hocon
@@ -204,8 +204,7 @@ collector {

}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
@@ -214,6 +213,16 @@
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000

# The limit on bytes held in memory before the collector throttles new http requests.
# This becomes relevant only if the collector is receiving events more quickly than the buffer
# can be emptied to the sink.
# The default (null) means use 1/4 of the maximum heap size.
hardByteLimit = null

# When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer.
# If they are not enqueued within the timeout in milliseconds, then the collector returns a 503.
enqueueTimeout = 10000
}
}
# Telemetry sends heartbeat events to external pipeline.
13 changes: 11 additions & 2 deletions examples/config.kinesis.extended.hocon
@@ -222,8 +222,7 @@
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to Kinesis.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
@@ -232,6 +231,16 @@
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000

# The limit on bytes held in memory before the collector throttles new http requests.
# This becomes relevant only if the collector is receiving events more quickly than the buffer
# can be emptied to the sink.
# The default (null) means use 1/4 of the maximum heap size.
hardByteLimit = null

# When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer.
# If they are not enqueued within the timeout in milliseconds, then the collector returns a 503.
enqueueTimeout = 10000
}
}

13 changes: 11 additions & 2 deletions examples/config.pubsub.extended.hocon
@@ -199,8 +199,7 @@ collector {
}


# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to PubSub.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
@@ -209,6 +208,16 @@
byteLimit = 100000
recordLimit = 40
timeLimit = 1000

# The limit on bytes held in memory before the collector throttles new http requests.
# This becomes relevant only if the collector is receiving events more quickly than the buffer
# can be emptied to the sink.
# The default (null) means use 1/4 of the maximum heap size.
hardByteLimit = null

# When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer.
# If they are not enqueued within the timeout in milliseconds, then the collector returns a 503.
enqueueTimeout = 10000
}
}
# Telemetry sends heartbeat events to external pipeline.
13 changes: 11 additions & 2 deletions examples/config.sqs.extended.hocon
@@ -212,8 +212,7 @@
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# Incoming events are stored in a buffer before being sent to SQS.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
@@ -222,6 +221,16 @@
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000

# The limit on bytes held in memory before the collector throttles new http requests.
# This becomes relevant only if the collector is receiving events more quickly than the buffer
# can be emptied to the sink.
# The default (null) means use 1/4 of the maximum heap size.
hardByteLimit = null

# When the hardByteLimit is reached, the collector waits to enqueue incoming events to the buffer.
# If they are not enqueued within the timeout in milliseconds, then the collector returns a 503.
enqueueTimeout = 10000
}
}
# Telemetry sends heartbeat events to external pipeline.
12 changes: 0 additions & 12 deletions examples/config.stdout.extended.hocon
@@ -187,18 +187,6 @@ collector {
enabled = stdout
enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}
# Telemetry sends heartbeat events to external pipeline.
# Unless disable parameter set to true, this feature will be enabled. Deleting whole section will not disable it.