From 1da3efa75e762a7542b44b0c4e20509a4a6d51e4 Mon Sep 17 00:00:00 2001
From: Sebastian Alfers
Date: Thu, 26 Sep 2024 14:28:19 +0200
Subject: [PATCH] chore: akka to 2.10.0-M1, align with changes from upstream
 (#1772)

---
 .scala-steward.conf                           |  1 -
 .../ReactiveKafkaConsumerBenchmarks.scala     |  3 +-
 build.sbt                                     | 12 +--
 .../sharding/KafkaClusterSharding.scala       | 13 ++-
 .../scala/akka/kafka/CommitterSettings.scala  |  5 +-
 .../kafka/ConnectionCheckerSettings.scala     |  6 +-
 .../scala/akka/kafka/ConsumerSettings.scala   | 80 +++++++++----------
 .../kafka/OffsetResetProtectionSettings.scala |  8 +-
 .../scala/akka/kafka/ProducerSettings.scala   | 27 +++----
 .../kafka/internal/CommittableSources.scala   |  5 +-
 .../akka/kafka/internal/ConfigSettings.scala  |  6 +-
 .../internal/ControlImplementations.scala     | 17 ++--
 .../kafka/internal/DeferredProducer.scala     |  7 +-
 .../kafka/internal/KafkaConsumerActor.scala   |  9 ++-
 .../akka/kafka/internal/MessageBuilder.scala  |  6 +-
 .../internal/TransactionalProducerStage.scala |  6 +-
 .../kafka/internal/TransactionalSources.scala |  3 +-
 .../scala/akka/kafka/javadsl/Committer.scala  |  6 +-
 .../scala/akka/kafka/javadsl/Consumer.scala   | 14 ++--
 .../akka/kafka/javadsl/DiscoverySupport.scala |  8 +-
 .../akka/kafka/javadsl/MetadataClient.scala   | 38 ++++-----
 .../scala/akka/kafka/javadsl/Producer.scala   |  8 +-
 .../akka/kafka/javadsl/SendProducer.scala     |  8 +-
 .../akka/kafka/javadsl/Transactional.scala    |  8 +-
 .../scala/akka/kafka/scaladsl/Committer.scala |  4 +-
 .../scala/akka/kafka/scaladsl/Consumer.scala  | 11 ++-
 .../kafka/scaladsl/DiscoverySupport.scala     |  4 +-
 .../akka/kafka/scaladsl/MetadataClient.scala  | 13 ++-
 .../akka/kafka/scaladsl/SendProducer.scala    |  5 +-
 .../kafka/testkit/javadsl/BaseKafkaTest.java  |  4 +-
 .../KafkaTestkitTestcontainersSettings.scala  | 14 ++--
 .../internal/TestcontainersKafka.scala        | 12 +--
 .../akka/kafka/internal/ConsumerMock.scala    |  4 +-
 .../akka/kafka/javadsl/ControlSpec.scala      | 18 ++---
 34 files changed, 194 insertions(+), 199 deletions(-)

diff --git a/.scala-steward.conf b/.scala-steward.conf
index 5bfdb3611..aeb88fc0d 100644
--- a/.scala-steward.conf
+++ b/.scala-steward.conf
@@ -10,7 +10,6 @@ updates.pin = [
   { groupId = "org.scalatest", artifactId = "scalatest", version = "3.1." }
   { groupId = "org.slf4j", artifactId = "log4j-over-slf4j", version = "1." }
   { groupId = "org.slf4j", artifactId = "jul-to-slf4j", version = "1." }
-  { groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.2." }
 ]

 commits.message = "bump: ${artifactName} ${nextVersion} (was ${currentVersion})"
diff --git a/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala b/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
index 2524ce572..218d49193 100644
--- a/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
+++ b/benchmarks/src/main/scala/akka/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala
@@ -8,7 +8,6 @@ package akka.kafka.benchmarks
 import java.util.concurrent.atomic.AtomicInteger

 import akka.actor.ActorSystem
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerMessage.CommittableMessage
 import akka.kafka.benchmarks.InflightMetrics.{BrokerMetricRequest, ConsumerMetricRequest}
 import akka.kafka.scaladsl.Committer
@@ -163,7 +162,7 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging with InflightMetrics
     val control = fixture.source
       .mapAsync(1) { m =>
         meter.mark()
-        m.committableOffset.commitInternal().map(_ => m)(ExecutionContexts.parasitic)
+        m.committableOffset.commitInternal().map(_ => m)(ExecutionContext.parasitic)
       }
       .toMat(Sink.foreach { msg =>
         if (msg.committableOffset.partitionOffset.offset >= fixture.msgCount - 1)
diff --git a/build.sbt b/build.sbt
index 07a936aa6..ca885ce46 100644
--- a/build.sbt
+++ b/build.sbt
@@ -16,8 +16,10 @@ val ScalaVersions = Seq(Scala213, Scala3)

 val Scala3Settings = Seq(crossScalaVersions := ScalaVersions)

-val AkkaBinaryVersionForDocs = "2.9"
-val akkaVersion = "2.9.3"
+val akkaVersion = "2.10.0-M1"
+val AkkaBinaryVersionForDocs = VersionNumber(akkaVersion).numbers match {
+  case Seq(major, minor, _*) => s"$major.$minor"
+}

 // Keep .scala-steward.conf pin in sync
 val kafkaVersion = "3.7.1"
@@ -26,7 +28,7 @@ val KafkaVersionForDocs = "37"
 // https://github.com/akka/akka/blob/main/project/Dependencies.scala#L44
 val scalatestVersion = "3.2.16"
 val testcontainersVersion = "1.20.1"
-val slf4jVersion = "1.7.36"
+val slf4jVersion = "2.0.16"
 // this depends on Kafka, and should be upgraded to such latest version
 // that depends on the same Kafka version, as is defined above
 // See https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
@@ -290,7 +292,7 @@ lazy val tests = project
     name := "akka-stream-kafka-tests",
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-discovery" % akkaVersion,
-      "com.google.protobuf" % "protobuf-java" % "3.25.4", // use the same, or later, version as in scalapb
+      "com.google.protobuf" % "protobuf-java" % "3.25.5", // use the same, or later, version as in scalapb
       "io.confluent" % "kafka-avro-serializer" % confluentAvroSerializerVersion % Test excludeAll (confluentLibsExclusionRules: _*),
       // See https://github.com/sbt/sbt/issues/3618#issuecomment-448951808
       "javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts Artifact("javax.ws.rs-api", "jar", "jar"),
@@ -304,7 +306,7 @@ lazy val tests = project
       "org.hamcrest" % "hamcrest" % "3.0" % Test,
       "net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
-      "ch.qos.logback" % "logback-classic" % "1.2.13" % Test,
+      "ch.qos.logback" % "logback-classic" % "1.5.7" % Test,
       "org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test,
       // Schema registry uses Glassfish which uses java.util.logging
       "org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
diff --git a/cluster-sharding/src/main/scala/akka/kafka/cluster/sharding/KafkaClusterSharding.scala b/cluster-sharding/src/main/scala/akka/kafka/cluster/sharding/KafkaClusterSharding.scala
index 0d96c9e51..1163b5d57 100644
--- a/cluster-sharding/src/main/scala/akka/kafka/cluster/sharding/KafkaClusterSharding.scala
+++ b/cluster-sharding/src/main/scala/akka/kafka/cluster/sharding/KafkaClusterSharding.scala
@@ -24,12 +24,11 @@ import org.apache.kafka.common.utils.Utils

 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.jdk.DurationConverters._
+import scala.jdk.FutureConverters._
 import scala.util.{Failure, Success}
-import akka.util.JavaDurationConverters._
 import org.slf4j.LoggerFactory

-import scala.compat.java8.FutureConverters._
-
 /**
  * API MAY CHANGE
  *
@@ -81,9 +80,9 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
   def messageExtractor[M](topic: String,
                           timeout: java.time.Duration,
                           settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingMessageExtractor[M]] =
-    getPartitionCount(topic, timeout.asScala, settings)
+    getPartitionCount(topic, timeout.toScala, settings)
       .map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
-      .toJava
+      .asJava

   /**
    * API MAY CHANGE
@@ -147,11 +146,11 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
       entityIdExtractor: java.util.function.Function[M, String],
       settings: ConsumerSettings[_, _]
   ): CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
-    getPartitionCount(topic, timeout.asScala, settings)
+    getPartitionCount(topic, timeout.toScala, settings)
      .map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e => entityIdExtractor.apply(e)))(
        system.dispatcher
      )
-      .toJava
+      .asJava

   /**
    * API MAY CHANGE
diff --git a/core/src/main/scala/akka/kafka/CommitterSettings.scala b/core/src/main/scala/akka/kafka/CommitterSettings.scala
index b8b50740c..953bfb9b3 100644
--- a/core/src/main/scala/akka/kafka/CommitterSettings.scala
+++ b/core/src/main/scala/akka/kafka/CommitterSettings.scala
@@ -7,10 +7,11 @@ package akka.kafka

 import java.util.concurrent.TimeUnit
 import akka.annotation.ApiMayChange
-import akka.util.JavaDurationConverters._
+
 import com.typesafe.config.Config

 import scala.concurrent.duration._
+import scala.jdk.DurationConverters._

 @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/882")
 sealed trait CommitDelivery
@@ -176,7 +177,7 @@ class CommitterSettings private (
     copy(maxInterval = maxInterval)

   def withMaxInterval(maxInterval: java.time.Duration): CommitterSettings =
-    copy(maxInterval = maxInterval.asScala)
+    copy(maxInterval = maxInterval.toScala)

   def withParallelism(parallelism: Int): CommitterSettings =
     copy(parallelism = parallelism)
diff --git a/core/src/main/scala/akka/kafka/ConnectionCheckerSettings.scala b/core/src/main/scala/akka/kafka/ConnectionCheckerSettings.scala
index c2d36c625..dd055d41a 100644
--- a/core/src/main/scala/akka/kafka/ConnectionCheckerSettings.scala
+++ b/core/src/main/scala/akka/kafka/ConnectionCheckerSettings.scala
@@ -5,10 +5,10 @@

 package akka.kafka

-import akka.util.JavaDurationConverters._
 import com.typesafe.config.Config

 import scala.concurrent.duration._
+import scala.jdk.DurationConverters._

 import java.time.{Duration => JDuration}

@@ -40,7 +40,7 @@ class ConnectionCheckerSettings private[kafka] (val enable: Boolean,

   /** Java API */
   def withCheckInterval(checkInterval: JDuration): ConnectionCheckerSettings =
-    copy(checkInterval = checkInterval.asScala)
+    copy(checkInterval = checkInterval.toScala)

   override def toString: String =
s"akka.kafka.ConnectionCheckerSettings(" + @@ -70,7 +70,7 @@ object ConnectionCheckerSettings { if (enable) { val retries = config.getInt("max-retries") val factor = config.getDouble("backoff-factor") - val checkInterval = config.getDuration("check-interval").asScala + val checkInterval = config.getDuration("check-interval").toScala apply(retries, checkInterval, factor) } else Disabled } diff --git a/core/src/main/scala/akka/kafka/ConsumerSettings.scala b/core/src/main/scala/akka/kafka/ConsumerSettings.scala index 8db170de4..607934900 100644 --- a/core/src/main/scala/akka/kafka/ConsumerSettings.scala +++ b/core/src/main/scala/akka/kafka/ConsumerSettings.scala @@ -10,16 +10,16 @@ import java.util.concurrent.{CompletionStage, Executor} import akka.annotation.InternalApi import akka.kafka.internal._ -import akka.util.JavaDurationConverters._ import com.typesafe.config.Config import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.serialization.Deserializer -import scala.jdk.CollectionConverters._ -import scala.compat.java8.OptionConverters._ -import scala.compat.java8.FutureConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ object ConsumerSettings { @@ -74,25 +74,25 @@ object ConsumerSettings { (valueDeserializer.isDefined || properties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)), "Value deserializer should be defined or declared in configuration" ) - val pollInterval = config.getDuration("poll-interval").asScala - val pollTimeout = config.getDuration("poll-timeout").asScala - val stopTimeout = config.getDuration("stop-timeout").asScala - val closeTimeout = config.getDuration("close-timeout").asScala - val commitTimeout = config.getDuration("commit-timeout").asScala - val commitTimeWarning = config.getDuration("commit-time-warning").asScala + val pollInterval = config.getDuration("poll-interval").toScala + val pollTimeout = config.getDuration("poll-timeout").toScala + val stopTimeout = config.getDuration("stop-timeout").toScala + val closeTimeout = config.getDuration("close-timeout").toScala + val commitTimeout = config.getDuration("commit-timeout").toScala + val commitTimeWarning = config.getDuration("commit-time-warning").toScala val commitRefreshInterval = ConfigSettings.getPotentiallyInfiniteDuration(config, "commit-refresh-interval") val dispatcher = config.getString("use-dispatcher") - val waitClosePartition = config.getDuration("wait-close-partition").asScala - val positionTimeout = config.getDuration("position-timeout").asScala - val offsetForTimesTimeout = config.getDuration("offset-for-times-timeout").asScala - val metadataRequestTimeout = config.getDuration("metadata-request-timeout").asScala - val drainingCheckInterval = config.getDuration("eos-draining-check-interval").asScala + val waitClosePartition = config.getDuration("wait-close-partition").toScala + val positionTimeout = config.getDuration("position-timeout").toScala + val offsetForTimesTimeout = config.getDuration("offset-for-times-timeout").toScala + val metadataRequestTimeout = config.getDuration("metadata-request-timeout").toScala + val drainingCheckInterval = config.getDuration("eos-draining-check-interval").toScala val connectionCheckerSettings = ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath)) - val 
partitionHandlerWarning = config.getDuration("partition-handler-warning").asScala + val partitionHandlerWarning = config.getDuration("partition-handler-warning").toScala val resetProtectionThreshold = OffsetResetProtectionSettings( config.getConfig(OffsetResetProtectionSettings.configPath) ) - val consumerGroupUpdateInterval = config.getDuration("consumer-group-update-interval").asScala + val consumerGroupUpdateInterval = config.getDuration("consumer-group-update-interval").toScala new ConsumerSettings[K, V]( properties, @@ -168,7 +168,7 @@ object ConsumerSettings { keyDeserializer: Optional[Deserializer[K]], valueDeserializer: Optional[Deserializer[V]] ): ConsumerSettings[K, V] = - apply(system, keyDeserializer.asScala, valueDeserializer.asScala) + apply(system, keyDeserializer.toScala, valueDeserializer.toScala) /** * Java API: Create settings from the default configuration @@ -182,7 +182,7 @@ object ConsumerSettings { keyDeserializer: Optional[Deserializer[K]], valueDeserializer: Optional[Deserializer[V]] ): ConsumerSettings[K, V] = - apply(system, keyDeserializer.asScala, valueDeserializer.asScala) + apply(system, keyDeserializer.toScala, valueDeserializer.toScala) /** * Java API: Create settings from a configuration with the same layout as @@ -194,7 +194,7 @@ object ConsumerSettings { keyDeserializer: Optional[Deserializer[K]], valueDeserializer: Optional[Deserializer[V]] ): ConsumerSettings[K, V] = - apply(config, keyDeserializer.asScala, valueDeserializer.asScala) + apply(config, keyDeserializer.toScala, valueDeserializer.toScala) /** * Java API: Create settings from the default configuration @@ -369,7 +369,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] ( * Set the maximum duration a poll to the Kafka broker is allowed to take. */ def withPollTimeout(pollTimeout: java.time.Duration): ConsumerSettings[K, V] = - copy(pollTimeout = pollTimeout.asScala) + copy(pollTimeout = pollTimeout.toScala) /** * Set the interval from one scheduled poll to the next. @@ -382,7 +382,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] ( * Set the interval from one scheduled poll to the next. */ def withPollInterval(pollInterval: java.time.Duration): ConsumerSettings[K, V] = - copy(pollInterval = pollInterval.asScala) + copy(pollInterval = pollInterval.toScala) /** * The stage will await outstanding offset commit requests before @@ -399,7 +399,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] ( * stop forcefully. */ def withStopTimeout(stopTimeout: java.time.Duration): ConsumerSettings[K, V] = - copy(stopTimeout = stopTimeout.asScala) + copy(stopTimeout = stopTimeout.toScala) /** * Set duration to wait for `KafkaConsumer.close` to finish. @@ -412,7 +412,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] ( * Set duration to wait for `KafkaConsumer.close` to finish. */ def withCloseTimeout(closeTimeout: java.time.Duration): ConsumerSettings[K, V] = - copy(closeTimeout = closeTimeout.asScala) + copy(closeTimeout = closeTimeout.toScala) /** * If offset commit requests are not completed within this timeout @@ -427,7 +427,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] ( * the returned Future is completed with [[akka.kafka.CommitTimeoutException]]. 
    */
   def withCommitTimeout(commitTimeout: java.time.Duration): ConsumerSettings[K, V] =
-    copy(commitTimeout = commitTimeout.asScala)
+    copy(commitTimeout = commitTimeout.toScala)

   /**
    * If commits take longer than this time a warning is logged
@@ -440,7 +440,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * If commits take longer than this time a warning is logged
    */
   def withCommitWarning(commitTimeWarning: java.time.Duration): ConsumerSettings[K, V] =
-    copy(commitTimeWarning = commitTimeWarning.asScala)
+    copy(commitTimeWarning = commitTimeWarning.toScala)

   /**
    * Fully qualified config path which holds the dispatcher configuration
@@ -468,7 +468,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    */
   def withCommitRefreshInterval(commitRefreshInterval: java.time.Duration): ConsumerSettings[K, V] =
     if (commitRefreshInterval.isZero) copy(commitRefreshInterval = Duration.Inf)
-    else copy(commitRefreshInterval = commitRefreshInterval.asScala)
+    else copy(commitRefreshInterval = commitRefreshInterval.toScala)

   /**
    * Time to wait for pending requests when a partition is closed.
@@ -489,7 +489,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * Time to wait for pending requests when a partition is closed.
    */
   def withWaitClosePartition(waitClosePartition: java.time.Duration): ConsumerSettings[K, V] =
-    copy(waitClosePartition = waitClosePartition.asScala)
+    copy(waitClosePartition = waitClosePartition.toScala)

   /** Scala API: Limits the blocking on Kafka consumer position calls. */
   def withPositionTimeout(positionTimeout: FiniteDuration): ConsumerSettings[K, V] =
@@ -497,7 +497,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (

   /** Java API: Limits the blocking on Kafka consumer position calls. */
   def withPositionTimeout(positionTimeout: java.time.Duration): ConsumerSettings[K, V] =
-    copy(positionTimeout = positionTimeout.asScala)
+    copy(positionTimeout = positionTimeout.toScala)

   /** Scala API: Limits the blocking on Kafka consumer offsetForTimes calls. */
   def withOffsetForTimesTimeout(offsetForTimesTimeout: FiniteDuration): ConsumerSettings[K, V] =
@@ -505,7 +505,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (

   /** Java API: Limits the blocking on Kafka consumer offsetForTimes calls. */
   def withOffsetForTimesTimeout(offsetForTimesTimeout: java.time.Duration): ConsumerSettings[K, V] =
-    copy(offsetForTimesTimeout = offsetForTimesTimeout.asScala)
+    copy(offsetForTimesTimeout = offsetForTimesTimeout.toScala)

   /** Scala API */
   def withMetadataRequestTimeout(metadataRequestTimeout: FiniteDuration): ConsumerSettings[K, V] =
@@ -513,7 +513,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (

   /** Java API */
   def withMetadataRequestTimeout(metadataRequestTimeout: java.time.Duration): ConsumerSettings[K, V] =
-    copy(metadataRequestTimeout = metadataRequestTimeout.asScala)
+    copy(metadataRequestTimeout = metadataRequestTimeout.toScala)

   /** Scala API: Check interval for TransactionalProducer when finishing transaction before shutting down consumer */
   def withDrainingCheckInterval(drainingCheckInterval: FiniteDuration): ConsumerSettings[K, V] =
@@ -521,7 +521,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (

   /** Java API: Check interval for TransactionalProducer when finishing transaction before shutting down consumer */
   def withDrainingCheckInterval(drainingCheckInterval: java.time.Duration): ConsumerSettings[K, V] =
-    copy(drainingCheckInterval = drainingCheckInterval.asScala)
+    copy(drainingCheckInterval = drainingCheckInterval.toScala)

   /** Scala API */
   def withPartitionHandlerWarning(partitionHandlerWarning: FiniteDuration): ConsumerSettings[K, V] =
@@ -529,7 +529,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (

   /** Java API */
   def withPartitionHandlerWarning(partitionHandlerWarning: java.time.Duration): ConsumerSettings[K, V] =
-    copy(partitionHandlerWarning = partitionHandlerWarning.asScala)
+    copy(partitionHandlerWarning = partitionHandlerWarning.toScala)

   /**
    * Scala API.
@@ -547,7 +547,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
   def withEnrichCompletionStage(
       value: java.util.function.Function[ConsumerSettings[K, V], CompletionStage[ConsumerSettings[K, V]]]
   ): ConsumerSettings[K, V] =
-    copy(enrichAsync = Some((s: ConsumerSettings[K, V]) => value.apply(s).toScala))
+    copy(enrichAsync = Some((s: ConsumerSettings[K, V]) => value.apply(s).asScala))

   /**
    * Replaces the default Kafka consumer creation logic.
@@ -576,17 +576,17 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * of more work sending those updates.
    */
   def withConsumerGroupUpdateInterval(interval: java.time.Duration): ConsumerSettings[K, V] =
-    copy(consumerGroupUpdateInterval = interval.asScala)
+    copy(consumerGroupUpdateInterval = interval.toScala)

   /**
    * Get the Kafka consumer settings as map.
    */
   def getProperties: java.util.Map[String, AnyRef] = properties.asInstanceOf[Map[String, AnyRef]].asJava

-  def getCloseTimeout: java.time.Duration = closeTimeout.asJava
-  def getPositionTimeout: java.time.Duration = positionTimeout.asJava
-  def getOffsetForTimesTimeout: java.time.Duration = offsetForTimesTimeout.asJava
-  def getMetadataRequestTimeout: java.time.Duration = metadataRequestTimeout.asJava
+  def getCloseTimeout: java.time.Duration = closeTimeout.toJava
+  def getPositionTimeout: java.time.Duration = positionTimeout.toJava
+  def getOffsetForTimesTimeout: java.time.Duration = offsetForTimesTimeout.toJava
+  def getMetadataRequestTimeout: java.time.Duration = metadataRequestTimeout.toJava

   private def copy(
       properties: Map[String, String] = properties,
@@ -675,7 +675,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
    * (without blocking for `enriched`).
    */
   def createKafkaConsumerCompletionStage(executor: Executor): CompletionStage[Consumer[K, V]] =
-    enriched.map(consumerFactory)(ExecutionContext.fromExecutor(executor)).toJava
+    enriched.map(consumerFactory)(ExecutionContext.fromExecutor(executor)).asJava

   private final val propertiesAllowList = Set(
     "auto.offset.reset",
diff --git a/core/src/main/scala/akka/kafka/OffsetResetProtectionSettings.scala b/core/src/main/scala/akka/kafka/OffsetResetProtectionSettings.scala
index c27bc1131..a96d3733c 100644
--- a/core/src/main/scala/akka/kafka/OffsetResetProtectionSettings.scala
+++ b/core/src/main/scala/akka/kafka/OffsetResetProtectionSettings.scala
@@ -7,10 +7,10 @@ package akka.kafka
 import java.time.{Duration => JDuration}

 import akka.annotation.InternalApi
-import akka.util.JavaDurationConverters._
 import com.typesafe.config.Config

 import scala.concurrent.duration._
+import scala.jdk.DurationConverters._

 class OffsetResetProtectionSettings @InternalApi private[kafka] (val enable: Boolean,
                                                                 val offsetThreshold: Long,
@@ -50,7 +50,7 @@ class OffsetResetProtectionSettings @InternalApi private[kafka] (val enable: Boo
    * If the record is more than this duration earlier the last received record, it is considered a reset
    */
   def withTimeThreshold(timeThreshold: JDuration): OffsetResetProtectionSettings =
-    copy(timeThreshold = timeThreshold.asScala)
+    copy(timeThreshold = timeThreshold.toScala)

   override def toString: String =
     s"akka.kafka.OffsetResetProtectionSettings(" +
@@ -79,7 +79,7 @@ object OffsetResetProtectionSettings {
    * threshold are considered indicative of an offset reset.
    */
   def apply(offsetThreshold: Long, timeThreshold: java.time.Duration): OffsetResetProtectionSettings =
-    new OffsetResetProtectionSettings(true, offsetThreshold, timeThreshold.asScala)
+    new OffsetResetProtectionSettings(true, offsetThreshold, timeThreshold.toScala)

   /**
    * Create settings from a configuration with layout `connection-checker`.
@@ -88,7 +88,7 @@ object OffsetResetProtectionSettings {
     val enable = config.getBoolean("enable")
     if (enable) {
       val offsetThreshold = config.getLong("offset-threshold")
-      val timeThreshold = config.getDuration("time-threshold").asScala
+      val timeThreshold = config.getDuration("time-threshold").toScala
       apply(offsetThreshold, timeThreshold)
     } else Disabled
   }
diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala
index 0be9037e0..944eaf3df 100644
--- a/core/src/main/scala/akka/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala
@@ -14,13 +14,12 @@ import com.typesafe.config.Config
 import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig}
 import org.apache.kafka.common.serialization.Serializer

-import scala.jdk.CollectionConverters._
-import scala.compat.java8.OptionConverters._
 import scala.concurrent.duration._
-import akka.util.JavaDurationConverters._
-
 import scala.concurrent.{ExecutionContext, Future}
-import scala.compat.java8.FutureConverters._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._

 object ProducerSettings {

@@ -73,11 +72,11 @@ object ProducerSettings {
       (valueSerializer.isDefined || properties.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)),
       "Value serializer should be defined or declared in configuration"
     )
-    val closeTimeout = config.getDuration("close-timeout").asScala
+    val closeTimeout = config.getDuration("close-timeout").toScala
     val closeOnProducerStop = config.getBoolean("close-on-producer-stop")
     val parallelism = config.getInt("parallelism")
     val dispatcher = config.getString("use-dispatcher")
-    val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
+    val eosCommitInterval = config.getDuration("eos-commit-interval").toScala
     val transactionIdPrefix = config.getString("transaction-id-prefix")
     new ProducerSettings[K, V](
       properties,
@@ -142,7 +141,7 @@ object ProducerSettings {
       keySerializer: Optional[Serializer[K]],
       valueSerializer: Optional[Serializer[V]]
   ): ProducerSettings[K, V] =
-    apply(system, keySerializer.asScala, valueSerializer.asScala)
+    apply(system, keySerializer.toScala, valueSerializer.toScala)

   /**
    * Java API: Create settings from the default configuration
@@ -156,7 +155,7 @@ object ProducerSettings {
       keySerializer: Optional[Serializer[K]],
       valueSerializer: Optional[Serializer[V]]
   ): ProducerSettings[K, V] =
-    apply(system, keySerializer.asScala, valueSerializer.asScala)
+    apply(system, keySerializer.toScala, valueSerializer.toScala)

   /**
    * Java API: Create settings from a configuration with the same layout as
@@ -168,7 +167,7 @@ object ProducerSettings {
       keySerializer: Optional[Serializer[K]],
       valueSerializer: Optional[Serializer[V]]
   ): ProducerSettings[K, V] =
-    apply(config, keySerializer.asScala, valueSerializer.asScala)
+    apply(config, keySerializer.toScala, valueSerializer.toScala)

   /**
    * Java API: Create settings from the default configuration
@@ -305,7 +304,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
    * Duration to wait for `KafkaProducer.close` to finish.
    */
   def withCloseTimeout(closeTimeout: java.time.Duration): ProducerSettings[K, V] =
-    copy(closeTimeout = closeTimeout.asScala)
+    copy(closeTimeout = closeTimeout.toScala)

   /**
    * Call `KafkaProducer.close` on the [[org.apache.kafka.clients.producer.KafkaProducer]] when the producer stage
@@ -340,7 +339,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
    * The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`.
    */
   def withEosCommitInterval(eosCommitInterval: java.time.Duration): ProducerSettings[K, V] =
-    copy(eosCommitInterval = eosCommitInterval.asScala)
+    copy(eosCommitInterval = eosCommitInterval.toScala)

   /**
    * The prefix to append to the generated transaction id when using the `Transactional.sink` or `Transactional.flow`.
@@ -364,7 +363,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
   def withEnrichCompletionStage(
       value: java.util.function.Function[ProducerSettings[K, V], CompletionStage[ProducerSettings[K, V]]]
   ): ProducerSettings[K, V] =
-    copy(enrichAsync = Some((s: ProducerSettings[K, V]) => value.apply(s).toScala))
+    copy(enrichAsync = Some((s: ProducerSettings[K, V]) => value.apply(s).asScala))

   /**
    * Replaces the default Kafka producer creation logic with an external producer. This will also set
@@ -497,5 +496,5 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
    * @param executor Executor for asynchronous producer creation
    */
   def createKafkaProducerCompletionStage(executor: Executor): CompletionStage[Producer[K, V]] =
-    createKafkaProducerAsync()(ExecutionContext.fromExecutor(executor)).toJava
+    createKafkaProducerAsync()(ExecutionContext.fromExecutor(executor)).asJava
 }
diff --git a/core/src/main/scala/akka/kafka/internal/CommittableSources.scala b/core/src/main/scala/akka/kafka/internal/CommittableSources.scala
index b4bb243e3..3da2a8596 100644
--- a/core/src/main/scala/akka/kafka/internal/CommittableSources.scala
+++ b/core/src/main/scala/akka/kafka/internal/CommittableSources.scala
@@ -7,7 +7,6 @@ package akka.kafka.internal

 import akka.actor.ActorRef
 import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
 import akka.kafka._
 import akka.kafka.internal.KafkaConsumerActor.Internal.{Commit, CommitSingle, CommitWithoutReply}
@@ -151,7 +150,7 @@ private[kafka] object KafkaAsyncConsumerCommitterRef {
     }
     getFirstExecutionContext(batch)
       .map { implicit ec =>
-        Future.sequence(futures).map(_ => Done)(ExecutionContexts.parasitic)
+        Future.sequence(futures).map(_ => Done)(ExecutionContext.parasitic)
       }
       .getOrElse(Future.successful(Done))
   }
@@ -210,7 +209,7 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private val consumerActor: A
     import akka.pattern.ask
     consumerActor
       .ask(msg)(Timeout(commitTimeout))
-      .map(_ => Done)(ExecutionContexts.parasitic)
+      .map(_ => Done)(ExecutionContext.parasitic)
       .recoverWith {
         case e: AskTimeoutException =>
           Future.failed(new CommitTimeoutException(s"Kafka commit took longer than: $commitTimeout (${e.getMessage})"))
diff --git a/core/src/main/scala/akka/kafka/internal/ConfigSettings.scala b/core/src/main/scala/akka/kafka/internal/ConfigSettings.scala
index cb4bdd9ef..c626d6801 100644
--- a/core/src/main/scala/akka/kafka/internal/ConfigSettings.scala
+++ b/core/src/main/scala/akka/kafka/internal/ConfigSettings.scala
@@ -11,9 +11,9 @@ import akka.annotation.InternalApi
 import com.typesafe.config.{Config, ConfigObject}

 import scala.annotation.tailrec
-import scala.jdk.CollectionConverters._
 import scala.concurrent.duration.Duration
-import akka.util.JavaDurationConverters._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._

 /**
  * INTERNAL API
@@ -43,7 +43,7 @@ import akka.util.JavaDurationConverters._
   def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration =
     underlying.getString(path) match {
       case "infinite" => Duration.Inf
-      case _ => underlying.getDuration(path).asScala
+      case _ => underlying.getDuration(path).toScala
     }

 }
diff --git a/core/src/main/scala/akka/kafka/internal/ControlImplementations.scala b/core/src/main/scala/akka/kafka/internal/ControlImplementations.scala
index 9b2e71f58..965bbab38 100644
--- a/core/src/main/scala/akka/kafka/internal/ControlImplementations.scala
+++ b/core/src/main/scala/akka/kafka/internal/ControlImplementations.scala
@@ -9,7 +9,6 @@ import java.util.concurrent.{CompletionStage, Executor}
 import akka.Done
 import akka.actor.ActorRef
 import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
 import akka.kafka.internal.KafkaConsumerActor.Internal.{ConsumerMetrics, RequestMetrics}
 import akka.kafka.{javadsl, scaladsl}
 import akka.stream.SourceShape
@@ -17,9 +16,9 @@ import akka.stream.stage.GraphStageLogic
 import akka.util.Timeout
 import org.apache.kafka.common.{Metric, MetricName}

-import scala.jdk.CollectionConverters._
-import scala.compat.java8.FutureConverters.{CompletionStageOps, FutureOps}
 import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters.{CompletionStageOps, FutureOps}

 private object PromiseControl {
   sealed trait ControlOperation
@@ -86,7 +85,7 @@ private trait MetricsControl extends scaladsl.Consumer.Control {
       consumer
         .ask(RequestMetrics)(Timeout(1.minute))
         .mapTo[ConsumerMetrics]
-        .map(_.metrics)(ExecutionContexts.parasitic)
+        .map(_.metrics)(ExecutionContext.parasitic)
     }(executionContext)
   }
 }
@@ -95,17 +94,17 @@ private trait MetricsControl extends scaladsl.Consumer.Control {
 @InternalApi
 final private[kafka] class ConsumerControlAsJava(underlying: scaladsl.Consumer.Control)
     extends javadsl.Consumer.Control {
-  override def stop(): CompletionStage[Done] = underlying.stop().toJava
+  override def stop(): CompletionStage[Done] = underlying.stop().asJava

-  override def shutdown(): CompletionStage[Done] = underlying.shutdown().toJava
+  override def shutdown(): CompletionStage[Done] = underlying.shutdown().asJava

   override def drainAndShutdown[T](streamCompletion: CompletionStage[T], ec: Executor): CompletionStage[T] =
-    underlying.drainAndShutdown(streamCompletion.toScala)(ExecutionContext.fromExecutor(ec)).toJava
+    underlying.drainAndShutdown(streamCompletion.asScala)(ExecutionContext.fromExecutor(ec)).asJava

-  override def isShutdown: CompletionStage[Done] = underlying.isShutdown.toJava
+  override def isShutdown: CompletionStage[Done] = underlying.isShutdown.asJava

   override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] =
-    underlying.metrics.map(_.asJava)(ExecutionContexts.parasitic).toJava
+    underlying.metrics.map(_.asJava)(ExecutionContext.parasitic).asJava
 }

 /** Internal API */
diff --git a/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
index e4bd2c9a8..208837d7f 100644
--- a/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
+++ b/core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
@@ -6,14 +6,13 @@
 package akka.kafka.internal

 import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ProducerSettings
 import akka.stream.Materializer
 import akka.stream.stage._
-import akka.util.JavaDurationConverters._
 import org.apache.kafka.clients.producer.Producer

 import scala.concurrent.ExecutionContext
+import scala.jdk.DurationConverters._
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success}
@@ -72,7 +71,7 @@ private[kafka] trait DeferredProducer[K, V] extends GraphStageLogic with StageId
             closeAndFailStageCb.invoke(e)
             e
         }
-      )(ExecutionContexts.parasitic)
+      )(ExecutionContext.parasitic)
     changeProducerAssignmentLifecycle(AsyncCreateRequestSent)
   }
 }
@@ -95,7 +94,7 @@ private[kafka] trait DeferredProducer[K, V] extends GraphStageLogic with StageId
     try {
       // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
       producer.flush()
-      producer.close(producerSettings.closeTimeout.asJava)
+      producer.close(producerSettings.closeTimeout.toJava)
       log.debug("Producer closed")
     } catch {
       case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
index 1c3ffd0b2..8419cb4fc 100644
--- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
+++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
@@ -21,7 +21,6 @@ import akka.actor.{
   Timers
 }
 import akka.annotation.InternalApi
-import akka.util.JavaDurationConverters._
 import akka.event.LoggingReceive
 import akka.kafka.KafkaConsumerActor.{StopLike, StoppingException}
 import akka.kafka._
@@ -34,9 +33,10 @@ import org.apache.kafka.common.errors.{
 }
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

 import scala.annotation.nowarn
-import scala.jdk.CollectionConverters._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
 import scala.util.{Success, Try}
 import scala.util.control.NonFatal
@@ -434,7 +434,7 @@ import scala.util.control.NonFatal
     this.settings = updatedSettings
     if (settings.connectionCheckerSettings.enable)
       context.actorOf(ConnectionChecker.props(settings.connectionCheckerSettings))
-    pollTimeout = settings.pollTimeout.asJava
+    pollTimeout = settings.pollTimeout.toJava
     offsetForTimesTimeout = settings.getOffsetForTimesTimeout
     positionTimeout = settings.getPositionTimeout
     val progressTrackingFactory: () => ConsumerProgressTracking = () => ensureProgressTracker()
@@ -787,7 +787,8 @@ import scala.util.control.NonFatal
       partitionAssignmentHandler: PartitionAssignmentHandler
   ) extends RebalanceListener {

-    private val restrictedConsumer = new RestrictedConsumer(consumer, settings.partitionHandlerWarning.*(0.95d).asJava)
+    private val restrictedConsumer =
+      new RestrictedConsumer(consumer, java.time.Duration.ofNanos(settings.partitionHandlerWarning.*(0.95d).toNanos))
     private val warningDuration = settings.partitionHandlerWarning.toNanos

     override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit = {
diff --git a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
index 8eccfed88..17006308a 100644
--- a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
@@ -20,9 +20,9 @@ import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.OffsetFetchResponse

-import scala.compat.java8.FutureConverters.FutureOps
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._

 /** Internal API */
 @InternalApi
@@ -146,7 +146,7 @@ private[kafka] trait OffsetContextBuilder[K, V]
     val committer: KafkaAsyncConsumerCommitterRef
 ) extends CommittableOffsetMetadata {
   override def commitScaladsl(): Future[Done] = commitInternal()
-  override def commitJavadsl(): CompletionStage[Done] = commitInternal().toJava
+  override def commitJavadsl(): CompletionStage[Done] = commitInternal().asJava
   override def commitInternal(): Future[Done] = KafkaAsyncConsumerCommitterRef.commit(this)
   override val batchSize: Long = 1
 }
@@ -248,7 +248,7 @@ private[kafka] final class CommittableOffsetBatchImpl(
     new CommittableOffsetBatchImpl(newOffsets, newCommitters, newOffsets.size.toLong)
   }

-  override def commitJavadsl(): CompletionStage[Done] = commitInternal().toJava
+  override def commitJavadsl(): CompletionStage[Done] = commitInternal().asJava

   /**
    * @return true if the batch contains no commits.
diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
index 1739af8e4..4a6838047 100644
--- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
@@ -8,7 +8,6 @@ package akka.kafka.internal
 import akka.Done
 import akka.actor.Terminated
 import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerMessage.{GroupTopicPartition, PartitionOffsetCommittedMarker}
 import akka.kafka.ProducerMessage.{Envelope, Results}
 import akka.kafka.internal.DeferredProducer._
@@ -22,8 +21,9 @@ import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ProducerFencedException

-import scala.concurrent.Future
 import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Try}

@@ -198,7 +198,7 @@ private final class TransactionalProducerStageLogic[K, V, P](
       batchOffsets = TransactionBatch.empty
       batch
         .internalCommit()
-        .onComplete(onInternalCommitAckCb)(ExecutionContexts.parasitic)
+        .onComplete(onInternalCommitAckCb)(ExecutionContext.parasitic)
     } catch {
       case e: ProducerFencedException =>
         log.debug(s"Producer fenced: $e")
diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala
index 5006d1a6f..2d3b675ff 100644
--- a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala
+++ b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala
@@ -10,7 +10,6 @@ import akka.{Done, NotUsed}
 import akka.actor.{ActorRef, Status, Terminated}
 import akka.actor.Status.Failure
 import akka.annotation.InternalApi
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerMessage.{PartitionOffset, TransactionalMessage}
 import akka.kafka.internal.KafkaConsumerActor.Internal.Revoked
 import akka.kafka.internal.SubSourceLogic._
@@ -317,7 +316,7 @@ private object TransactionalSourceLogic {
       import akka.pattern.ask
       sourceActor
        .ask(Committed(offsets))(Timeout(commitTimeout))
-        .map(_ => Done)(ExecutionContexts.parasitic)
+        .map(_ => Done)(ExecutionContext.parasitic)
     }

     override def failed(): Unit =
diff --git a/core/src/main/scala/akka/kafka/javadsl/Committer.scala b/core/src/main/scala/akka/kafka/javadsl/Committer.scala
index 3dae7799e..1bb8ea19d 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Committer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Committer.scala
@@ -13,7 +13,7 @@ import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
 import akka.kafka.{scaladsl, CommitterSettings}
 import akka.stream.javadsl.{Flow, FlowWithContext, Sink}

-import scala.compat.java8.FutureConverters.FutureOps
+import scala.jdk.FutureConverters._

 object Committer {

@@ -45,7 +45,7 @@ object Committer {
    * Batches offsets and commits them to Kafka.
    */
   def sink[C <: Committable](settings: CommitterSettings): Sink[C, CompletionStage[Done]] =
-    scaladsl.Committer.sink(settings).mapMaterializedValue(_.toJava).asJava
+    scaladsl.Committer.sink(settings).mapMaterializedValue(_.asJava).asJava

   /**
    * API MAY CHANGE
@@ -60,6 +60,6 @@ object Committer {
       .Flow[Pair[E, C]]
       .map(_.toScala)
       .toMat(scaladsl.Committer.sinkWithOffsetContext(settings))(akka.stream.scaladsl.Keep.right)
-      .mapMaterializedValue[CompletionStage[Done]](_.toJava)
+      .mapMaterializedValue[CompletionStage[Done]](_.asJava)
       .asJava[Pair[E, C]]
 }
diff --git a/core/src/main/scala/akka/kafka/javadsl/Consumer.scala b/core/src/main/scala/akka/kafka/javadsl/Consumer.scala
index 18fad2af9..f27dfcc3b 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Consumer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Consumer.scala
@@ -9,7 +9,6 @@ import java.util.concurrent.{CompletionStage, Executor}

 import akka.actor.ActorRef
 import akka.annotation.ApiMayChange
-import akka.dispatch.ExecutionContexts
 import akka.japi.Pair
 import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
 import akka.kafka._
@@ -19,9 +18,10 @@ import akka.{Done, NotUsed}
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

-import scala.jdk.CollectionConverters._
-import scala.compat.java8.FutureConverters._
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.ExecutionContext
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._

 /**
  * Akka Stream connector for subscribing to Kafka topics.
@@ -285,7 +285,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         _ => ()
       )
       .map {
@@ -316,7 +316,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         (tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava)
       )
       .map {
@@ -355,7 +355,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         _ => ()
       )
       .map {
@@ -380,7 +380,7 @@ object Consumer {
         settings,
         subscription,
         (tps: Set[TopicPartition]) =>
-          getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
+          getOffsetsOnAssign(tps.asJava).asScala.map(_.asScala.toMap)(ExecutionContext.parasitic),
         (tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava)
       )
       .map {
diff --git a/core/src/main/scala/akka/kafka/javadsl/DiscoverySupport.scala b/core/src/main/scala/akka/kafka/javadsl/DiscoverySupport.scala
index 5a1bf3396..b2ef26cee 100644
--- a/core/src/main/scala/akka/kafka/javadsl/DiscoverySupport.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/DiscoverySupport.scala
@@ -11,9 +11,9 @@ import akka.actor.{ActorSystem, ClassicActorSystemProvider}
 import akka.kafka.{scaladsl, ConsumerSettings, ProducerSettings}
 import com.typesafe.config.Config

-import scala.compat.java8.FunctionConverters._
-import scala.compat.java8.FutureConverters
 import scala.concurrent.Future
+import scala.jdk.FunctionConverters._
+import scala.jdk.FutureConverters._

 /**
  * Scala API.
@@ -33,7 +33,7 @@ object DiscoverySupport {
     implicit val sys: ClassicActorSystemProvider = system
     val function: ConsumerSettings[K, V] => Future[ConsumerSettings[K, V]] =
       scaladsl.DiscoverySupport.consumerBootstrapServers(config)
-    function.andThen(FutureConverters.toJava).asJava
+    function.andThen(_.asJava).asJava
   }

   // kept for bin-compatibility
@@ -56,7 +56,7 @@ object DiscoverySupport {
     implicit val sys: ClassicActorSystemProvider = system
     val function: ProducerSettings[K, V] => Future[ProducerSettings[K, V]] =
       scaladsl.DiscoverySupport.producerBootstrapServers(config)
-    function.andThen(FutureConverters.toJava).asJava
+    function.andThen(_.asJava).asJava
   }

   // kept for bin-compatibility
diff --git a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
index f2ffdf7b1..975b90bdb 100644
--- a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
@@ -8,15 +8,15 @@ package akka.kafka.javadsl
 import java.util.concurrent.{CompletionStage, Executor}

 import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerSettings
 import akka.util.Timeout
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}

-import scala.compat.java8.FutureConverters._
+import scala.concurrent.ExecutionContext
 import scala.concurrent.ExecutionContextExecutor
 import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._

 class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient) {

@@ -27,14 +27,14 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
       .getBeginningOffsets(partitions.asScala.toSet)
       .map { beginningOffsets =>
         beginningOffsets.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava
-      }(ExecutionContexts.parasitic)
-      .toJava
+      }(ExecutionContext.parasitic)
+      .asJava

   def getBeginningOffsetForPartition[K, V](partition: TopicPartition): CompletionStage[java.lang.Long] =
     metadataClient
       .getBeginningOffsetForPartition(partition)
-      .map(Long.box)(ExecutionContexts.parasitic)
-      .toJava
+      .map(Long.box)(ExecutionContext.parasitic)
+      .asJava

   def getEndOffsets(
       partitions: java.util.Set[TopicPartition]
@@ -43,36 +43,36 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
       .getEndOffsets(partitions.asScala.toSet)
       .map { endOffsets =>
         endOffsets.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava
-      }(ExecutionContexts.parasitic)
-      .toJava
+      }(ExecutionContext.parasitic)
+      .asJava

   def getEndOffsetForPartition(partition: TopicPartition): CompletionStage[java.lang.Long] =
     metadataClient
       .getEndOffsetForPartition(partition)
-      .map(Long.box)(ExecutionContexts.parasitic)
-      .toJava
+      .map(Long.box)(ExecutionContext.parasitic)
+      .asJava

   def listTopics(): CompletionStage[java.util.Map[java.lang.String, java.util.List[PartitionInfo]]] =
     metadataClient
       .listTopics()
       .map { topics =>
         topics.view.iterator.map { case (k, v) => k -> v.asJava }.toMap.asJava
-      }(ExecutionContexts.parasitic)
-      .toJava
+      }(ExecutionContext.parasitic)
+      .asJava

   def getPartitionsFor(topic: java.lang.String): CompletionStage[java.util.List[PartitionInfo]] =
     metadataClient
       .getPartitionsFor(topic)
       .map { partitionsInfo =>
         partitionsInfo.asJava
-      }(ExecutionContexts.parasitic)
-      .toJava
+      }(ExecutionContext.parasitic)
+      .asJava

   @deprecated("use `getCommittedOffsets`", "2.0.3")
   def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] =
     metadataClient
       .getCommittedOffset(partition)
-      .toJava
+      .asJava

   def getCommittedOffsets(
       partitions: java.util.Set[TopicPartition]
@@ -81,8 +81,8 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
       .getCommittedOffsets(partitions.asScala.toSet)
       .map { committedOffsets =>
         committedOffsets.asJava
-      }(ExecutionContexts.parasitic)
-      .toJava
+      }(ExecutionContext.parasitic)
+      .asJava

   def close(): Unit =
     metadataClient.close()
@@ -91,7 +91,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
 object MetadataClient {

   def create(consumerActor: ActorRef, timeout: Timeout, executor: Executor): MetadataClient = {
-    implicit val ec: ExecutionContextExecutor = ExecutionContexts.fromExecutor(executor)
+    implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
     val metadataClient = akka.kafka.scaladsl.MetadataClient.create(consumerActor, timeout)
     new MetadataClient(metadataClient)
   }
@@ -101,7 +101,7 @@ object MetadataClient {
       system: ActorSystem,
       executor: Executor): MetadataClient = {
     val metadataClient = akka.kafka.scaladsl.MetadataClient
-      .create(consumerSettings, timeout)(system, ExecutionContexts.fromExecutor(executor))
+      .create(consumerSettings, timeout)(system, ExecutionContext.fromExecutor(executor))
     new MetadataClient(metadataClient)
   }
 }
diff --git a/core/src/main/scala/akka/kafka/javadsl/Producer.scala b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
index 6d135ef32..706ac055f 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
@@ -15,7 +15,7 @@ import akka.{japi, Done, NotUsed}
 import org.apache.kafka.clients.producer.ProducerRecord

 import scala.annotation.nowarn
-import scala.compat.java8.FutureConverters._
+import scala.jdk.FutureConverters._

 /**
  * Akka Stream connector for publishing messages to Kafka topics.
@@ -31,7 +31,7 @@ object Producer {
   def plainSink[K, V](settings: ProducerSettings[K, V]): Sink[ProducerRecord[K, V], CompletionStage[Done]] =
     scaladsl.Producer
       .plainSink(settings)
-      .mapMaterializedValue(_.toJava)
+      .mapMaterializedValue(_.asJava)
       .asJava

   /**
@@ -76,7 +76,7 @@ object Producer {
     @nowarn("cat=deprecation")
     val sink: Sink[IN, CompletionStage[Done]] = scaladsl.Producer
       .committableSink(settings)
-      .mapMaterializedValue(_.toJava)
+      .mapMaterializedValue(_.asJava)
       .asJava
     sink
   }
@@ -130,7 +130,7 @@ object Producer {
   ): Sink[IN, CompletionStage[Done]] =
     scaladsl.Producer
       .committableSink(producerSettings, committerSettings)
-      .mapMaterializedValue(_.toJava)
+      .mapMaterializedValue(_.asJava)
       .asJava

   /**
diff --git a/core/src/main/scala/akka/kafka/javadsl/SendProducer.scala b/core/src/main/scala/akka/kafka/javadsl/SendProducer.scala
index dd2fabc93..26f1db6e8 100644
--- a/core/src/main/scala/akka/kafka/javadsl/SendProducer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/SendProducer.scala
@@ -13,7 +13,7 @@ import akka.kafka.ProducerMessage._
 import akka.kafka.{scaladsl, ProducerSettings}
 import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

-import scala.compat.java8.FutureConverters._
+import scala.jdk.FutureConverters._

 /**
  * Utility class for producing to Kafka without using Akka Streams.
@@ -48,18 +48,18 @@ final class SendProducer[K, V] private (underlying: scaladsl.SendProducer[K, V])
    * The messages support passing through arbitrary data.
    */
   def sendEnvelope[PT](envelope: Envelope[K, V, PT]): CompletionStage[Results[K, V, PT]] =
-    underlying.sendEnvelope(envelope).toJava
+    underlying.sendEnvelope(envelope).asJava

   /**
    * Send a raw Kafka [[org.apache.kafka.clients.producer.ProducerRecord]] and complete a future with the resulting metadata.
    */
   def send(record: ProducerRecord[K, V]): CompletionStage[RecordMetadata] =
-    underlying.send(record).toJava
+    underlying.send(record).asJava

   /**
    * Close the underlying producer (depending on the "close producer on stop" setting).
    */
-  def close(): CompletionStage[Done] = underlying.close().toJava
+  def close(): CompletionStage[Done] = underlying.close().asJava

   override def toString: String = s"SendProducer(${underlying.settings})"
 }
diff --git a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala
index 6976a9d74..1996e0437 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Transactional.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Transactional.scala
@@ -18,7 +18,7 @@ import akka.stream.javadsl._
 import akka.{Done, NotUsed}
 import org.apache.kafka.clients.consumer.ConsumerRecord

-import scala.compat.java8.FutureConverters.FutureOps
+import scala.jdk.FutureConverters.FutureOps

 /**
  * Akka Stream connector to support transactions between Kafka topics.
@@ -88,7 +88,7 @@ object Transactional {
   ): Sink[IN, CompletionStage[Done]] =
     scaladsl.Transactional
       .sink(settings)
-      .mapMaterializedValue(_.toJava)
+      .mapMaterializedValue(_.asJava)
       .asJava

   /**
@@ -102,7 +102,7 @@ object Transactional {
   ): Sink[IN, CompletionStage[Done]] =
     scaladsl.Transactional
       .sink(settings, transactionalId)
-      .mapMaterializedValue(_.toJava)
+      .mapMaterializedValue(_.asJava)
       .asJava

   /**
@@ -119,7 +119,7 @@ object Transactional {
       .Flow[Pair[Envelope[K, V, NotUsed], PartitionOffset]]
       .map(_.toScala)
       .toMat(scaladsl.Transactional.sinkWithOffsetContext(settings))(akka.stream.scaladsl.Keep.right)
-      .mapMaterializedValue(_.toJava)
+      .mapMaterializedValue(_.asJava)
       .asJava

   /**
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Committer.scala b/core/src/main/scala/akka/kafka/scaladsl/Committer.scala
index 9209e9bc5..c749e9bbd 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/Committer.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/Committer.scala
@@ -6,13 +6,13 @@
 package akka.kafka.scaladsl

 import akka.annotation.ApiMayChange
-import akka.dispatch.ExecutionContexts
 import akka.kafka.CommitterSettings
 import akka.kafka.ConsumerMessage.{Committable, CommittableOffsetBatch}
 import akka.kafka.internal.CommitCollectorStage
 import akka.stream.scaladsl.{Flow, FlowWithContext, Keep, Sink}
 import akka.{Done, NotUsed}

+import scala.concurrent.ExecutionContext
 import scala.concurrent.Future

 object Committer {
@@ -37,7 +37,7 @@ object Committer {
       case WaitForAck =>
         offsetBatches
           .mapAsyncUnordered(settings.parallelism) { batch =>
-            batch.commitInternal().map(_ => batch)(ExecutionContexts.parasitic)
+            batch.commitInternal().map(_ => batch)(ExecutionContext.parasitic)
           }
       case SendAndForget =>
         offsetBatches.map(_.tellCommit())
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala b/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala
index e0b6df54c..157c626fd 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala
@@ -7,7 +7,6 @@ package akka.kafka.scaladsl

 import akka.actor.ActorRef
 import akka.annotation.ApiMayChange
-import akka.dispatch.ExecutionContexts
 import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
 import akka.kafka._
 import akka.kafka.internal._
@@ -104,8 +103,8 @@ object Consumer {
     override def shutdown(): Future[Done] =
       control
         .shutdown()
-        .flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
-        .map(_ => Done)(ExecutionContexts.parasitic)
+        .flatMap(_ => streamCompletion)(ExecutionContext.parasitic)
+        .map(_ => Done)(ExecutionContext.parasitic)

     override def drainAndShutdown[S](streamCompletion: Future[S])(implicit ec: ExecutionContext): Future[S] =
       control.drainAndShutdown(streamCompletion)
@@ -119,8 +118,8 @@ object Consumer {

     override val isShutdown: Future[Done] =
       control.isShutdown
-        .flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
-        .map(_ => Done)(ExecutionContexts.parasitic)
+        .flatMap(_ => streamCompletion)(ExecutionContext.parasitic)
+        .map(_ => Done)(ExecutionContext.parasitic)

     override def metrics: Future[Map[MetricName, Metric]] = control.metrics
   }
@@ -257,7 +256,7 @@ object Consumer {
   def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                              subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
     committableSource[K, V](settings, subscription).mapAsync(1) { m =>
-      m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.parasitic)
+      m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContext.parasitic)
     }

   /**
diff --git a/core/src/main/scala/akka/kafka/scaladsl/DiscoverySupport.scala b/core/src/main/scala/akka/kafka/scaladsl/DiscoverySupport.scala
index bc7bf656c..2525aab06 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/DiscoverySupport.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/DiscoverySupport.scala
@@ -9,11 +9,11 @@ import akka.actor.{ActorSystem, ActorSystemImpl, ClassicActorSystemProvider}
 import akka.annotation.InternalApi
 import akka.discovery.{Discovery, ServiceDiscovery}
 import akka.kafka.{ConsumerSettings, ProducerSettings}
-import akka.util.JavaDurationConverters._
 import com.typesafe.config.Config

 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
+import scala.jdk.DurationConverters._
 import scala.util.Failure

 /**
@@ -63,7 +63,7 @@ object DiscoverySupport {
       checkClassOrThrow(system.asInstanceOf[ActorSystemImpl])
       val serviceName = config.getString("service-name")
       if (serviceName.nonEmpty) {
-        val lookupTimeout = config.getDuration("resolve-timeout").asScala
+        val lookupTimeout = config.getDuration("resolve-timeout").toScala
         bootstrapServers(discovery(config, system), serviceName, lookupTimeout)
       } else throw new IllegalArgumentException(s"value for `service-name` in $config is empty")
     }
diff --git a/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
index daa21f3e8..fdd7644c0 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
@@ -8,7 +8,6 @@ package akka.kafka.scaladsl
 import java.util.concurrent.atomic.AtomicLong

 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
-import akka.dispatch.ExecutionContexts
 import akka.kafka.Metadata._
 import akka.kafka.{ConsumerSettings, KafkaConsumerActor}
 import akka.pattern.ask
@@ -30,7 +29,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e) => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)

   def getBeginningOffsetForPartition(partition: TopicPartition): Future[Long] =
     getBeginningOffsets(Set(partition))
@@ -43,7 +42,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e) => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)

   def getEndOffsetForPartition(partition: TopicPartition): Future[Long] =
     getEndOffsets(Set(partition))
@@ -56,7 +55,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e) => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)

   def getPartitionsFor(topic: String): Future[List[PartitionInfo]] =
     (consumerActor ? GetPartitionsFor(topic))(timeout)
@@ -65,7 +64,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e) => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)

   @deprecated("use `getCommittedOffsets`", "2.0.3")
   def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] =
@@ -75,7 +74,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e) => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)

   def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] =
     (consumerActor ? GetCommittedOffsets(partitions))(timeout)
@@ -84,7 +83,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
       .flatMap {
         case Success(res) => Future.successful(res)
         case Failure(e) => Future.failed(e)
-      }(ExecutionContexts.parasitic)
+      }(ExecutionContext.parasitic)

   def close(): Unit =
     if (managedActor) {
diff --git a/core/src/main/scala/akka/kafka/scaladsl/SendProducer.scala b/core/src/main/scala/akka/kafka/scaladsl/SendProducer.scala
index 35a07ec06..c2795be1e 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/SendProducer.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/SendProducer.scala
@@ -9,10 +9,11 @@ import akka.Done
 import akka.actor.{ActorSystem, ClassicActorSystemProvider}
 import akka.kafka.ProducerMessage._
 import akka.kafka.ProducerSettings
-import akka.util.JavaDurationConverters._
+
 import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

 import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.jdk.DurationConverters._

 /**
  * Utility class for producing to Kafka without using Akka Streams.
@@ -87,7 +88,7 @@ final class SendProducer[K, V] private (val settings: ProducerSettings[K, V], sy if (settings.closeProducerOnStop) producerFuture.map { producer => // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case producer.flush() - producer.close(settings.closeTimeout.asJava) + producer.close(settings.closeTimeout.toJava) Done } else Future.successful(Done) } diff --git a/testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java b/testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java index f7c539c2e..e778d9f64 100644 --- a/testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java +++ b/testkit/src/main/java/akka/kafka/testkit/javadsl/BaseKafkaTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.compat.java8.functionConverterImpls.FromJavaPredicate; +import scala.jdk.FunctionWrappers.*; public abstract class BaseKafkaTest extends KafkaTestKitClass { @@ -123,7 +123,7 @@ public void waitUntilCluster(Predicate predicate) { settings().clusterTimeout(), settings().checkInterval(), adminClient(), - new FromJavaPredicate(predicate), + new FromJavaPredicate<>(predicate), log()); } diff --git a/testkit/src/main/scala/akka/kafka/testkit/KafkaTestkitTestcontainersSettings.scala b/testkit/src/main/scala/akka/kafka/testkit/KafkaTestkitTestcontainersSettings.scala index 528e699c0..1a3df9db2 100644 --- a/testkit/src/main/scala/akka/kafka/testkit/KafkaTestkitTestcontainersSettings.scala +++ b/testkit/src/main/scala/akka/kafka/testkit/KafkaTestkitTestcontainersSettings.scala @@ -10,11 +10,11 @@ import java.util.function.Consumer import akka.actor.ActorSystem import akka.kafka.testkit.internal.AlpakkaKafkaContainer -import akka.util.JavaDurationConverters._ import com.typesafe.config.Config import org.testcontainers.containers.GenericContainer import scala.concurrent.duration.FiniteDuration +import scala.jdk.DurationConverters._ final class KafkaTestkitTestcontainersSettings private ( val zooKeeperImage: String, @@ -95,12 +95,12 @@ final class KafkaTestkitTestcontainersSettings private ( /** * Java Api */ - def getClusterStartTimeout(): Duration = clusterStartTimeout.asJava + def getClusterStartTimeout(): Duration = clusterStartTimeout.toJava /** * Java Api */ - def getReadinessCheckTimeout(): Duration = readinessCheckTimeout.asJava + def getReadinessCheckTimeout(): Duration = readinessCheckTimeout.toJava /** * Sets the ZooKeeper image @@ -213,7 +213,7 @@ final class KafkaTestkitTestcontainersSettings private ( * Kafka cluster start up timeout */ def withClusterStartTimeout(timeout: Duration): KafkaTestkitTestcontainersSettings = - copy(clusterStartTimeout = timeout.asScala) + copy(clusterStartTimeout = timeout.toScala) /** * Kafka cluster readiness check timeout @@ -227,7 +227,7 @@ final class KafkaTestkitTestcontainersSettings private ( * Kafka cluster readiness check timeout */ def withReadinessCheckTimeout(timeout: Duration): KafkaTestkitTestcontainersSettings = - copy(readinessCheckTimeout = timeout.asScala) + copy(readinessCheckTimeout = timeout.toScala) private def copy( zooKeeperImage: String = zooKeeperImage, @@ -313,8 +313,8 @@ object KafkaTestkitTestcontainersSettings { val internalTopicsReplicationFactor = config.getInt("internal-topics-replication-factor") val useSchemaRegistry = config.getBoolean("use-schema-registry") val containerLogging = 
config.getBoolean("container-logging") - val clusterStartTimeout = config.getDuration("cluster-start-timeout").asScala - val readinessCheckTimeout = config.getDuration("readiness-check-timeout").asScala + val clusterStartTimeout = config.getDuration("cluster-start-timeout").toScala + val readinessCheckTimeout = config.getDuration("readiness-check-timeout").toScala new KafkaTestkitTestcontainersSettings(zooKeeperImage, zooKeeperImageTag, diff --git a/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala b/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala index 95179bda6..d0d204c95 100644 --- a/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala +++ b/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala @@ -7,12 +7,12 @@ package akka.kafka.testkit.internal import akka.kafka.testkit.KafkaTestkitTestcontainersSettings import akka.kafka.testkit.scaladsl.KafkaSpec -import akka.util.JavaDurationConverters._ import org.testcontainers.containers.GenericContainer import org.testcontainers.utility.DockerImageName -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ +import scala.jdk.OptionConverters._ object TestcontainersKafka { trait Spec extends KafkaSpec { @@ -51,12 +51,12 @@ object TestcontainersKafka { def schemaRegistryContainer: Option[SchemaRegistryContainer] = { requireStarted() - cluster.getSchemaRegistry.asScala + cluster.getSchemaRegistry.toScala } def getSchemaRegistryUrl: String = { requireStarted() - cluster.getSchemaRegistry.asScala + cluster.getSchemaRegistry.toScala .map(_.getSchemaRegistryUrl) .getOrElse( throw new RuntimeException("Did you enable schema registry in your KafkaTestkitTestcontainersSettings?") @@ -77,8 +77,8 @@ object TestcontainersKafka { internalTopicsReplicationFactor, settings.useSchemaRegistry, settings.containerLogging, - settings.clusterStartTimeout.asJava, - settings.readinessCheckTimeout.asJava + settings.clusterStartTimeout.toJava, + settings.readinessCheckTimeout.toJava ) configureKafka(brokerContainers) configureKafkaConsumer.accept(brokerContainers.asJavaCollection) diff --git a/tests/src/test/scala/akka/kafka/internal/ConsumerMock.scala b/tests/src/test/scala/akka/kafka/internal/ConsumerMock.scala index 9edee9a0d..c5cb6c93f 100644 --- a/tests/src/test/scala/akka/kafka/internal/ConsumerMock.scala +++ b/tests/src/test/scala/akka/kafka/internal/ConsumerMock.scala @@ -8,7 +8,6 @@ package akka.kafka.internal import java.util.concurrent.atomic.AtomicBoolean import akka.testkit.TestKit -import akka.util.JavaDurationConverters._ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition import org.mockito.Mockito._ @@ -18,6 +17,7 @@ import org.mockito.verification.VerificationMode import org.mockito.{ArgumentMatchers, Mockito} import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ import scala.collection.immutable.Seq import scala.concurrent.duration._ @@ -161,7 +161,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = new ConsumerMock. 
} def verifyClosed(mode: VerificationMode = Mockito.times(1)): Unit = - verify(mock, mode).close(ConsumerMock.closeTimeout.asJava) + verify(mock, mode).close(ConsumerMock.closeTimeout.toJava) def verifyPoll(mode: VerificationMode = Mockito.atLeastOnce()): ConsumerRecords[K, V] = verify(mock, mode).poll(ArgumentMatchers.any[java.time.Duration]) diff --git a/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala b/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala index fb38b21b7..2fe160296 100644 --- a/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala +++ b/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala @@ -17,8 +17,8 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import scala.compat.java8.FutureConverters._ import scala.concurrent.Future +import scala.jdk.FutureConverters._ import scala.language.reflectiveCalls object ControlSpec { @@ -49,8 +49,8 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa "drain to stream result" in { val control = createControl() val drainingControl = - Consumer.createDrainingControl(control, Future.successful("expected").toJava) - drainingControl.drainAndShutdown(ec).toScala.futureValue should be("expected") + Consumer.createDrainingControl(control, Future.successful("expected").asJava) + drainingControl.drainAndShutdown(ec).asScala.futureValue should be("expected") control.shutdownCalled.get() should be(true) } @@ -59,9 +59,9 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa val drainingControl = Consumer.createDrainingControl( control, - Future.failed[String](new RuntimeException("expected")).toJava + Future.failed[String](new RuntimeException("expected")).asJava ) - val value = drainingControl.drainAndShutdown(ec).toScala.failed.futureValue + val value = drainingControl.drainAndShutdown(ec).asScala.failed.futureValue value shouldBe a[RuntimeException] value.getMessage should be("expected") control.shutdownCalled.get() should be(true) @@ -72,9 +72,9 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa val drainingControl = Consumer.createDrainingControl( control, - Future.failed[String](new RuntimeException("expected")).toJava + Future.failed[String](new RuntimeException("expected")).asJava ) - val value = drainingControl.drainAndShutdown(ec).toScala.failed.futureValue + val value = drainingControl.drainAndShutdown(ec).asScala.failed.futureValue value shouldBe a[RuntimeException] value.getMessage should be("expected") control.shutdownCalled.get() should be(true) @@ -83,8 +83,8 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa "drain to shutdown failure when stream succeeds" in { val control = createControl(shutdownFuture = Future.failed(new RuntimeException("expected"))) - val drainingControl = Consumer.createDrainingControl(control, Future.successful(Done).toJava) - val value = drainingControl.drainAndShutdown(ec).toScala.failed.futureValue + val drainingControl = Consumer.createDrainingControl(control, Future.successful(Done).asJava) + val value = drainingControl.drainAndShutdown(ec).asScala.failed.futureValue value shouldBe a[RuntimeException] value.getMessage should be("expected") control.shutdownCalled.get() should be(true)