diff --git a/d4s/src/main/scala/d4s/models/DynamoExecution.scala b/d4s/src/main/scala/d4s/models/DynamoExecution.scala index e65ba44..e0d4f3f 100644 --- a/d4s/src/main/scala/d4s/models/DynamoExecution.scala +++ b/d4s/src/main/scala/d4s/models/DynamoExecution.scala @@ -32,21 +32,23 @@ final case class DynamoExecution[DR <: DynamoRequest, Dec, +A]( def void: DynamoExecution[DR, Dec, Unit] = { map(_ => ()) } - def redeem[B](err: Throwable => StrategyInput[UnknownF, DR, Dec] => UnknownF[Throwable, B], - succ: A => StrategyInput[UnknownF, DR, Dec] => UnknownF[Throwable, B]): DynamoExecution[DR, Dec, B] = { + def redeem[B]( + err: Throwable => StrategyInput[UnknownF, DR, Dec] => UnknownF[Throwable, B], + succ: A => StrategyInput[UnknownF, DR, Dec] => UnknownF[Throwable, B], + ): DynamoExecution[DR, Dec, B] = { modifyExecution(io => ctx => ctx.F.redeem(io)(err(_)(ctx), succ(_)(ctx))) } def catchAll[A1 >: A](err: Throwable => StrategyInput[UnknownF, DR, Dec] => UnknownF[Throwable, A1]): DynamoExecution[DR, Dec, A1] = { redeem(err, a => _.F.pure(a)) } - def retryWithPrefix(ddl: TableDDL, sleep: Duration = 1.second)(implicit ev: DR <:< WithTableReference[DR]): DynamoExecution[DR, Dec, A] = { - modifyStrategy(DynamoExecution.retryWithPrefix(ddl, sleep)) - } def optConditionFailure: DynamoExecution[DR, Dec, Option[ConditionalCheckFailedException]] = { - redeem({ - case DynamoException(_, err: ConditionalCheckFailedException) => _.F.pure(Some(err)) - case err: Throwable => _.F.fail(err) - }, _ => _.F.pure(None)).discardInterpreterError[ConditionalCheckFailedException] + redeem( + { + case DynamoException(_, err: ConditionalCheckFailedException) => _.F.pure(Some(err)) + case err: Throwable => _.F.fail(err) + }, + _ => _.F.pure(None), + ).discardInterpreterError[ConditionalCheckFailedException] } def boolConditionSuccess: DynamoExecution[DR, Dec, Boolean] = { optConditionFailure.map(_.isEmpty) @@ -86,6 +88,14 @@ final case class DynamoExecution[DR <: DynamoRequest, Dec, +A]( object DynamoExecution { + implicit final class RetryWithPrefix[DR <: DynamoRequest with WithTableReference[DR], Dec, +A]( + private val exec: DynamoExecution[DR, Dec, A] + ) extends AnyVal { + def retryWithPrefix(ddl: TableDDL, sleep: Duration = 1.second): DynamoExecution[DR, Dec, A] = { + exec.modifyStrategy(DynamoExecution.retryWithPrefix(ddl, sleep)) + } + } + def apply[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec]): DynamoExecution[DR, Dec, Dec] = { new DynamoExecution[DR, Dec, Dec](dynamoQuery, DynamoExecution.single[DR, Dec]) } @@ -100,6 +110,7 @@ object DynamoExecution { } } + // @formatter:off def createTable[F[+_, +_]](table: TableReference, ddl: TableDDL, sleep: Duration = 1.second): DynamoExecution[CreateTable, CreateTableResponse, Unit] = { CreateTable(table, ddl).toQuery.exec .discardInterpreterError[ResourceInUseException].redeem( @@ -109,6 +120,7 @@ object DynamoExecution { }, { rsp => in => import in._ + val resourceNotFoundHandler: PartialFunction[Throwable, UnknownF[Nothing, Unit]] = { case _: ResourceNotFoundException => F.unit } val updateTTL = table.ttlField match { case None => F.unit @@ -129,6 +141,7 @@ object DynamoExecution { } ) } + // @formatter:on def listTables: DynamoExecution[ListTables, List[String], List[String]] = { ListTables().toQuery.decode(_.tableNames().asScala.toList).execPagedFlatten() @@ -138,12 +151,12 @@ object DynamoExecution { ListTables().toQuery.decode(_.tableNames().asScala.toList).execStreamedFlatten } - def offset[DR <: DynamoRequest, Dec, A](offsetLimit: OffsetLimit)( - implicit + def offset[DR <: DynamoRequest with WithSelect[DR] with WithLimit[DR] with WithProjectionExpression[DR], Dec, A]( + offsetLimit: OffsetLimit + )(implicit paging: PageableRequest[DR], - ev1: DR <:< WithSelect[DR] with WithLimit[DR] with WithProjectionExpression[DR], ev2: DR#Rsp => { def count(): Integer }, - ev3: Dec <:< List[A] + ev3: Dec <:< List[A], ): ExecutionStrategy[DR, Dec, List[A]] = ExecutionStrategy { in => import in._ @@ -194,8 +207,8 @@ object DynamoExecution { private[this] def pagedImpl[DR <: DynamoRequest, Dec, A]( limit: Option[Int] - )(f: List[Dec] => List[A])( - implicit paging: PageableRequest[DR] + )(f: List[Dec] => List[A] + )(implicit paging: PageableRequest[DR] ): ExecutionStrategy[DR, Dec, List[A]] = ExecutionStrategy { in => @@ -227,9 +240,10 @@ object DynamoExecution { } yield res } - def retryWithPrefix[DR <: DynamoRequest, Dec, A](ddl: TableDDL, sleep: Duration = 1.second)(nested: ExecutionStrategy[DR, Dec, A])( - implicit - ev: DR <:< WithTableReference[DR] + def retryWithPrefix[DR <: DynamoRequest with WithTableReference[DR], Dec, A]( + ddl: TableDDL, + sleep: Duration = 1.second, + )(nested: ExecutionStrategy[DR, Dec, A] ): ExecutionStrategy[DR, Dec, A] = ExecutionStrategy[DR, Dec, A] { in => import in._ @@ -242,8 +256,13 @@ object DynamoExecution { } } - private[this] def retryIfTableNotFound[F[_], A](attempts: Int, - sleep: F[Unit])(prepareTable: F[Unit])(attemptAction: F[A])(implicit F: MonadError[F, Throwable]): F[A] = { + private[this] def retryIfTableNotFound[F[_], A]( + attempts: Int, + sleep: F[Unit], + )(prepareTable: F[Unit] + )(attemptAction: F[A] + )(implicit F: MonadError[F, Throwable] + ): F[A] = { import cats.syntax.applicativeError._ import cats.syntax.flatMap._ @@ -263,7 +282,7 @@ object DynamoExecution { final case class Streamed[DR <: DynamoRequest, Dec, +A]( dynamoQuery: DynamoQuery[DR, Dec], - executionStrategy: ExecutionStrategy.Streamed[DR, Dec, A] + executionStrategy: ExecutionStrategy.Streamed[DR, Dec, A], ) extends DynamoExecution.Dependent[DR, Dec, StreamFThrowable[?[_, `+_`], A]] { def map[B](f: A => B): DynamoExecution.Streamed[DR, Dec, B] = { @@ -285,10 +304,6 @@ object DynamoExecution { modifyExecution(stream => _ => f(stream)) } - def retryWithPrefix(ddl: TableDDL, sleep: Duration = 1.second)(implicit ev: DR <:< WithTableReference[DR]): DynamoExecution.Streamed[DR, Dec, A] = { - modifyStrategy(Streamed.retryWithPrefix(ddl, sleep)) - } - def modify(f: DR => DR): DynamoExecution.Streamed[DR, Dec, A] = { copy(dynamoQuery = dynamoQuery.modify(f)) } @@ -310,6 +325,14 @@ object DynamoExecution { object Streamed { + implicit final class RetryWithPrefix[DR <: DynamoRequest with WithTableReference[DR], Dec, +A]( + private val exec: DynamoExecution.Streamed[DR, Dec, A] + ) extends AnyVal { + def retryWithPrefix(ddl: TableDDL, sleep: Duration = 1.second): DynamoExecution.Streamed[DR, Dec, A] = { + exec.modifyStrategy(Streamed.retryWithPrefix(ddl, sleep)) + } + } + def streamed[DR <: DynamoRequest, Dec](implicit paging: PageableRequest[DR]): ExecutionStrategy.Streamed[DR, Dec, Dec] = ExecutionStrategy.Streamed[DR, Dec, Dec] { in => @@ -321,14 +344,14 @@ object DynamoExecution { streamExecutionWrapper { for { oldKey <- lastEvaluatedKey.get - newReq = query.modify(paging.withPageMarkerOption(_, oldKey)) + newReq = query.modify(paging.withPageMarkerOption(_, oldKey)) newRsp <- interpreter.run(newReq, in.interpreterErrorHandler) - newKey = paging.getPageMarker(newRsp) + newKey = paging.getPageMarker(newRsp) _ <- lastEvaluatedKey.set(newKey) continue = newKey.isDefined - decoded <- query.decoder[F](newRsp) + decoded <- query.decoder[F](newRsp) } yield continue -> decoded } }.takeThrough(_._1).map(_._2) @@ -341,8 +364,10 @@ object DynamoExecution { streamed[DR, Dec].apply(in).flatMap(Stream.emits(_)) } - def retryWithPrefix[DR <: DynamoRequest, Dec, A](ddl: TableDDL, sleep: Duration = 1.second)(nested: ExecutionStrategy.Streamed[DR, Dec, A])( - implicit ev: DR <:< WithTableReference[DR] + def retryWithPrefix[DR <: DynamoRequest with WithTableReference[DR], Dec, A]( + ddl: TableDDL, + sleep: Duration = 1.second, + )(nested: ExecutionStrategy.Streamed[DR, Dec, A] ): ExecutionStrategy.Streamed[DR, Dec, A] = ExecutionStrategy.Streamed[DR, Dec, A] { in => import in._ diff --git a/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala b/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala index 85124ff..8b1f559 100644 --- a/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala +++ b/d4s/src/main/scala/d4s/models/query/DynamoQuery.scala @@ -11,7 +11,7 @@ import d4s.models.conditions.Condition.{attribute_exists, attribute_not_exists} import d4s.models.query.DynamoRequest._ import d4s.models.query.requests.UpdateTable import d4s.models.table.index.{GlobalIndexUpdate, ProvisionedGlobalIndex, TableIndex} -import d4s.models.table.{DynamoField, TableReference} +import d4s.models.table.{DynamoField, TableDDL, TableReference} import d4s.models.{DynamoExecution, FnBIO, OffsetLimit} import izumi.functional.bio.{BIO, BIOError, F} import izumi.fundamentals.platform.language.unused @@ -44,138 +44,168 @@ final case class DynamoQuery[DR <: DynamoRequest, +Dec]( object DynamoQuery { def apply[DR <: DynamoRequest](request: DR): DynamoQuery[DR, DR#Rsp] = DynamoQuery[DR, DR#Rsp](request, FnBIO.lift(identity[DR#Rsp])) - @inline implicit final def toDynamoExecution[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec]): DynamoExecution[DR, Dec, Dec] = { + @inline implicit final def toDynamoExecution[DR <: DynamoRequest, Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ): DynamoExecution[DR, Dec, Dec] = { DynamoExecution(dynamoQuery, DynamoExecution.single[DR, Dec]) } - implicit final class Exec[DR <: DynamoRequest, Dec](private val dynamoQuery: DynamoQuery[DR, Dec]) extends AnyVal { + implicit final class Exec[DR <: DynamoRequest, Dec]( + private val dynamoQuery: DynamoQuery[DR, Dec] + ) extends AnyVal { def exec: DynamoExecution[DR, Dec, Dec] = toDynamoExecution(dynamoQuery) } - implicit final class ExecPagedFlatten[DR <: DynamoRequest: PageableRequest, Dec: ? <:< List[A], A](dynamoQuery: DynamoQuery[DR, Dec]) { - def execPagedFlatten(limit: Option[Int] = None): DynamoExecution[DR, Dec, List[A]] = { - DynamoExecution(dynamoQuery, DynamoExecution.pagedFlatten[DR, Dec, A](limit)) + implicit final class ExecPagedFlatten[DR <: DynamoRequest, A]( + private val dynamoQuery: DynamoQuery[DR, List[A]] + ) extends AnyVal { + def execPagedFlatten(limit: Option[Int] = None)(implicit paging: PageableRequest[DR]): DynamoExecution[DR, List[A], List[A]] = { + DynamoExecution(dynamoQuery, DynamoExecution.pagedFlatten[DR, List[A], A](limit)) } } - implicit final class ExecPaged[DR <: DynamoRequest: PageableRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec]) { - def execPaged(limit: Option[Int] = None): DynamoExecution[DR, Dec, List[Dec]] = { + implicit final class ExecPaged[DR <: DynamoRequest, Dec]( + private val dynamoQuery: DynamoQuery[DR, Dec] + ) extends AnyVal { + def execPaged(limit: Option[Int] = None)(implicit paging: PageableRequest[DR]): DynamoExecution[DR, Dec, List[Dec]] = { DynamoExecution(dynamoQuery, DynamoExecution.paged[DR, Dec](limit)) } - def execStreamed: DynamoExecution.Streamed[DR, Dec, Dec] = { + def execStreamed(implicit paging: PageableRequest[DR]): DynamoExecution.Streamed[DR, Dec, Dec] = { DynamoExecution.Streamed[DR, Dec, Dec](dynamoQuery, DynamoExecution.Streamed.streamed[DR, Dec]) } } - implicit final class ExecStreamFlatten[DR <: DynamoRequest: PageableRequest, Dec: ? <:< List[A], A](dynamoQuery: DynamoQuery[DR, Dec]) { - def execStreamedFlatten: DynamoExecution.Streamed[DR, Dec, A] = { - DynamoExecution.Streamed[DR, Dec, A](dynamoQuery, DynamoExecution.Streamed.streamedFlatten[DR, Dec, A]) + implicit final class ExecStreamFlatten[DR <: DynamoRequest, A]( + private val dynamoQuery: DynamoQuery[DR, List[A]] + ) extends AnyVal { + def execStreamedFlatten(implicit paging: PageableRequest[DR]): DynamoExecution.Streamed[DR, List[A], A] = { + DynamoExecution.Streamed[DR, List[A], A](dynamoQuery, DynamoExecution.Streamed.streamedFlatten[DR, List[A], A]) } } - implicit final class ExecOffset[DR <: DynamoRequest, Dec, A]( - dynamoQuery: DynamoQuery[DR, Dec] - )(implicit - paging: PageableRequest[DR], - ev1: DR <:< WithSelect[DR] with WithLimit[DR] with WithProjectionExpression[DR], - ev2: DR#Rsp => { def count(): Integer }, - ev4: Dec <:< List[A], - ) { - def execOffset(offsetLimit: OffsetLimit): DynamoExecution[DR, Dec, List[A]] = { - new DynamoExecution[DR, Dec, List[A]](dynamoQuery, DynamoExecution.offset[DR, Dec, A](offsetLimit)) + implicit final class ExecOffset[DR <: DynamoRequest with WithSelect[DR] with WithLimit[DR] with WithProjectionExpression[DR], A]( + private val dynamoQuery: DynamoQuery[DR, List[A]] + ) extends AnyVal { + def execOffset(offsetLimit: OffsetLimit)(implicit paging: PageableRequest[DR], ev: DR#Rsp => { def count(): Integer }): DynamoExecution[DR, List[A], List[A]] = { + new DynamoExecution[DR, List[A], List[A]](dynamoQuery, DynamoExecution.offset[DR, List[A], A](offsetLimit)) + } + } + + implicit final class RetryWithPrefix[DR <: DynamoRequest with WithTableReference[DR], Dec]( + private val query: DynamoQuery[DR, Dec] + ) extends AnyVal { + import scala.concurrent.duration._ + def retryWithPrefix(ddl: TableDDL, sleep: Duration = 1.second): DynamoExecution[DR, Dec, Dec] = { + query.modifyStrategy(DynamoExecution.retryWithPrefix(ddl, sleep)) } } - implicit final class TweakFilterExpression[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithFilterExpression[DR]) - extends WithFilterExpression[DynamoQuery[DR, Dec]] { + implicit final class TweakFilterExpression[DR <: DynamoRequest with WithFilterExpression[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithFilterExpression[DynamoQuery[DR, Dec]] { @inline def withFilterExpression(c: Condition): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withFilterExpression(c)) } } - implicit final class TweakAttributeValues[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithAttributeValues[DR]) - extends WithAttributeValues[DynamoQuery[DR, Dec]] { + implicit final class TweakAttributeValues[DR <: DynamoRequest with WithAttributeValues[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithAttributeValues[DynamoQuery[DR, Dec]] { @inline def withAttributeValues(f: Map[String, AttributeValue] => Map[String, AttributeValue]): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withAttributeValues(f)) } } - implicit final class TweakAttributeNames[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithAttributeNames[DR]) - extends WithAttributeNames[DynamoQuery[DR, Dec]] { + implicit final class TweakAttributeNames[DR <: DynamoRequest with WithAttributeNames[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithAttributeNames[DynamoQuery[DR, Dec]] { @inline def withAttributeNames(f: Map[String, String] => Map[String, String]): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withAttributeNames(f)) } } - implicit final class TweakProjectionExpression[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithProjectionExpression[DR]) - extends WithProjectionExpression[DynamoQuery[DR, Dec]] { + implicit final class TweakProjectionExpression[DR <: DynamoRequest with WithProjectionExpression[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithProjectionExpression[DynamoQuery[DR, Dec]] { @inline def withProjectionExpression(f: Option[String] => Option[String]): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withProjectionExpression(f)) } } - implicit final class TweakTableReference[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithTableReference[DR]) - extends WithTableReference[DynamoQuery[DR, Dec]] { + implicit final class TweakTableReference[DR <: DynamoRequest with WithTableReference[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithTableReference[DynamoQuery[DR, Dec]] { @inline def withTableReference(f: TableReference => TableReference): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withTableReference(f)) } @inline def table: TableReference = dynamoQuery.request.table } - implicit final class TweakIndex[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithIndex[DR]) - extends WithIndex[DynamoQuery[DR, Dec]] { + implicit final class TweakIndex[DR <: DynamoRequest with WithIndex[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithIndex[DynamoQuery[DR, Dec]] { @inline override def withIndex(index: TableIndex[_, _]): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withIndex(index)) } } - implicit final class TweakLimit[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithLimit[DR]) - extends WithLimit[DynamoQuery[DR, Dec]] { + implicit final class TweakLimit[DR <: DynamoRequest with WithLimit[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithLimit[DynamoQuery[DR, Dec]] { @inline override def withLimit(limit: Int): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withLimit(limit)) } } - implicit final class TweakStartKey[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithStartKey[DR]) - extends WithStartKey[DynamoQuery[DR, Dec]] { + implicit final class TweakStartKey[DR <: DynamoRequest with WithStartKey[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithStartKey[DynamoQuery[DR, Dec]] { @inline override def withStartKeyMap(startKey: java.util.Map[String, AttributeValue]): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withStartKeyMap(startKey)) } } - implicit final class TweakBatchItems[DR <: DynamoRequest, BatchType[_], Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithBatch[DR, BatchType]) - extends WithBatch[DynamoQuery[DR, Dec], BatchType] { + implicit final class TweakBatchItems[DR <: DynamoRequest with WithBatch[DR, BatchType], BatchType[_], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] with DynamoQuery[_ <: DynamoRequest with WithBatch[DR, BatchType], Dec] // satisfy intellij & scalac gods + ) extends WithBatch[DynamoQuery[DR, Dec], BatchType] { @inline override def withBatch[I: D4SEncoder](batchItems: List[BatchType[I]]): DynamoQuery[DR, Dec] = { - dynamoQuery.modify(_.withBatch(batchItems)) + (dynamoQuery: DynamoQuery[DR, Dec]).modify(_.withBatch(batchItems)) } @inline override def withBatch(batchItems: List[Map[String, AttributeValue]]): DynamoQuery[DR, Dec] = { - dynamoQuery.modify(_.withBatch(batchItems)) + (dynamoQuery: DynamoQuery[DR, Dec]).modify(_.withBatch(batchItems)) } @inline override def batchItems: List[Map[String, AttributeValue]] = dynamoQuery.request.batchItems } - implicit final class TweakScanIndexForward[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithScanIndexForward[DR]) - extends WithScanIndexForward[DynamoQuery[DR, Dec]] { + implicit final class TweakScanIndexForward[DR <: DynamoRequest with WithScanIndexForward[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithScanIndexForward[DynamoQuery[DR, Dec]] { @inline override def withScanIndexForward(sif: Boolean): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withScanIndexForward(sif)) } } - implicit final class TweakKey[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithKey[DR]) extends WithKey[DynamoQuery[DR, Dec]] { + implicit final class TweakKey[DR <: DynamoRequest with WithKey[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithKey[DynamoQuery[DR, Dec]] { @inline override def withKey(f: Map[String, AttributeValue] => Map[String, AttributeValue]): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withKey(f)) } } - implicit final class TweakItem[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithItem[DR]) extends WithItem[DynamoQuery[DR, Dec]] { + implicit final class TweakItem[DR <: DynamoRequest with WithItem[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithItem[DynamoQuery[DR, Dec]] { @inline override def withItemAttributeValues(f: Map[String, AttributeValue] => Map[String, AttributeValue]): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withItemAttributeValues(f)) } } @SuppressWarnings(Array("UnsafeTraversableMethods")) - implicit final class TweakExists[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithCondition[DR] with WithTableReference[DR]) { + implicit final class TweakExists[DR <: DynamoRequest with WithCondition[DR] with WithTableReference[DR], Dec]( + private val dynamoQuery: DynamoQuery[DR, Dec] + ) extends AnyVal { def ifExists(): DynamoQuery[DR, Dec] = { val table = dynamoQuery.table dynamoQuery.withCondition(table.key.keyNames.toList.map(s => attribute_exists(List(s)): Condition).reduceLeft(_ && _)) @@ -187,24 +217,25 @@ object DynamoQuery { } } - implicit final class TweakCondition[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithCondition[DR]) - extends WithCondition[DynamoQuery[DR, Dec]] { + implicit final class TweakCondition[DR <: DynamoRequest with WithCondition[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithCondition[DynamoQuery[DR, Dec]] { @inline def withCondition(c: Condition): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withCondition(c)) } } - implicit final class TweakUpdateExpression[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithUpdateExpression[DR]) - extends WithUpdateExpression[DynamoQuery[DR, Dec]] { + implicit final class TweakUpdateExpression[DR <: DynamoRequest with WithUpdateExpression[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithUpdateExpression[DynamoQuery[DR, Dec]] { @inline def withUpdateExpression(f: String => String): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withUpdateExpression(f)) } } - implicit final class TweakWithTtl[DR <: DynamoRequest, Dec]( - dynamoQuery: DynamoQuery[DR, Dec] - )(implicit ev: DR <:< WithAttributeValues[DR] with WithTableReference[DR] with WithFilterExpression[DR] - ) { + implicit final class TweakWithTtl[DR <: DynamoRequest with WithAttributeValues[DR] with WithTableReference[DR] with WithFilterExpression[DR], Dec]( + private val dynamoQuery: DynamoQuery[DR, Dec] + ) extends AnyVal { def filterTtl(now: ZonedDateTime): DynamoQuery[DR, Dec] = { filterTtl(now.toEpochSecond) } @@ -220,22 +251,24 @@ object DynamoQuery { } } - implicit final class TweakWithTtlField[DR <: DynamoRequest, Dec](private val dynamoQuery: DynamoQuery[DR, Dec]) extends AnyVal { - def withTtlFieldOption(expiration: Option[ZonedDateTime])(implicit ev: DR <:< WithItem[DR] with WithTableReference[DR]): DynamoQuery[DR, Dec] = - expiration.fold(dynamoQuery)(withTtlField(_)) + implicit final class TweakWithTtlField[DR <: DynamoRequest with WithItem[DR] with WithTableReference[DR], Dec]( + private val dynamoQuery: DynamoQuery[DR, Dec] + ) extends AnyVal { + def withTtlFieldOption(expiration: Option[ZonedDateTime]): DynamoQuery[DR, Dec] = + expiration.fold(dynamoQuery)(withTtlField) def withTtlFieldOption( expirationEpochSeconds: Option[Long] )(implicit ev: DR <:< WithItem[DR] with WithTableReference[DR], @unused dummy: DummyImplicit, ): DynamoQuery[DR, Dec] = { - expirationEpochSeconds.fold(dynamoQuery)(withTtlField(_)) + expirationEpochSeconds.fold(dynamoQuery)(withTtlField) } - def withTtlField(expiration: ZonedDateTime)(implicit ev: DR <:< WithItem[DR] with WithTableReference[DR]): DynamoQuery[DR, Dec] = + def withTtlField(expiration: ZonedDateTime): DynamoQuery[DR, Dec] = withTtlField(expiration.toEpochSecond) - def withTtlField(expirationEpochSeconds: Long)(implicit ev: DR <:< WithItem[DR] with WithTableReference[DR]): DynamoQuery[DR, Dec] = { + def withTtlField(expirationEpochSeconds: Long): DynamoQuery[DR, Dec] = { dynamoQuery.modify { rq => rq.table.ttlField.fold(rq) { @@ -246,8 +279,9 @@ object DynamoQuery { } } - implicit final class TweakWithConsistent[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithConsistent[DR]) - extends WithConsistent[DynamoQuery[DR, Dec]] { + implicit final class TweakWithConsistent[DR <: DynamoRequest with WithConsistent[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithConsistent[DynamoQuery[DR, Dec]] { override def withConsistent(consistentRead: Boolean): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withConsistent(consistentRead)) } @@ -255,14 +289,17 @@ object DynamoQuery { def consistent: DynamoQuery[DR, Dec] = withConsistent(true) } - implicit final class TweakReturnValue[DR <: DynamoRequest, Dec](dynamoQuery: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithReturnValue[DR]) - extends WithReturnValue[DynamoQuery[DR, Dec]] { + implicit final class TweakReturnValue[DR <: DynamoRequest with WithReturnValue[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + ) extends WithReturnValue[DynamoQuery[DR, Dec]] { override def withReturnValue(returnValue: ReturnValue): DynamoQuery[DR, Dec] = { dynamoQuery.modify(_.withReturnValue(returnValue)) } } - implicit final class UpdateOps[Dec](private val dynamoQuery: DynamoQuery[UpdateTable, Dec]) extends AnyVal { + implicit final class UpdateOps[Dec]( + private val dynamoQuery: DynamoQuery[UpdateTable, Dec] + ) extends AnyVal { @inline def withNewProvisioning(provisioning: ProvisionedThroughputConfig): DynamoQuery[UpdateTable, Dec] = { dynamoQuery.modify(_.withNewProvisioning(provisioning)) } @@ -280,31 +317,52 @@ object DynamoQuery { } } - implicit final class QueryCount[DR <: DynamoRequest, Rb, Dec](private val dynamoQuery: DynamoQuery[DR, Dec]) extends AnyVal { - def countOnly(implicit ev1: DR <:< WithSelect[DR] with WithProjectionExpression[DR], ev3: DR#Rsp => { def count(): Integer }): DynamoQuery[DR, Int] = + implicit final class QueryCountOnly[DR <: DynamoRequest with WithSelect[DR] with WithProjectionExpression[DR], Dec]( + private val dynamoQuery: DynamoQuery[DR, Dec] + ) extends AnyVal { + def countOnly(implicit ev: DR#Rsp => { def count(): Integer }): DynamoQuery[DR, Int] = dynamoQuery.modify(_.withSelect(Select.COUNT).withProjectionExpression(_ => None)).decode(_.count()) - def scannedCountOnly(implicit ev1: DR <:< WithSelect[DR] with WithProjectionExpression[DR], ev2: DR#Rsp => { def scannedCount(): Integer }): DynamoQuery[DR, Int] = + def scannedCountOnly(implicit ev: DR#Rsp => { def scannedCount(): Integer }): DynamoQuery[DR, Int] = dynamoQuery.modify(_.withSelect(Select.COUNT).withProjectionExpression(_ => None)).decode(_.scannedCount()) + } - def consumedCapacityOnly(implicit ev4: DR#Rsp => { def consumedCapacity(): ConsumedCapacity }): DynamoQuery[DR, ConsumedCapacity] = + implicit final class QueryCount[DR <: DynamoRequest, Dec]( + private val dynamoQuery: DynamoQuery[DR, Dec] + ) extends AnyVal { + def consumedCapacityOnly(implicit ev: DR#Rsp => { def consumedCapacity(): ConsumedCapacity }): DynamoQuery[DR, ConsumedCapacity] = dynamoQuery.decode(_.consumedCapacity()) - def withCount(implicit ev3: DR#Rsp => { def count(): Integer }): DynamoQuery[DR, (Dec, Int)] = + def withCount(implicit ev: DR#Rsp => { def count(): Integer }): DynamoQuery[DR, (Dec, Int)] = dynamoQuery.decodeWith((a, c) => (c, a.count())) - def withScannedCount(implicit ev2: DR#Rsp => { def scannedCount(): Integer }): DynamoQuery[DR, (Dec, Int)] = + def withScannedCount(implicit ev: DR#Rsp => { def scannedCount(): Integer }): DynamoQuery[DR, (Dec, Int)] = dynamoQuery.decodeWith((a, c) => (c, a.scannedCount())) - def withConsumedCapacity(implicit ev4: DR#Rsp => { def consumedCapacity(): ConsumedCapacity }): DynamoQuery[DR, (Dec, ConsumedCapacity)] = + def withConsumedCapacity(implicit ev: DR#Rsp => { def consumedCapacity(): ConsumedCapacity }): DynamoQuery[DR, (Dec, ConsumedCapacity)] = dynamoQuery.decodeWith((a, c) => (c, a.consumedCapacity())) } - implicit final class DecodeItems[DR <: DynamoRequest, Rb, Dec]( + implicit final class DecodeBatchedItems[DR <: DynamoRequest, Dec, A]( dynamoQuery: DynamoQuery[DR, Dec] - )(implicit - ev1: DR <:< WithProjectionExpression[DR] with WithTableReference[DR], - ev3: DR#Rsp => { def items(): java.util.List[java.util.Map[String, AttributeValue]] }, + )(implicit ev1: DR#Rsp <:< List[A], + ev2: A => { def responses(): java.util.Map[String, java.util.List[java.util.Map[String, AttributeValue]]] }, + ) { + + def decodeItems[Item: D4SDecoder]: DynamoQuery[DR, List[Item]] = { + dynamoQuery.decodeF(FnBIO { + rsp => implicit F => + import scala.jdk.CollectionConverters._ + + val itemsData = rsp.flatMap(_.responses().asScala.values.flatMap(_.asScala).toList) + F.traverse(itemsData)(decodeItemImpl(_)).map(_.flatten) + }) + } + } + + implicit final class DecodeItems[DR <: DynamoRequest with WithProjectionExpression[DR] with WithTableReference[DR], Dec]( + dynamoQuery: DynamoQuery[DR, Dec] + )(implicit ev: DR#Rsp => { def items(): java.util.List[java.util.Map[String, AttributeValue]] } ) { def decodeItems[Item: D4SDecoder: AttributeNames]: DynamoQuery[DR, List[Item]] = { dynamoQuery @@ -337,29 +395,21 @@ object DynamoQuery { } - implicit final class DecodeBatchedItems[DR <: DynamoRequest, Rb, Dec, A]( + implicit final class DecodeItemAttributes[DR <: DynamoRequest, Dec]( dynamoQuery: DynamoQuery[DR, Dec] - )(implicit - ev1: DR#Rsp <:< List[A], - ev2: A => { def responses(): java.util.Map[String, java.util.List[java.util.Map[String, AttributeValue]]] }, + )(implicit ev: DR#Rsp => { def attributes(): java.util.Map[String, AttributeValue] } ) { - - def decodeItems[Item: D4SDecoder]: DynamoQuery[DR, List[Item]] = { + def decodeItem[Item: D4SDecoder]: DynamoQuery[DR, Option[Item]] = { dynamoQuery.decodeF(FnBIO { - rsp => implicit F => - import scala.jdk.CollectionConverters._ - - val itemsData = rsp.flatMap(_.responses().asScala.values.flatMap(_.asScala).toList) - F.traverse(itemsData)(decodeItemImpl(_)).map(_.flatten) + response => implicit F => + decodeItemImpl(response.attributes()) }) } } - implicit final class DecodeItem[DR <: DynamoRequest, Rb, Dec]( + implicit final class DecodeItem[DR <: DynamoRequest with WithProjectionExpression[DR] with WithTableReference[DR], Dec]( dynamoQuery: DynamoQuery[DR, Dec] - )(implicit - ev1: DR <:< WithProjectionExpression[DR] with WithTableReference[DR], - ev2: DR#Rsp => { def item(): java.util.Map[String, AttributeValue] }, + )(implicit ev: DR#Rsp => { def item(): java.util.Map[String, AttributeValue] } ) { def decodeItem[Item: D4SDecoder: AttributeNames]: DynamoQuery[DR, Option[Item]] = { dynamoQuery @@ -385,11 +435,15 @@ object DynamoQuery { }) } - def decodeItemCheckTTL[Item: D4SDecoder: AttributeNames](now: ZonedDateTime): DynamoQuery[DR, Option[Item]] = { + def decodeItemCheckTTL[Item: D4SDecoder: AttributeNames]( + now: ZonedDateTime + ): DynamoQuery[DR, Option[Item]] = { decodeItemCheckTTL(now.toEpochSecond) } - def decodeItemCheckTTL[Item: D4SDecoder: AttributeNames](nowEpochSeconds: Long): DynamoQuery[DR, Option[Item]] = { + def decodeItemCheckTTL[Item: D4SDecoder: AttributeNames]( + nowEpochSeconds: Long + ): DynamoQuery[DR, Option[Item]] = { decodeItemWithTTL.decodeWith { case (_, Some((item, ttl))) if ttl.forall(_ >= nowEpochSeconds) => Some(item) case (_, _) => None @@ -397,21 +451,6 @@ object DynamoQuery { } } - implicit final class DecodeItemAttributes[DR <: DynamoRequest, Rb, Dec]( - dynamoQuery: DynamoQuery[DR, Dec] - )(implicit - ev: DR#Rsp => { - def attributes(): java.util.Map[String, AttributeValue] - } - ) { - def decodeItem[Item: D4SDecoder]: DynamoQuery[DR, Option[Item]] = { - dynamoQuery.decodeF(FnBIO { - response => implicit F => - decodeItemImpl(response.attributes()) - }) - } - } - @inline private[this] def decodeItemImpl[F[+_, +_]: BIOError, Item: D4SDecoder]( itemJavaMap: java.util.Map[String, AttributeValue] ): F[DecoderException, Option[Item]] = { diff --git a/d4s/src/main/scala/d4s/models/table/DynamoPrefixedSyntax.scala b/d4s/src/main/scala/d4s/models/table/DynamoPrefixedSyntax.scala index 237fe5a..02922fa 100644 --- a/d4s/src/main/scala/d4s/models/table/DynamoPrefixedSyntax.scala +++ b/d4s/src/main/scala/d4s/models/table/DynamoPrefixedSyntax.scala @@ -10,11 +10,11 @@ trait DynamoPrefixedSyntax { def ddl: TableDDL - implicit final def queryPrefixed[DR <: DynamoRequest, Dec](query: DynamoQuery[DR, Dec])(implicit ev: DR <:< WithTableReference[DR]): Prefixed[DR, Dec, Dec] = { + implicit final def queryPrefixed[DR <: DynamoRequest with WithTableReference[DR], Dec](query: DynamoQuery[DR, Dec]): Prefixed[DR, Dec, Dec] = { new Prefixed[DR, Dec, Dec](query) } - implicit final class Prefixed[DR <: DynamoRequest, Dec, A](exec: DynamoExecution[DR, Dec, A])(implicit ev: DR <:< WithTableReference[DR]) { + implicit final class Prefixed[DR <: DynamoRequest with WithTableReference[DR], Dec, A](private val exec: DynamoExecution[DR, Dec, A]) { def prefixed[TP: TablePrefix](prefix: TP): DynamoExecution[DR, Dec, A] = { exec.modify(_.withPrefix(prefix)).retryWithPrefix(ddl) } diff --git a/d4s/src/main/scala/d4s/models/table/index/ProvisionedGlobalIndex.scala b/d4s/src/main/scala/d4s/models/table/index/ProvisionedGlobalIndex.scala index 54e57d4..6395fec 100644 --- a/d4s/src/main/scala/d4s/models/table/index/ProvisionedGlobalIndex.scala +++ b/d4s/src/main/scala/d4s/models/table/index/ProvisionedGlobalIndex.scala @@ -8,7 +8,7 @@ final case class ProvisionedGlobalIndex[-H, -R]( name: String, key: DynamoKey[H, R], projection: Projection, - provisionedThroughputConfig: ProvisionedThroughputConfig + provisionedThroughputConfig: ProvisionedThroughputConfig, ) extends TableIndex[H, R] { def asCreateAction: CreateGlobalSecondaryIndexAction = { @@ -23,7 +23,7 @@ final case class ProvisionedGlobalIndex[-H, -R]( } object ProvisionedGlobalIndex { - implicit final class FromGlobalIndex[H, R](index: GlobalIndex[H, R]) { + implicit final class FromGlobalIndex[H, R](private val index: GlobalIndex[H, R]) extends AnyVal { def toProvisionedIndex(cfg: TableProvisionedThroughputConfig): ProvisionedGlobalIndex[H, R] = { ProvisionedGlobalIndex(index.name, index.key, index.projection, cfg.getIndexProvisioning(index.name)) } diff --git a/d4s/src/main/scala_2.12/d4s/compat/package.scala b/d4s/src/main/scala_2.12/d4s/compat/package.scala index a69c2cd..344346f 100644 --- a/d4s/src/main/scala_2.12/d4s/compat/package.scala +++ b/d4s/src/main/scala_2.12/d4s/compat/package.scala @@ -4,7 +4,7 @@ package object compat { private[d4s] object chaining { - implicit final class CainingOps[A](val a: A) extends AnyVal { + implicit final class ChainingOps[A](val a: A) extends AnyVal { def tap[U](f: A => U): A = { f(a) a