Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce CPU & network consumption of Facia JSON download #26338

Merged
merged 2 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ val common = library("common")
awsSts,
awsSqs,
awsSsm,
eTagCachingS3,
contentApiClient,
enumeratumPlayJson,
filters,
Expand Down Expand Up @@ -70,8 +71,6 @@ val common = library("common")
capiAws,
okhttp,
) ++ jackson,
)
.settings(
TestAssets / mappings ~= filterAssets,
)

Expand Down
1 change: 1 addition & 0 deletions common/app/common/metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ object FaciaPressMetrics {
val FrontPressContentSizeLite = SamplerMetric("front-press-content-size-lite", StandardUnit.Bytes)
val FrontDecodingLatency = DurationMetric("front-decoding-latency", StandardUnit.Milliseconds)
val FrontDownloadLatency = DurationMetric("front-download-latency", StandardUnit.Milliseconds)
val FrontNotModifiedDownloadLatency = DurationMetric("front-not-modified-download-latency", StandardUnit.Milliseconds)
}

object EmailSubsciptionMetrics {
Expand Down
23 changes: 0 additions & 23 deletions common/app/concurrent/FutureSemaphore.scala

This file was deleted.

7 changes: 5 additions & 2 deletions common/app/metrics/FrontendMetrics.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package metrics

import java.util.concurrent.atomic.AtomicLong

import com.amazonaws.services.cloudwatch.model.StandardUnit
import common.{Box, StopWatch}
import model.diagnostics.CloudWatch
import org.joda.time.DateTime

import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Future
import scala.util.Try

Expand Down Expand Up @@ -130,6 +131,8 @@ final case class DurationMetric(override val name: String, override val metricUn
def record(dataPoint: DurationDataPoint): Unit = dataPoints.alter(dataPoint :: _)

def recordDuration(timeInMillis: Double): Unit = record(DurationDataPoint(timeInMillis, Option(DateTime.now)))
def recordDuration(duration: java.time.Duration): Unit =
recordDuration(duration.toNanos.toDouble / MILLISECONDS.toNanos(1))

override def isEmpty: Boolean = dataPoints.get().isEmpty
}
Expand Down
32 changes: 16 additions & 16 deletions common/app/services/S3.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package services

import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client}
import com.amazonaws.services.s3.model.CannedAccessControlList.{Private, PublicRead}
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client}
import com.amazonaws.util.StringInputStream
import com.gu.etagcaching.aws.s3.ObjectId
import common.GuLogging
import conf.Configuration
import model.PressedPageType
import org.joda.time.DateTime
import services.S3.logS3ExceptionWithDevHint

import java.io._
import java.util.zip.GZIPOutputStream
import scala.io.{Codec, Source}

