diff --git a/build.sbt b/build.sbt index c2951916e06c..8d1a882f2c22 100644 --- a/build.sbt +++ b/build.sbt @@ -29,6 +29,7 @@ val common = library("common") awsSts, awsSqs, awsSsm, + eTagCachingS3, contentApiClient, enumeratumPlayJson, filters, @@ -69,10 +70,7 @@ val common = library("common") identityModel, capiAws, okhttp, - "software.amazon.awssdk" % "s3" % "2.20.96", ) ++ jackson, - ) - .settings( TestAssets / mappings ~= filterAssets, ) diff --git a/common/app/common/metrics.scala b/common/app/common/metrics.scala index 2bf6dbfc9fd8..6e94833cf673 100644 --- a/common/app/common/metrics.scala +++ b/common/app/common/metrics.scala @@ -164,6 +164,7 @@ object FaciaPressMetrics { val FrontPressContentSizeLite = SamplerMetric("front-press-content-size-lite", StandardUnit.Bytes) val FrontDecodingLatency = DurationMetric("front-decoding-latency", StandardUnit.Milliseconds) val FrontDownloadLatency = DurationMetric("front-download-latency", StandardUnit.Milliseconds) + val FrontNotModifiedDownloadLatency = DurationMetric("front-not-modified-download-latency", StandardUnit.Milliseconds) } object EmailSubsciptionMetrics { diff --git a/common/app/concurrent/FutureSemaphore.scala b/common/app/concurrent/FutureSemaphore.scala deleted file mode 100644 index d2bea1c90979..000000000000 --- a/common/app/concurrent/FutureSemaphore.scala +++ /dev/null @@ -1,35 +0,0 @@ -package concurrent - -import java.util.concurrent.Semaphore - -import scala.concurrent.{ExecutionContext, Future} - -class FutureSemaphore(maxOperations: Int) { - private val semaphore = new Semaphore(maxOperations) - - def execute[A](task: => Future[A])(implicit ec: ExecutionContext): Future[A] = { - if (semaphore.tryAcquire()) { - val resultF = task - resultF.onComplete(_ => semaphore.release()) - resultF - } else { - Future.failed(new FutureSemaphore.TooManyOperationsInProgress()) - } - } - - def execute[A](task: => A)(implicit ec: ExecutionContext): A = { - if (semaphore.tryAcquire()) { - try { - task - } finally { - semaphore.release() - } - } else { - throw new FutureSemaphore.TooManyOperationsInProgress() - } - } -} - -object FutureSemaphore { - class TooManyOperationsInProgress extends Exception("Too many operations in progress, cannot execute task") -} diff --git a/common/app/metrics/FrontendMetrics.scala b/common/app/metrics/FrontendMetrics.scala index e5573406bffc..880170e3face 100644 --- a/common/app/metrics/FrontendMetrics.scala +++ b/common/app/metrics/FrontendMetrics.scala @@ -1,11 +1,12 @@ package metrics -import java.util.concurrent.atomic.AtomicLong import com.amazonaws.services.cloudwatch.model.StandardUnit import common.{Box, StopWatch} import model.diagnostics.CloudWatch import org.joda.time.DateTime +import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -130,6 +131,8 @@ final case class DurationMetric(override val name: String, override val metricUn def record(dataPoint: DurationDataPoint): Unit = dataPoints.alter(dataPoint :: _) def recordDuration(timeInMillis: Double): Unit = record(DurationDataPoint(timeInMillis, Option(DateTime.now))) + def recordDuration(duration: java.time.Duration): Unit = + recordDuration(duration.toNanos.toDouble / MILLISECONDS.toNanos(1)) override def isEmpty: Boolean = dataPoints.get().isEmpty } @@ -141,15 +144,6 @@ object DurationMetric { metric.recordDuration(stopWatch.elapsed.toDouble) result } - - def withMetricsF[A](metric: DurationMetric)(block: => Future[A])(implicit ec: ExecutionContext): Future[A] = { - val stopWatch: StopWatch = new StopWatch - val result = block - result.onComplete { _ => - metric.recordDuration(stopWatch.elapsed.toDouble) - } - result - } } case class SampledDataPoint(value: Double, sampleTime: DateTime) extends DataPoint { diff --git a/common/app/services/ETagCache.scala b/common/app/services/ETagCache.scala deleted file mode 100644 index 217fb6098320..000000000000 --- a/common/app/services/ETagCache.scala +++ /dev/null @@ -1,45 +0,0 @@ -package services - -import cats.Endo -import com.github.blemale.scaffeine.Scaffeine - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.{ExecutionContext, Future} - -/** - * - * @param eTag https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag - */ -case class ETaggedData[T](eTag: String, result: T) { - def map[S](f: T => S): ETaggedData[S] = copy(result = f(result)) -} - -trait Fetching[ResourceId, Response] { - def fetch(resourceId: ResourceId)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] - - def fetchOnlyIfETagChanged(resourceId: ResourceId, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]] -} - -class ETagCache[K, Response, T]( - fetching: Fetching[K, Response], - resultForResponse: Response => T, - freshnessPolicy: FreshnessPolicy, - cacheConfig: Endo[Scaffeine[Any, Any]] -) { - - private val cache = - cacheConfig(Scaffeine()).buildAsyncFuture[K, ETaggedData[T]]( - loader = key => fetching.fetch(key).map(_.map(resultForResponse)), - reloadLoader = Some { - case (key, cachedResource) => fetching.fetchOnlyIfETagChanged(key, cachedResource.eTag).map { - case None => cachedResource // we got HTTP 304 'NOT MODIFIED': no new data - old data still valid - case Some(freshResponse) => freshResponse.map(resultForResponse) - } - } - ) - - private val read = freshnessPolicy.on(cache) - - def get(resourceId: K): Future[T] = read(resourceId).map(_.result) - -} diff --git a/common/app/services/FreshnessPolicy.scala b/common/app/services/FreshnessPolicy.scala deleted file mode 100644 index 531e5a945cfe..000000000000 --- a/common/app/services/FreshnessPolicy.scala +++ /dev/null @@ -1,28 +0,0 @@ -package services - -import com.github.blemale.scaffeine.AsyncLoadingCache - -import scala.concurrent.Future - -sealed trait FreshnessPolicy { - def on[K, V](cache: AsyncLoadingCache[K, V]): K => Future[V] -} - -/** The values returned by the cache will **always** be up-to-date - - * the Future will not complete until any cached value has had - * its ETag checked, and a new value has been computed if required. - */ -case object AlwaysWaitForRefreshedValue extends FreshnessPolicy { - def on[K, V](cache: AsyncLoadingCache[K, V]): K => Future[V] = { - val syncCache = cache.synchronous() - - { k => syncCache.refresh(k) } - } -} - -/** The cache will return the old value for a key if it's still available, - * not waiting for any refresh to complete. - */ -case object TolerateOldValueWhileRefreshing extends FreshnessPolicy { - def on[K, V](cache: AsyncLoadingCache[K, V]): K => Future[V] = cache.get -} diff --git a/common/app/services/S3.scala b/common/app/services/S3.scala index 69ee3b8c4974..c2d618c89fd7 100644 --- a/common/app/services/S3.scala +++ b/common/app/services/S3.scala @@ -1,86 +1,20 @@ package services -import cats.Endo import com.amazonaws.services.s3.model.CannedAccessControlList.{Private, PublicRead} import com.amazonaws.services.s3.model._ import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client} import com.amazonaws.util.StringInputStream -import com.google.common.base.Throwables +import com.gu.etagcaching.aws.s3.ObjectId import common.GuLogging import conf.Configuration import model.PressedPageType import org.joda.time.DateTime -import play.api.http.Status.NOT_MODIFIED -import software.amazon.awssdk.core.async.AsyncResponseTransformer -import software.amazon.awssdk.core.{ResponseBytes, ResponseInputStream} -import software.amazon.awssdk.services.s3.S3AsyncClient -import software.amazon.awssdk.services.s3.model.{GetObjectResponse, S3Exception} -import software.amazon.awssdk.{services => awssdkV2} -import utils.AWSv2 +import services.S3.logS3ExceptionWithDevHint import java.io._ import java.util.zip.GZIPOutputStream -import scala.compat.java8.FutureConverters.CompletionStageOps -import scala.concurrent.{ExecutionContext, Future} import scala.io.{Codec, Source} -/** AWS SDK v2 supports several different ways of handling an S3 response (getting a byte array, or - * an InputStream, or downloading to a file) using `AsyncResponseTransformer`s. Unfortunately, the - * result types do not share a common interface for extracting the response metadata - * (`GetObjectResponse`), even though they all have an identically-named method called `response()` - * for getting what we want. - * - * This Scala wrapper around the `AsyncResponseTransformer` aims to encapsulate all the things the - * `AsyncResponseTransformer`s have in common. - * - * We could use Structural Types (https://docs.scala-lang.org/scala3/reference/changed-features/structural-types.html) - * to make the differing objects seem to conform to a common interface, but this does involve - * reflection (https://stackoverflow.com/a/26788585/438886), which is a bit inefficient/scary. - */ -case class Transformer[ResultType]( - asyncResponseTransformer: () => AsyncResponseTransformer[awssdkV2.s3.model.GetObjectResponse, ResultType], - rawResponseObjectOf: ResultType => awssdkV2.s3.model.GetObjectResponse, -) - -case class S3ObjectId(bucket: String, key: String) { - val s3Uri: String = s"s3://$bucket/$key" -} - -class S3ObjectIdFetching[Response](s3Client: S3AsyncClient, transformer: Transformer[Response]) - extends Fetching[services.S3ObjectId, Response] { - def wrapWithETag(resp: Response): ETaggedData[Response] = - ETaggedData(transformer.rawResponseObjectOf(resp).eTag(), resp) - - private def performFetch( - resourceId: services.S3ObjectId, - reqModifier: Endo[awssdkV2.s3.model.GetObjectRequest.Builder] = identity, - )(implicit ec: ExecutionContext): Future[ETaggedData[Response]] = { - val requestBuilder = awssdkV2.s3.model.GetObjectRequest.builder().bucket(resourceId.bucket).key(resourceId.key) - val request = reqModifier(requestBuilder).build() - s3Client.getObject(request, transformer.asyncResponseTransformer()) - .toScala - .transform( - wrapWithETag, - Throwables.getRootCause, - ) // see https://github.com/guardian/ophan/commit/49fa22176 - } - - def fetch(resourceId: S3ObjectId)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] = performFetch(resourceId) - - def fetchOnlyIfETagChanged(resourceId: S3ObjectId, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]] = - performFetch(resourceId, _.ifNoneMatch(eTag)).map(Some(_)).recover { - case e: S3Exception if e.statusCode == NOT_MODIFIED => None // no fresh download because the ETag matched! - } -} - -object Transformer { - val Bytes: Transformer[ResponseBytes[GetObjectResponse]] = - Transformer(() => AsyncResponseTransformer.toBytes[GetObjectResponse], _.response) - - val BlockingInputStream: Transformer[ResponseInputStream[GetObjectResponse]] = - Transformer(() => AsyncResponseTransformer.toBlockingInputStream[GetObjectResponse], _.response) -} - trait S3 extends GuLogging { lazy val bucket = Configuration.aws.frontendStoreBucket @@ -94,8 +28,8 @@ trait S3 extends GuLogging { private def withS3Result[T](key: String)(action: S3Object => T): Option[T] = client.flatMap { client => + val objectId = ObjectId(bucket, key) try { - val request = new GetObjectRequest(bucket, key) val result = client.getObject(request) log.info(s"S3 got ${result.getObjectMetadata.getContentLength} bytes from ${result.getKey}") @@ -111,23 +45,16 @@ trait S3 extends GuLogging { } } catch { case e: AmazonS3Exception if e.getStatusCode == 404 => - log.warn("not found at %s - %s" format (bucket, key)) + log.warn(s"not found at ${objectId.s3Uri}") None case e: AmazonS3Exception => - logS3Exception(key, e) + logS3ExceptionWithDevHint(objectId, e) None case e: Exception => throw e } } - private def logS3Exception[T](key: String, e: Throwable): Unit = { - val errorMsg = s"Unable to fetch S3 object (key: $bucket $key)" - val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus." - log.error(errorMsg, e) - println(errorMsg + " \n" + hintMsg) - } - def get(key: String)(implicit codec: Codec): Option[String] = withS3Result(key) { result => Source.fromInputStream(result.getObjectContent).mkString @@ -214,7 +141,14 @@ trait S3 extends GuLogging { } } -object S3 extends S3 +object S3 extends S3 { + def logS3ExceptionWithDevHint(s3ObjectId: ObjectId, e: Exception): Unit = { + val errorMsg = s"Unable to fetch S3 object (${s3ObjectId.s3Uri})" + val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus." + log.error(errorMsg, e) + println(errorMsg + " \n" + hintMsg) + } +} object S3FrontsApi extends S3 { diff --git a/common/app/services/fronts/FrontJsonFapi.scala b/common/app/services/fronts/FrontJsonFapi.scala index 5bfe9a130d21..d75d06a247cd 100644 --- a/common/app/services/fronts/FrontJsonFapi.scala +++ b/common/app/services/fronts/FrontJsonFapi.scala @@ -1,76 +1,65 @@ package services.fronts -import com.github.blemale.scaffeine.Scaffeine -import common.{FaciaPressMetrics, GuLogging} -import concurrent.{BlockingOperations, FutureSemaphore} +import com.gu.etagcaching.ETagCache +import com.gu.etagcaching.FreshnessPolicy.AlwaysWaitForRefreshedValue +import com.gu.etagcaching.aws.s3.ObjectId +import com.gu.etagcaching.aws.sdkv2.s3.S3ObjectFetching +import com.gu.etagcaching.aws.sdkv2.s3.response.Transformer.Bytes +import common.FaciaPressMetrics.{FrontDecodingLatency, FrontDownloadLatency, FrontNotModifiedDownloadLatency} +import common.GuLogging import conf.Configuration -import metrics.DurationMetric +import metrics.DurationMetric.withMetrics import model.{PressedPage, PressedPageType} import play.api.libs.json.Json +import services.S3.logS3ExceptionWithDevHint import services._ -import software.amazon.awssdk.core.ResponseInputStream -import software.amazon.awssdk.services.s3.model.GetObjectResponse -import utils.AWSv2 +import software.amazon.awssdk.services.s3.model.S3Exception +import utils.AWSv2.S3Async import java.util.zip.GZIPInputStream import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Success, Using} +import scala.util.{Failure, Success, Using} trait FrontJsonFapi extends GuLogging { lazy val stage: String = Configuration.facia.stage.toUpperCase val bucketLocation: String - val parallelJsonPresses = 32 - val futureSemaphore = new FutureSemaphore(parallelJsonPresses) - def blockingOperations: BlockingOperations - - private def getAddressForPath(path: String, prefix: String): String = - s"$bucketLocation/${path.replaceAll("""\+""", "%2B")}/fapi/pressed.v2$prefix.json" + private def s3ObjectIdFor(path: String, prefix: String): ObjectId = + ObjectId( + S3.bucket, + s"$bucketLocation/${path.replaceAll("""\+""", "%2B")}/fapi/pressed.v2$prefix.json", + ) - def get(path: String, pageType: PressedPageType)(implicit - executionContext: ExecutionContext, - ): Future[Option[PressedPage]] = - errorLoggingF(s"FrontJsonFapi.get $path") { - val boom = getAddressForPath(path, pageType.suffix) - Future - .traverse((1 to 1).toList) { _ => - pressedPageFromS3(boom).recover { - case e => - log.info("Error while stress-testing pressedPageFromS3", e) - None - } - } - .map { results => - log.info(s"${results.flatten.size}/${results.size} page fetches successful") - results.head + private val pressedPageCache: ETagCache[ObjectId, PressedPage] = new ETagCache( + S3ObjectFetching(S3Async, Bytes) + .timing( + successWith = FrontDownloadLatency.recordDuration, + notModifiedWith = FrontNotModifiedDownloadLatency.recordDuration, + ) + .thenParsing { bytes => + withMetrics(FrontDecodingLatency) { + Using(new GZIPInputStream(bytes.asInputStream()))(Json.parse(_).as[PressedPage]).get } - } - - private val pressedPageCache: ETagCache[S3ObjectId, ResponseInputStream[GetObjectResponse], PressedPage] = - new ETagCache( - new S3ObjectIdFetching(AWSv2.S3Async, Transformer.BlockingInputStream), - inputStream => - DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) { - Using(new GZIPInputStream(inputStream))(Json.parse(_).as[PressedPage]).get - }, - AlwaysWaitForRefreshedValue, - _.maximumSize(500), - ) + }, + AlwaysWaitForRefreshedValue, + _.maximumSize(500), + ) - private def pressedPageFromS3( - path: String, - )(implicit ec: ExecutionContext): Future[Option[PressedPage]] = - errorLoggingF(s"FrontJsonFapi.pressedPageFromS3 $path") { - pressedPageCache.get(S3ObjectId(S3.bucket, path)).transform(t => Success(t.toOption)) - // DurationMetric.withMetricsF(FaciaPressMetrics.FrontDownloadLatency) { - // DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) { + def get(path: String, pageType: PressedPageType)(implicit ec: ExecutionContext): Future[Option[PressedPage]] = + errorLoggingF(s"FrontJsonFapi.get $path") { + val objectId = s3ObjectIdFor(path, pageType.suffix) + pressedPageCache.get(objectId).map(Some(_)).recover { + case s3Exception: S3Exception => + logS3ExceptionWithDevHint(objectId, s3Exception) + None + } } } -class FrontJsonFapiLive(val blockingOperations: BlockingOperations) extends FrontJsonFapi { +class FrontJsonFapiLive() extends FrontJsonFapi { override val bucketLocation: String = s"$stage/frontsapi/pressed/live" } -class FrontJsonFapiDraft(val blockingOperations: BlockingOperations) extends FrontJsonFapi { +class FrontJsonFapiDraft() extends FrontJsonFapi { override val bucketLocation: String = s"$stage/frontsapi/pressed/draft" } diff --git a/common/test/package.scala b/common/test/package.scala index 7ed6ee4fa8f7..466388be183d 100644 --- a/common/test/package.scala +++ b/common/test/package.scala @@ -170,8 +170,7 @@ trait WithTestCSRF { trait WithTestFrontJsonFapi { // need a front api that stores S3 locally so it can run without deps in the unit tests - class TestFrontJsonFapi(override val blockingOperations: BlockingOperations) - extends FrontJsonFapiLive(blockingOperations) { + class TestFrontJsonFapi extends FrontJsonFapiLive { override def get(path: String, pageType: PressedPageType)(implicit executionContext: ExecutionContext, @@ -197,7 +196,5 @@ trait WithTestFrontJsonFapi { } } - lazy val actorSystem = ActorSystem() - lazy val blockingOperations = new BlockingOperations(actorSystem) - lazy val fapi = new TestFrontJsonFapi(blockingOperations) + lazy val fapi = new TestFrontJsonFapi() } diff --git a/facia/app/AppLoader.scala b/facia/app/AppLoader.scala index a15f21d52999..2a761667807e 100644 --- a/facia/app/AppLoader.scala +++ b/facia/app/AppLoader.scala @@ -68,6 +68,7 @@ trait AppComponents extends FrontendComponents with FaciaControllers with FapiSe override lazy val appMetrics = ApplicationMetrics( FaciaPressMetrics.FrontDecodingLatency, FaciaPressMetrics.FrontDownloadLatency, + FaciaPressMetrics.FrontNotModifiedDownloadLatency, DCRMetrics.DCRLatencyMetric, DCRMetrics.DCRRequestCountMetric, ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4a6ed9d1fe98..ab1e4f444ba0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -22,6 +22,7 @@ object Dependencies { val awsEc2 = "com.amazonaws" % "aws-java-sdk-ec2" % awsVersion val awsKinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % awsVersion val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % awsVersion + val eTagCachingS3 = "com.gu.etag-caching" %% "aws-s3-sdk-v2" % "1.0.1" val awsSes = "com.amazonaws" % "aws-java-sdk-ses" % awsVersion val awsSns = "com.amazonaws" % "aws-java-sdk-sns" % awsVersion val awsSts = "com.amazonaws" % "aws-java-sdk-sts" % awsVersion