diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/TestDbLifecycle.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/TestDbLifecycle.scala index 56d9bd109..e7f050817 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/TestDbLifecycle.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/TestDbLifecycle.scala @@ -26,12 +26,14 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => lazy val r2dbcProjectionSettings: R2dbcProjectionSettings = R2dbcProjectionSettings(typedSystem.settings.config.getConfig(testConfigPath)) - lazy val r2dbcExecutor: R2dbcExecutor = { + lazy val r2dbcExecutor: R2dbcExecutor = new R2dbcExecutor( ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory), LoggerFactory.getLogger(getClass), - r2dbcProjectionSettings.logDbCallsExceeding)(typedSystem.executionContext, typedSystem) - } + r2dbcProjectionSettings.logDbCallsExceeding, + ConnectionFactoryProvider(typedSystem) + .connectionPoolSettingsFor(r2dbcProjectionSettings.useConnectionFactory) + .closeCallsExceeding)(typedSystem.executionContext, typedSystem) lazy val persistenceExt: Persistence = Persistence(typedSystem) diff --git a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/TestDbLifecycle.scala b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/TestDbLifecycle.scala index d80290c8a..107846ed0 100644 --- a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/TestDbLifecycle.scala +++ b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/TestDbLifecycle.scala @@ -33,7 +33,10 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => new R2dbcExecutor( ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory), LoggerFactory.getLogger(getClass), - r2dbcProjectionSettings.logDbCallsExceeding)(typedSystem.executionContext, typedSystem) + r2dbcProjectionSettings.logDbCallsExceeding, + ConnectionFactoryProvider(typedSystem) + .connectionPoolSettingsFor(r2dbcProjectionSettings.useConnectionFactory) + .closeCallsExceeding)(typedSystem.executionContext, typedSystem) } lazy val persistenceExt: Persistence = Persistence(typedSystem) diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index e514a1ee3..72556065f 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -64,7 +64,6 @@ import akka.projection.scaladsl.SourceProvider import akka.stream.RestartSettings import akka.stream.scaladsl.FlowWithContext import akka.stream.scaladsl.Source -import io.r2dbc.spi.ConnectionFactory import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -84,9 +83,7 @@ private[projection] object R2dbcProjectionImpl { projectionId: ProjectionId, sourceProvider: Option[BySlicesSourceProvider], settings: R2dbcProjectionSettings, - connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]) = { - val r2dbcExecutor = - new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(system.executionContext, system) + r2dbcExecutor: R2dbcExecutor)(implicit system: ActorSystem[_]) = { new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor) } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala index 7268d5f19..e78a0e357 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/scaladsl/R2dbcProjection.scala @@ -5,6 +5,7 @@ package akka.projection.r2dbc.scaladsl import scala.collection.immutable +import scala.concurrent.duration.FiniteDuration import akka.Done import akka.actor.typed.ActorSystem @@ -51,15 +52,17 @@ object R2dbcProjection { val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system)) val connFactory = connectionFactory(system, r2dbcSettings) + val closeExceeding = closeCallsExceeding(system, r2dbcSettings) + val r2dbcExecutor = + new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)( + system.executionContext, + system) val offsetStore = R2dbcProjectionImpl.createOffsetStore( projectionId, timestampOffsetBySlicesSourceProvider(sourceProvider), r2dbcSettings, - connFactory) - val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)( - system.executionContext, - system) + r2dbcExecutor) val adaptedHandler = R2dbcProjectionImpl.adaptedHandlerForExactlyOnce(sourceProvider, handler, offsetStore, r2dbcExecutor)( @@ -102,15 +105,17 @@ object R2dbcProjection { val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system)) val connFactory = connectionFactory(system, r2dbcSettings) + val closeExceeding = closeCallsExceeding(system, r2dbcSettings) + val r2dbcExecutor = + new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)( + system.executionContext, + system) val offsetStore = R2dbcProjectionImpl.createOffsetStore( projectionId, timestampOffsetBySlicesSourceProvider(sourceProvider), r2dbcSettings, - connFactory) - val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)( - system.executionContext, - system) + r2dbcExecutor) val adaptedHandler = R2dbcProjectionImpl.adaptedHandlerForAtLeastOnce(sourceProvider, handler, offsetStore, r2dbcExecutor)( @@ -151,12 +156,17 @@ object R2dbcProjection { val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system)) val connFactory = connectionFactory(system, r2dbcSettings) + val closeExceeding = closeCallsExceeding(system, r2dbcSettings) + val r2dbcExecutor = + new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)( + system.executionContext, + system) val offsetStore = R2dbcProjectionImpl.createOffsetStore( projectionId, timestampOffsetBySlicesSourceProvider(sourceProvider), r2dbcSettings, - connFactory) + r2dbcExecutor) val adaptedHandler = R2dbcProjectionImpl.adaptedHandlerForAtLeastOnceAsync(sourceProvider, handler, offsetStore)( @@ -194,15 +204,17 @@ object R2dbcProjection { val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system)) val connFactory = connectionFactory(system, r2dbcSettings) + val closeExceeding = closeCallsExceeding(system, r2dbcSettings) + val r2dbcExecutor = + new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)( + system.executionContext, + system) val offsetStore = R2dbcProjectionImpl.createOffsetStore( projectionId, timestampOffsetBySlicesSourceProvider(sourceProvider), r2dbcSettings, - connFactory) - val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)( - system.executionContext, - system) + r2dbcExecutor) val adaptedHandler = R2dbcProjectionImpl.adaptedHandlerForGrouped(sourceProvider, handler, offsetStore, r2dbcExecutor)( @@ -244,12 +256,17 @@ object R2dbcProjection { val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system)) val connFactory = connectionFactory(system, r2dbcSettings) + val closeExceeding = closeCallsExceeding(system, r2dbcSettings) + val r2dbcExecutor = + new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)( + system.executionContext, + system) val offsetStore = R2dbcProjectionImpl.createOffsetStore( projectionId, timestampOffsetBySlicesSourceProvider(sourceProvider), r2dbcSettings, - connFactory) + r2dbcExecutor) val adaptedHandler = R2dbcProjectionImpl.adaptedHandlerForGroupedAsync(sourceProvider, handler, offsetStore)( @@ -299,12 +316,17 @@ object R2dbcProjection { val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system)) val connFactory = connectionFactory(system, r2dbcSettings) + val closeExceeding = closeCallsExceeding(system, r2dbcSettings) + val r2dbcExecutor = + new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)( + system.executionContext, + system) val offsetStore = R2dbcProjectionImpl.createOffsetStore( projectionId, timestampOffsetBySlicesSourceProvider(sourceProvider), r2dbcSettings, - connFactory) + r2dbcExecutor) val adaptedHandler = R2dbcProjectionImpl.adaptedHandlerForFlow(sourceProvider, handler, offsetStore, r2dbcSettings)(system) @@ -325,6 +347,12 @@ object R2dbcProjection { ConnectionFactoryProvider(system).connectionFactoryFor(r2dbcSettings.useConnectionFactory) } + private def closeCallsExceeding( + system: ActorSystem[_], + r2dbcSettings: R2dbcProjectionSettings): Option[FiniteDuration] = { + ConnectionFactoryProvider(system).connectionPoolSettingsFor(r2dbcSettings.useConnectionFactory).closeCallsExceeding + } + private def timestampOffsetBySlicesSourceProvider( sourceProvider: SourceProvider[_, _]): Option[BySlicesSourceProvider] = { sourceProvider match { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6ebb755bf..82526fcee 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -23,7 +23,7 @@ object Dependencies { val akka = sys.props.getOrElse("build.akka.version", "2.8.4") val akkaPersistenceCassandra = "1.1.0" val akkaPersistenceJdbc = "5.2.0" - val akkaPersistenceR2dbc = "1.2.0-M3" + val akkaPersistenceR2dbc = "1.2.0-M4" val alpakka = "6.0.1" val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "4.0.2") val slick = "3.4.1" diff --git a/samples/grpc/iot-service-scala/build.sbt b/samples/grpc/iot-service-scala/build.sbt index 6b152e5c6..24ca6579f 100644 --- a/samples/grpc/iot-service-scala/build.sbt +++ b/samples/grpc/iot-service-scala/build.sbt @@ -29,7 +29,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = "2.8.4" val AkkaHttpVersion = "10.5.2" val AkkaManagementVersion = "1.4.1" -val AkkaPersistenceR2dbcVersion = "1.2.0-M3" +val AkkaPersistenceR2dbcVersion = "1.2.0-M4" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M3") val AkkaDiagnosticsVersion = "2.0.1" diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index 843643e5e..b1710243f 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -2,8 +2,7 @@ name := "local-drone-control" organization := "com.lightbend.akka.samples" organizationHomepage := Some(url("https://akka.io")) -licenses := Seq( - ("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) +licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) scalaVersion := "2.13.11" @@ -30,7 +29,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = "2.8.4" val AkkaHttpVersion = "10.5.2" val AkkaManagementVersion = "1.4.0" -val AkkaPersistenceR2dbcVersion = "1.2.0-M3" +val AkkaPersistenceR2dbcVersion = "1.2.0-M4" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M3") val AkkaDiagnosticsVersion = "2.0.1" diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt index e92ae50cf..5eb457531 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt @@ -2,8 +2,7 @@ name := "restaurant-drone-deliveries-service" organization := "com.lightbend.akka.samples" organizationHomepage := Some(url("https://akka.io")) -licenses := Seq( - ("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) +licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) scalaVersion := "2.13.11" @@ -30,7 +29,7 @@ Global / cancelable := false // ctrl-c val AkkaVersion = "2.8.4" val AkkaHttpVersion = "10.5.2" val AkkaManagementVersion = "1.4.0" -val AkkaPersistenceR2dbcVersion = "1.2.0-M3" +val AkkaPersistenceR2dbcVersion = "1.2.0-M4" val AkkaProjectionVersion = sys.props.getOrElse("akka-projection.version", "1.5.0-M3") val AkkaDiagnosticsVersion = "2.0.1"