Skip to content

Commit

Permalink
bump: Akka Persistence R2DBC 1.2.0-M4 (#979)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Aug 31, 2023
1 parent 87e49e3 commit 4410d9f
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
lazy val r2dbcProjectionSettings: R2dbcProjectionSettings =
R2dbcProjectionSettings(typedSystem.settings.config.getConfig(testConfigPath))

lazy val r2dbcExecutor: R2dbcExecutor = {
lazy val r2dbcExecutor: R2dbcExecutor =
new R2dbcExecutor(
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory),
LoggerFactory.getLogger(getClass),
r2dbcProjectionSettings.logDbCallsExceeding)(typedSystem.executionContext, typedSystem)
}
r2dbcProjectionSettings.logDbCallsExceeding,
ConnectionFactoryProvider(typedSystem)
.connectionPoolSettingsFor(r2dbcProjectionSettings.useConnectionFactory)
.closeCallsExceeding)(typedSystem.executionContext, typedSystem)

lazy val persistenceExt: Persistence = Persistence(typedSystem)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
new R2dbcExecutor(
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory),
LoggerFactory.getLogger(getClass),
r2dbcProjectionSettings.logDbCallsExceeding)(typedSystem.executionContext, typedSystem)
r2dbcProjectionSettings.logDbCallsExceeding,
ConnectionFactoryProvider(typedSystem)
.connectionPoolSettingsFor(r2dbcProjectionSettings.useConnectionFactory)
.closeCallsExceeding)(typedSystem.executionContext, typedSystem)
}

lazy val persistenceExt: Persistence = Persistence(typedSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import akka.projection.scaladsl.SourceProvider
import akka.stream.RestartSettings
import akka.stream.scaladsl.FlowWithContext
import akka.stream.scaladsl.Source
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand All @@ -84,9 +83,7 @@ private[projection] object R2dbcProjectionImpl {
projectionId: ProjectionId,
sourceProvider: Option[BySlicesSourceProvider],
settings: R2dbcProjectionSettings,
connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]) = {
val r2dbcExecutor =
new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(system.executionContext, system)
r2dbcExecutor: R2dbcExecutor)(implicit system: ActorSystem[_]) = {
new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.projection.r2dbc.scaladsl

import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration

import akka.Done
import akka.actor.typed.ActorSystem
Expand Down Expand Up @@ -51,15 +52,17 @@ object R2dbcProjection {

val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
val connFactory = connectionFactory(system, r2dbcSettings)
val closeExceeding = closeCallsExceeding(system, r2dbcSettings)
val r2dbcExecutor =
new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)(
system.executionContext,
system)
val offsetStore =
R2dbcProjectionImpl.createOffsetStore(
projectionId,
timestampOffsetBySlicesSourceProvider(sourceProvider),
r2dbcSettings,
connFactory)
val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)(
system.executionContext,
system)
r2dbcExecutor)

val adaptedHandler =
R2dbcProjectionImpl.adaptedHandlerForExactlyOnce(sourceProvider, handler, offsetStore, r2dbcExecutor)(
Expand Down Expand Up @@ -102,15 +105,17 @@ object R2dbcProjection {

val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
val connFactory = connectionFactory(system, r2dbcSettings)
val closeExceeding = closeCallsExceeding(system, r2dbcSettings)
val r2dbcExecutor =
new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)(
system.executionContext,
system)
val offsetStore =
R2dbcProjectionImpl.createOffsetStore(
projectionId,
timestampOffsetBySlicesSourceProvider(sourceProvider),
r2dbcSettings,
connFactory)
val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)(
system.executionContext,
system)
r2dbcExecutor)

val adaptedHandler =
R2dbcProjectionImpl.adaptedHandlerForAtLeastOnce(sourceProvider, handler, offsetStore, r2dbcExecutor)(
Expand Down Expand Up @@ -151,12 +156,17 @@ object R2dbcProjection {

val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
val connFactory = connectionFactory(system, r2dbcSettings)
val closeExceeding = closeCallsExceeding(system, r2dbcSettings)
val r2dbcExecutor =
new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)(
system.executionContext,
system)
val offsetStore =
R2dbcProjectionImpl.createOffsetStore(
projectionId,
timestampOffsetBySlicesSourceProvider(sourceProvider),
r2dbcSettings,
connFactory)
r2dbcExecutor)

val adaptedHandler =
R2dbcProjectionImpl.adaptedHandlerForAtLeastOnceAsync(sourceProvider, handler, offsetStore)(
Expand Down Expand Up @@ -194,15 +204,17 @@ object R2dbcProjection {

val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
val connFactory = connectionFactory(system, r2dbcSettings)
val closeExceeding = closeCallsExceeding(system, r2dbcSettings)
val r2dbcExecutor =
new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)(
system.executionContext,
system)
val offsetStore =
R2dbcProjectionImpl.createOffsetStore(
projectionId,
timestampOffsetBySlicesSourceProvider(sourceProvider),
r2dbcSettings,
connFactory)
val r2dbcExecutor = new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding)(
system.executionContext,
system)
r2dbcExecutor)

