Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyley committed Aug 2, 2023
1 parent cee8eb2 commit 7bcd556
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 256 deletions.
4 changes: 1 addition & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ val common = library("common")
awsSts,
awsSqs,
awsSsm,
eTagCachingS3,
contentApiClient,
enumeratumPlayJson,
filters,
Expand Down Expand Up @@ -69,10 +70,7 @@ val common = library("common")
identityModel,
capiAws,
okhttp,
"software.amazon.awssdk" % "s3" % "2.20.96",
) ++ jackson,
)
.settings(
TestAssets / mappings ~= filterAssets,
)

Expand Down
1 change: 1 addition & 0 deletions common/app/common/metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ object FaciaPressMetrics {
val FrontPressContentSizeLite = SamplerMetric("front-press-content-size-lite", StandardUnit.Bytes)
val FrontDecodingLatency = DurationMetric("front-decoding-latency", StandardUnit.Milliseconds)
val FrontDownloadLatency = DurationMetric("front-download-latency", StandardUnit.Milliseconds)
val FrontNotModifiedDownloadLatency = DurationMetric("front-not-modified-download-latency", StandardUnit.Milliseconds)
}

object EmailSubsciptionMetrics {
Expand Down
35 changes: 0 additions & 35 deletions common/app/concurrent/FutureSemaphore.scala

This file was deleted.

14 changes: 4 additions & 10 deletions common/app/metrics/FrontendMetrics.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package metrics

import java.util.concurrent.atomic.AtomicLong
import com.amazonaws.services.cloudwatch.model.StandardUnit
import common.{Box, StopWatch}
import model.diagnostics.CloudWatch
import org.joda.time.DateTime

import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

Expand Down Expand Up @@ -130,6 +131,8 @@ final case class DurationMetric(override val name: String, override val metricUn
def record(dataPoint: DurationDataPoint): Unit = dataPoints.alter(dataPoint :: _)

def recordDuration(timeInMillis: Double): Unit = record(DurationDataPoint(timeInMillis, Option(DateTime.now)))
def recordDuration(duration: java.time.Duration): Unit =
recordDuration(duration.toNanos.toDouble / MILLISECONDS.toNanos(1))

override def isEmpty: Boolean = dataPoints.get().isEmpty
}
Expand All @@ -141,15 +144,6 @@ object DurationMetric {
metric.recordDuration(stopWatch.elapsed.toDouble)
result
}

def withMetricsF[A](metric: DurationMetric)(block: => Future[A])(implicit ec: ExecutionContext): Future[A] = {
val stopWatch: StopWatch = new StopWatch
val result = block
result.onComplete { _ =>
metric.recordDuration(stopWatch.elapsed.toDouble)
}
result
}
}

case class SampledDataPoint(value: Double, sampleTime: DateTime) extends DataPoint {
Expand Down
45 changes: 0 additions & 45 deletions common/app/services/ETagCache.scala

This file was deleted.

28 changes: 0 additions & 28 deletions common/app/services/FreshnessPolicy.scala

This file was deleted.

92 changes: 13 additions & 79 deletions common/app/services/S3.scala
Original file line number Diff line number Diff line change
@@ -1,86 +1,20 @@
package services

import cats.Endo
import com.amazonaws.services.s3.model.CannedAccessControlList.{Private, PublicRead}
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client}
import com.amazonaws.util.StringInputStream
import com.google.common.base.Throwables
import com.gu.etagcaching.aws.s3.ObjectId
import common.GuLogging
import conf.Configuration
import model.PressedPageType
import org.joda.time.DateTime
import play.api.http.Status.NOT_MODIFIED
import software.amazon.awssdk.core.async.AsyncResponseTransformer
import software.amazon.awssdk.core.{ResponseBytes, ResponseInputStream}
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{GetObjectResponse, S3Exception}
import software.amazon.awssdk.{services => awssdkV2}
import utils.AWSv2
import services.S3.logS3ExceptionWithDevHint

import java.io._
import java.util.zip.GZIPOutputStream
import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.{ExecutionContext, Future}
import scala.io.{Codec, Source}