trait S3 extends GuLogging {
Expand All @@ -27,8 +28,8 @@ trait S3 extends GuLogging {

private def withS3Result[T](key: String)(action: S3Object => T): Option[T] =
client.flatMap { client =>
val objectId = ObjectId(bucket, key)
try {

val request = new GetObjectRequest(bucket, key)
val result = client.getObject(request)
log.info(s"S3 got ${result.getObjectMetadata.getContentLength} bytes from ${result.getKey}")
Expand All @@ -44,13 +45,10 @@ trait S3 extends GuLogging {
}
} catch {
case e: AmazonS3Exception if e.getStatusCode == 404 =>
log.warn("not found at %s - %s" format (bucket, key))
log.warn(s"not found at ${objectId.s3Uri}")
None
case e: AmazonS3Exception =>
val errorMsg = s"Unable to fetch S3 object (key: $key)"
val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus."
log.error(errorMsg, e)
println(errorMsg + " \n" + hintMsg)
logS3ExceptionWithDevHint(objectId, e)
None
case e: Exception =>
throw e
Expand Down Expand Up @@ -91,11 +89,6 @@ trait S3 extends GuLogging {
putGzipped(key, value, contentType, Private)
}

def getGzipped(key: String)(implicit codec: Codec): Option[String] =
withS3Result(key) { result =>
Source.fromInputStream(new GZIPInputStream(result.getObjectContent)).mkString
}

private def putGzipped(
key: String,
value: String,
Expand Down Expand Up @@ -148,7 +141,14 @@ trait S3 extends GuLogging {
}
}

object S3 extends S3
object S3 extends S3 {
def logS3ExceptionWithDevHint(s3ObjectId: ObjectId, e: Exception): Unit = {
val errorMsg = s"Unable to fetch S3 object (${s3ObjectId.s3Uri})"
val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus."
log.error(errorMsg, e)
println(errorMsg + " \n" + hintMsg)
}
}

object S3FrontsApi extends S3 {

Expand Down
89 changes: 43 additions & 46 deletions common/app/services/fronts/FrontJsonFapi.scala
Original file line number Diff line number Diff line change
@@ -1,70 +1,67 @@
package services.fronts

import common.{FaciaPressMetrics, GuLogging}
import concurrent.{BlockingOperations, FutureSemaphore}
import com.gu.etagcaching.ETagCache
import com.gu.etagcaching.FreshnessPolicy.AlwaysWaitForRefreshedValue
import com.gu.etagcaching.aws.s3.ObjectId
import com.gu.etagcaching.aws.sdkv2.s3.S3ObjectFetching
import com.gu.etagcaching.aws.sdkv2.s3.response.Transformer.Bytes
import common.FaciaPressMetrics.{FrontDecodingLatency, FrontDownloadLatency, FrontNotModifiedDownloadLatency}
import common.GuLogging
import conf.Configuration
import metrics.DurationMetric
import metrics.DurationMetric.withMetrics
import model.{PressedPage, PressedPageType}
import play.api.libs.json.Json
import services.S3
import services.S3.logS3ExceptionWithDevHint
import services._
import software.amazon.awssdk.services.s3.model.S3Exception
import utils.AWSv2.S3Async

import java.util.zip.GZIPInputStream
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Using

trait FrontJsonFapi extends GuLogging {
implicit val executionContext: ExecutionContext
lazy val stage: String = Configuration.facia.stage.toUpperCase
val bucketLocation: String
val parallelJsonPresses = 32
val futureSemaphore = new FutureSemaphore(parallelJsonPresses)

def blockingOperations: BlockingOperations
private def s3ObjectIdFor(path: String, prefix: String): ObjectId =
ObjectId(
S3.bucket,
s"$bucketLocation/${path.replaceAll("""\+""", "%2B")}/fapi/pressed.v2$prefix.json",
)

private def getAddressForPath(path: String, prefix: String): String =
s"$bucketLocation/${path.replaceAll("""\+""", "%2B")}/fapi/pressed.v2$prefix.json"

def get(path: String, pageType: PressedPageType)(implicit
executionContext: ExecutionContext,
): Future[Option[PressedPage]] =
errorLoggingF(s"FrontJsonFapi.get $path") {
pressedPageFromS3(getAddressForPath(path, pageType.suffix))
}

private def parsePressedPage(
jsonStringOpt: Option[String],
)(implicit executionContext: ExecutionContext): Future[Option[PressedPage]] =
futureSemaphore.execute {
blockingOperations.executeBlocking {
jsonStringOpt.map { jsonString =>
DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) {
// This operation is run in the thread pool since it is very CPU intensive
Json.parse(jsonString).as[PressedPage]
}
private val pressedPageCache: ETagCache[ObjectId, PressedPage] = new ETagCache(
S3ObjectFetching(S3Async, Bytes)
.timing(
successWith = FrontDownloadLatency.recordDuration,
notModifiedWith = FrontNotModifiedDownloadLatency.recordDuration,
)
.thenParsing { bytes =>
withMetrics(FrontDecodingLatency) {
Using(new GZIPInputStream(bytes.asInputStream()))(Json.parse(_).as[PressedPage]).get
}
}
}
},
AlwaysWaitForRefreshedValue,
_.maximumSize(180).expireAfterAccess(1.hour),
)

private def loadPressedPageFromS3(path: String) =
blockingOperations.executeBlocking {
DurationMetric.withMetrics(FaciaPressMetrics.FrontDownloadLatency) {
S3.getGzipped(path)
def get(path: String, pageType: PressedPageType): Future[Option[PressedPage]] =
errorLoggingF(s"FrontJsonFapi.get $path") {
val objectId = s3ObjectIdFor(path, pageType.suffix)
pressedPageCache.get(objectId).map(Some(_)).recover {
case s3Exception: S3Exception =>
logS3ExceptionWithDevHint(objectId, s3Exception)
None
}
}

private def pressedPageFromS3(
path: String,
)(implicit executionContext: ExecutionContext): Future[Option[PressedPage]] =
errorLoggingF(s"FrontJsonFapi.pressedPageFromS3 $path") {
for {
s3FrontData <- loadPressedPageFromS3(path)
pressedPage <- parsePressedPage(s3FrontData)
} yield pressedPage
}

}

class FrontJsonFapiLive(val blockingOperations: BlockingOperations) extends FrontJsonFapi {
Copy link
Contributor

@ioannakok ioannakok Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're deleting BlockingOperations this config can also be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that bit of config is still used by the BlockingOperations class, and that hasn't been deleted (unfortunately?!) - we're no longer using it for the Fronts-JSON download code, but it's still used elsewhere, eg dfp.OrderAgent.

class FrontJsonFapiLive(implicit val executionContext: ExecutionContext) extends FrontJsonFapi {
override val bucketLocation: String = s"$stage/frontsapi/pressed/live"
}

class FrontJsonFapiDraft(val blockingOperations: BlockingOperations) extends FrontJsonFapi {
class FrontJsonFapiDraft(implicit val executionContext: ExecutionContext) extends FrontJsonFapi {
override val bucketLocation: String = s"$stage/frontsapi/pressed/draft"
}
25 changes: 25 additions & 0 deletions common/app/utils/AWSv2.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package utils

import software.amazon.awssdk.auth.credentials._
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.Region.EU_WEST_1
import software.amazon.awssdk.services.s3.{S3AsyncClient, S3AsyncClientBuilder}

object AWSv2 {
val region: Region = EU_WEST_1

def credentialsForDevAndProd(devProfile: String, prodCreds: AwsCredentialsProvider): AwsCredentialsProviderChain =
AwsCredentialsProviderChain.of(prodCreds, ProfileCredentialsProvider.builder().profileName(devProfile).build())

lazy val credentials: AwsCredentialsProvider =
credentialsForDevAndProd("frontend", InstanceProfileCredentialsProvider.create())

def build[T, B <: AwsClientBuilder[B, T]](builder: B): T =
builder.credentialsProvider(credentials).region(region).build()

val S3Async: S3AsyncClient = build[S3AsyncClient, S3AsyncClientBuilder](
S3AsyncClient.builder().httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(250)),
)
}
11 changes: 3 additions & 8 deletions common/test/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,9 @@ trait WithTestCSRF {

trait WithTestFrontJsonFapi {
// need a front api that stores S3 locally so it can run without deps in the unit tests
class TestFrontJsonFapi(override val blockingOperations: BlockingOperations)
extends FrontJsonFapiLive(blockingOperations) {
class TestFrontJsonFapi extends FrontJsonFapiLive()(ExecutionContext.global) {

override def get(path: String, pageType: PressedPageType)(implicit
executionContext: ExecutionContext,
): Future[Option[PressedPage]] = {
override def get(path: String, pageType: PressedPageType): Future[Option[PressedPage]] = {
recorder.load(path, Map()) {
super.get(path, pageType)
}
Expand All @@ -197,7 +194,5 @@ trait WithTestFrontJsonFapi {
}
}

lazy val actorSystem = ActorSystem()
lazy val blockingOperations = new BlockingOperations(actorSystem)
lazy val fapi = new TestFrontJsonFapi(blockingOperations)
lazy val fapi = new TestFrontJsonFapi()
}
4 changes: 4 additions & 0 deletions facia/app/AppLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import services._
import services.fronts.{FrontJsonFapiDraft, FrontJsonFapiLive}
import router.Routes

import scala.concurrent.ExecutionContext

class AppLoader extends FrontendApplicationLoader {
override def buildComponents(context: Context): FrontendComponents =
new BuiltInComponentsFromContext(context) with AppComponents
}

trait FapiServices {
implicit val executionContext: ExecutionContext
def wsClient: WSClient
def actorSystem: ActorSystem
lazy val frontJsonFapiLive = wire[FrontJsonFapiLive]
Expand Down Expand Up @@ -68,6 +71,7 @@ trait AppComponents extends FrontendComponents with FaciaControllers with FapiSe
override lazy val appMetrics = ApplicationMetrics(
FaciaPressMetrics.FrontDecodingLatency,
FaciaPressMetrics.FrontDownloadLatency,
FaciaPressMetrics.FrontNotModifiedDownloadLatency,
DCRMetrics.DCRLatencyMetric,
DCRMetrics.DCRRequestCountMetric,
)
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ object Dependencies {
val awsEc2 = "com.amazonaws" % "aws-java-sdk-ec2" % awsVersion
val awsKinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % awsVersion
val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % awsVersion
val eTagCachingS3 = "com.gu.etag-caching" %% "aws-s3-sdk-v2" % "1.0.4"
val awsSes = "com.amazonaws" % "aws-java-sdk-ses" % awsVersion
val awsSns = "com.amazonaws" % "aws-java-sdk-sns" % awsVersion
val awsSts = "com.amazonaws" % "aws-java-sdk-sts" % awsVersion
Expand Down
Loading