val adaptedHandler =
R2dbcProjectionImpl.adaptedHandlerForGrouped(sourceProvider, handler, offsetStore, r2dbcExecutor)(
Expand Down Expand Up @@ -244,12 +256,17 @@ object R2dbcProjection {

val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
val connFactory = connectionFactory(system, r2dbcSettings)
val closeExceeding = closeCallsExceeding(system, r2dbcSettings)
val r2dbcExecutor =
new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)(
system.executionContext,
system)
val offsetStore =
R2dbcProjectionImpl.createOffsetStore(
projectionId,
timestampOffsetBySlicesSourceProvider(sourceProvider),
r2dbcSettings,
connFactory)
r2dbcExecutor)

val adaptedHandler =
R2dbcProjectionImpl.adaptedHandlerForGroupedAsync(sourceProvider, handler, offsetStore)(
Expand Down Expand Up @@ -299,12 +316,17 @@ object R2dbcProjection {

val r2dbcSettings = settings.getOrElse(R2dbcProjectionSettings(system))
val connFactory = connectionFactory(system, r2dbcSettings)
val closeExceeding = closeCallsExceeding(system, r2dbcSettings)
val r2dbcExecutor =
new R2dbcExecutor(connFactory, R2dbcProjectionImpl.log, r2dbcSettings.logDbCallsExceeding, closeExceeding)(
system.executionContext,
system)
val offsetStore =
R2dbcProjectionImpl.createOffsetStore(
projectionId,
timestampOffsetBySlicesSourceProvider(sourceProvider),
r2dbcSettings,
connFactory)
r2dbcExecutor)

val adaptedHandler =
R2dbcProjectionImpl.adaptedHandlerForFlow(sourceProvider, handler, offsetStore, r2dbcSettings)(system)
Expand All @@ -325,6 +347,12 @@ object R2dbcProjection {
ConnectionFactoryProvider(system).connectionFactoryFor(r2dbcSettings.useConnectionFactory)
}

private def closeCallsExceeding(
system: ActorSystem[_],
r2dbcSettings: R2dbcProjectionSettings): Option[FiniteDuration] = {
ConnectionFactoryProvider(system).connectionPoolSettingsFor(r2dbcSettings.useConnectionFactory).closeCallsExceeding
}

private def timestampOffsetBySlicesSourceProvider(
sourceProvider: SourceProvider[_, _]): Option[BySlicesSourceProvider] = {
sourceProvider match {
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object Dependencies {
val akka = sys.props.getOrElse("build.akka.version", "2.8.4")
val akkaPersistenceCassandra = "1.1.0"
val akkaPersistenceJdbc = "5.2.0"
val akkaPersistenceR2dbc = "1.2.0-M3"
val akkaPersistenceR2dbc = "1.2.0-M4"
val alpakka = "6.0.1"
val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "4.0.2")
val slick = "3.4.1"
Expand Down
2 changes: 1 addition & 1 deletion samples/grpc/iot-service-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Global / cancelable := false // ctrl-c
val AkkaVersion = "2.8.4"
val AkkaHttpVersion = "10.5.2"
val AkkaManagementVersion = "1.4.1"
val AkkaPersistenceR2dbcVersion = "1.2.0-M3"
val AkkaPersistenceR2dbcVersion = "1.2.0-M4"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.0-M3")
val AkkaDiagnosticsVersion = "2.0.1"
Expand Down
5 changes: 2 additions & 3 deletions samples/grpc/local-drone-control-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ name := "local-drone-control"

organization := "com.lightbend.akka.samples"
organizationHomepage := Some(url("https://akka.io"))
licenses := Seq(
("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))
licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))

scalaVersion := "2.13.11"

Expand All @@ -30,7 +29,7 @@ Global / cancelable := false // ctrl-c
val AkkaVersion = "2.8.4"
val AkkaHttpVersion = "10.5.2"
val AkkaManagementVersion = "1.4.0"
val AkkaPersistenceR2dbcVersion = "1.2.0-M3"
val AkkaPersistenceR2dbcVersion = "1.2.0-M4"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.0-M3")
val AkkaDiagnosticsVersion = "2.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ name := "restaurant-drone-deliveries-service"

organization := "com.lightbend.akka.samples"
organizationHomepage := Some(url("https://akka.io"))
licenses := Seq(
("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))
licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))

scalaVersion := "2.13.11"

Expand All @@ -30,7 +29,7 @@ Global / cancelable := false // ctrl-c
val AkkaVersion = "2.8.4"
val AkkaHttpVersion = "10.5.2"
val AkkaManagementVersion = "1.4.0"
val AkkaPersistenceR2dbcVersion = "1.2.0-M3"
val AkkaPersistenceR2dbcVersion = "1.2.0-M4"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.5.0-M3")
val AkkaDiagnosticsVersion = "2.0.1"
Expand Down

0 comments on commit 4410d9f

Please sign in to comment.