/** AWS SDK v2 supports several different ways of handling an S3 response (getting a byte array, or
* an InputStream, or downloading to a file) using `AsyncResponseTransformer`s. Unfortunately, the
* result types do not share a common interface for extracting the response metadata
* (`GetObjectResponse`), even though they all have an identically-named method called `response()`
* for getting what we want.
*
* This Scala wrapper around the `AsyncResponseTransformer` aims to encapsulate all the things the
* `AsyncResponseTransformer`s have in common.
*
* We could use Structural Types (https://docs.scala-lang.org/scala3/reference/changed-features/structural-types.html)
* to make the differing objects seem to conform to a common interface, but this does involve
* reflection (https://stackoverflow.com/a/26788585/438886), which is a bit inefficient/scary.
*/
case class Transformer[ResultType](
asyncResponseTransformer: () => AsyncResponseTransformer[awssdkV2.s3.model.GetObjectResponse, ResultType],
rawResponseObjectOf: ResultType => awssdkV2.s3.model.GetObjectResponse,
)

case class S3ObjectId(bucket: String, key: String) {
val s3Uri: String = s"s3://$bucket/$key"
}

class S3ObjectIdFetching[Response](s3Client: S3AsyncClient, transformer: Transformer[Response])
extends Fetching[services.S3ObjectId, Response] {
def wrapWithETag(resp: Response): ETaggedData[Response] =
ETaggedData(transformer.rawResponseObjectOf(resp).eTag(), resp)

private def performFetch(
resourceId: services.S3ObjectId,
reqModifier: Endo[awssdkV2.s3.model.GetObjectRequest.Builder] = identity,
)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] = {
val requestBuilder = awssdkV2.s3.model.GetObjectRequest.builder().bucket(resourceId.bucket).key(resourceId.key)
val request = reqModifier(requestBuilder).build()
s3Client.getObject(request, transformer.asyncResponseTransformer())
.toScala
.transform(
wrapWithETag,
Throwables.getRootCause,
) // see https://github.com/guardian/ophan/commit/49fa22176
}

def fetch(resourceId: S3ObjectId)(implicit ec: ExecutionContext): Future[ETaggedData[Response]] = performFetch(resourceId)

def fetchOnlyIfETagChanged(resourceId: S3ObjectId, eTag: String)(implicit ec: ExecutionContext): Future[Option[ETaggedData[Response]]] =
performFetch(resourceId, _.ifNoneMatch(eTag)).map(Some(_)).recover {
case e: S3Exception if e.statusCode == NOT_MODIFIED => None // no fresh download because the ETag matched!
}
}

object Transformer {
val Bytes: Transformer[ResponseBytes[GetObjectResponse]] =
Transformer(() => AsyncResponseTransformer.toBytes[GetObjectResponse], _.response)

val BlockingInputStream: Transformer[ResponseInputStream[GetObjectResponse]] =
Transformer(() => AsyncResponseTransformer.toBlockingInputStream[GetObjectResponse], _.response)
}

trait S3 extends GuLogging {

lazy val bucket = Configuration.aws.frontendStoreBucket
Expand All @@ -94,8 +28,8 @@ trait S3 extends GuLogging {

private def withS3Result[T](key: String)(action: S3Object => T): Option[T] =
client.flatMap { client =>
val objectId = ObjectId(bucket, key)
try {

val request = new GetObjectRequest(bucket, key)
val result = client.getObject(request)
log.info(s"S3 got ${result.getObjectMetadata.getContentLength} bytes from ${result.getKey}")
Expand All @@ -111,23 +45,16 @@ trait S3 extends GuLogging {
}
} catch {
case e: AmazonS3Exception if e.getStatusCode == 404 =>
log.warn("not found at %s - %s" format (bucket, key))
log.warn(s"not found at ${objectId.s3Uri}")
None
case e: AmazonS3Exception =>
logS3Exception(key, e)
logS3ExceptionWithDevHint(objectId, e)
None
case e: Exception =>
throw e
}
}

private def logS3Exception[T](key: String, e: Throwable): Unit = {
val errorMsg = s"Unable to fetch S3 object (key: $bucket $key)"
val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus."
log.error(errorMsg, e)
println(errorMsg + " \n" + hintMsg)
}

def get(key: String)(implicit codec: Codec): Option[String] =
withS3Result(key) { result =>
Source.fromInputStream(result.getObjectContent).mkString
Expand Down Expand Up @@ -214,7 +141,14 @@ trait S3 extends GuLogging {
}
}

object S3 extends S3
object S3 extends S3 {
def logS3ExceptionWithDevHint(s3ObjectId: ObjectId, e: Exception): Unit = {
val errorMsg = s"Unable to fetch S3 object (${s3ObjectId.s3Uri})"
val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus."
log.error(errorMsg, e)
println(errorMsg + " \n" + hintMsg)
}
}

object S3FrontsApi extends S3 {

Expand Down
Loading

0 comments on commit 7bcd556

Please sign in to comment.