From 76979dbb8795a1c8643742b1674b6da879385422 Mon Sep 17 00:00:00 2001 From: jbwheatley Date: Wed, 2 Jun 2021 17:52:16 +0100 Subject: [PATCH] cross compilation to scala 3 (#573) * cross compilation to scala 3 * remove bm4 due to scala 3 --- build.sbt | 9 +- .../profunktor/fs2rabbit/javaConversion.scala | 6 +- .../fs2rabbit/algebra/Connection.scala | 4 +- .../fs2rabbit/effects/EnvelopeDecoder.scala | 25 +++-- .../interpreter/RabbitClientOps.scala | 3 +- .../dev/profunktor/fs2rabbit/model.scala | 6 +- .../program/AckConsumingProgram.scala | 2 +- .../fs2rabbit/program/AckingProgram.scala | 2 +- .../fs2rabbit/program/ConsumingProgram.scala | 2 +- .../fs2rabbit/program/PublishingProgram.scala | 2 +- .../fs2rabbit/AmqpFieldValueSpec.scala | 2 +- .../fs2rabbit/AmqpPropertiesSpec.scala | 2 +- .../examples/AckerConsumerDemo.scala | 7 +- .../examples/AutoAckConsumerDemo.scala | 4 +- .../examples/DropwizardMetricsDemo.scala | 7 +- project/Dependencies.scala | 6 +- site/docs/examples/client-metrics.md | 3 +- site/docs/examples/sample-acker.md | 3 +- .../fs2rabbit/interpreter/Fs2RabbitSpec.scala | 92 ++++++++++++------- 19 files changed, 106 insertions(+), 81 deletions(-) rename core/src/main/{scala-2.11 => scala-3}/dev/profunktor/fs2rabbit/javaConversion.scala (83%) diff --git a/build.sbt b/build.sbt index cd59177a..11d3ead8 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ import microsites.ExtraMdFileConfig ThisBuild / name := """fs2-rabbit""" ThisBuild / scalaVersion := "2.13.5" -ThisBuild / crossScalaVersions := List("2.12.13", "2.13.5") +ThisBuild / crossScalaVersions := List("2.12.14", "2.13.5", "3.0.0") ThisBuild / organization := "dev.profunktor" ThisBuild / homepage := Some(url("https://fs2-rabbit.profunktor.dev/")) ThisBuild / licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0")) @@ -28,7 +28,7 @@ promptTheme := PromptTheme( def scalaOptions(v: String) = CrossVersion.partialVersion(v) match { case Some((2, 13)) => List.empty[String] - case Some((3, _)) => List("-Ykind-projector") + case Some((3, _)) => List("-source:3.0-migration") case _ => List("-Xmax-classfile-name", "100") } @@ -45,8 +45,7 @@ def commonDependencies(v: String) = case Some((3, _)) => List.empty case _ => List( - compilerPlugin(Libraries.kindProjector), - compilerPlugin(Libraries.betterMonadicFor) + compilerPlugin(Libraries.kindProjector) ) }) @@ -58,7 +57,7 @@ val commonSettings = List( headerLicense := Some(HeaderLicense.ALv2(s"${startYear.value.get}-${java.time.Year.now}", "ProfunKtor")), Compile / doc / scalacOptions ++= List("-no-link-warnings"), scalacOptions ++= scalaOptions(scalaVersion.value), - scalacOptions --= List("-Wunused:params"), + scalacOptions --= List("-Wunused:params", "-Xfatal-warnings"), libraryDependencies ++= commonDependencies(scalaVersion.value), resolvers += "Apache public" at "https://repository.apache.org/content/groups/public/", scalafmtOnCompile := true, diff --git a/core/src/main/scala-2.11/dev/profunktor/fs2rabbit/javaConversion.scala b/core/src/main/scala-3/dev/profunktor/fs2rabbit/javaConversion.scala similarity index 83% rename from core/src/main/scala-2.11/dev/profunktor/fs2rabbit/javaConversion.scala rename to core/src/main/scala-3/dev/profunktor/fs2rabbit/javaConversion.scala index e4a4fd2f..59ac7118 100644 --- a/core/src/main/scala-2.11/dev/profunktor/fs2rabbit/javaConversion.scala +++ b/core/src/main/scala-3/dev/profunktor/fs2rabbit/javaConversion.scala @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 ProfunKtor + * Copyright 2017-2021 ProfunKtor * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,10 @@ package dev.profunktor.fs2rabbit -import scala.collection.convert.{DecorateAsJava, DecorateAsScala} +import scala.collection.convert.{AsJavaExtensions, AsScalaExtensions} // This exists purely for compatibility between Scala 2.13 and 2.12 since the // Java conversions have been moved into a different package between the two, // allowing us to have a single, consistent import everywhere else in this // codebase across both 2.13 and 2.12. -object javaConversion extends DecorateAsJava with DecorateAsScala +object javaConversion extends AsJavaExtensions with AsScalaExtensions diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala index ad1e9469..f02116c2 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala @@ -91,13 +91,13 @@ object ConnectionResource { Sync[F] .delay(connectionFactory.newConnection(addresses.toList.asJava)) .flatTap(c => Log[F].info(s"Acquired connection: $c")) - .map(RabbitConnection) + .map(RabbitConnection.apply) private[fs2rabbit] def acquireChannel(connection: AMQPConnection): F[AMQPChannel] = Sync[F] .delay(connection.value.createChannel) .flatTap(c => Log[F].info(s"Acquired channel: $c")) - .map(RabbitChannel) + .map(RabbitChannel.apply) override def createConnection: Resource[F, AMQPConnection] = Resource.make(acquireConnection) { amqpConn => diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala index 0e529b58..90ea7cba 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala @@ -15,7 +15,7 @@ */ package dev.profunktor.fs2rabbit.effects -import cats.{Applicative, ApplicativeError} +import cats.{Applicative, ApplicativeThrow} import cats.data.Kleisli import dev.profunktor.fs2rabbit.model.{AmqpFieldValue, AmqpProperties, ExchangeName, RoutingKey} import dev.profunktor.fs2rabbit.model.AmqpFieldValue._ @@ -39,44 +39,43 @@ object EnvelopeDecoder { def redelivered[F[_]: Applicative]: EnvelopeDecoder[F, Boolean] = Kleisli(e => e.redelivered.pure[F]) - def header[F[_]](name: String)(implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, AmqpFieldValue] = + def header[F[_]](name: String)(implicit F: ApplicativeThrow[F]): EnvelopeDecoder[F, AmqpFieldValue] = Kleisli(e => F.catchNonFatal(e.properties.headers(name))) def optHeader[F[_]: Applicative](name: String): EnvelopeDecoder[F, Option[AmqpFieldValue]] = Kleisli(_.properties.headers.get(name).pure[F]) - def stringHeader[F[_]: ApplicativeError[*[_], Throwable]](name: String): EnvelopeDecoder[F, String] = + def stringHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, String] = headerPF[F, String](name) { case StringVal(a) => a } - def intHeader[F[_]: ApplicativeError[*[_], Throwable]](name: String): EnvelopeDecoder[F, Int] = + def intHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Int] = headerPF[F, Int](name) { case IntVal(a) => a } - def longHeader[F[_]: ApplicativeError[*[_], Throwable]](name: String): EnvelopeDecoder[F, Long] = + def longHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Long] = headerPF[F, Long](name) { case LongVal(a) => a } - def arrayHeader[F[_]: ApplicativeError[*[_], Throwable]](name: String): EnvelopeDecoder[F, collection.Seq[Any]] = + def arrayHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, collection.Seq[Any]] = headerPF[F, collection.Seq[Any]](name) { case ArrayVal(a) => a } - def optStringHeader[F[_]: ApplicativeError[*[_], Throwable]](name: String): EnvelopeDecoder[F, Option[String]] = + def optStringHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[String]] = optHeaderPF[F, String](name) { case StringVal(a) => a } - def optIntHeader[F[_]: ApplicativeError[*[_], Throwable]](name: String): EnvelopeDecoder[F, Option[Int]] = + def optIntHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[Int]] = optHeaderPF[F, Int](name) { case IntVal(a) => a } - def optLongHeader[F[_]: ApplicativeError[*[_], Throwable]](name: String): EnvelopeDecoder[F, Option[Long]] = + def optLongHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[Long]] = optHeaderPF[F, Long](name) { case LongVal(a) => a } - def optArrayHeader[F[_]: ApplicativeError[*[_], Throwable]]( - name: String): EnvelopeDecoder[F, Option[collection.Seq[Any]]] = + def optArrayHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[collection.Seq[Any]]] = optHeaderPF[F, collection.Seq[Any]](name) { case ArrayVal(a) => a } private def headerPF[F[_], A](name: String)(pf: PartialFunction[AmqpFieldValue, A])( - implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, A] = + implicit F: ApplicativeThrow[F]): EnvelopeDecoder[F, A] = Kleisli { env => F.catchNonFatal(pf(env.properties.headers(name))) } private def optHeaderPF[F[_], A](name: String)(pf: PartialFunction[AmqpFieldValue, A])( - implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, Option[A]] = + implicit F: ApplicativeThrow[F]): EnvelopeDecoder[F, Option[A]] = Kleisli(_.properties.headers.get(name).traverse(h => F.catchNonFatal(pf(h)))) } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClientOps.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClientOps.scala index ffe4c5d1..b88f9d69 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClientOps.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClientOps.scala @@ -17,6 +17,7 @@ package dev.profunktor.fs2rabbit.interpreter import cats.data.EitherT +import cats.effect.MonadCancelThrow import cats.effect.kernel.MonadCancel import cats.~> import cats.implicits._ @@ -37,7 +38,7 @@ final class RabbitClientOps[F[_]](val client: RabbitClient[F]) extends AnyVal { /** * * Transforms the rabbit client into one where all errors from the effect are caught and lifted into EitherT's error channel */ - def liftAttemptK(implicit F: MonadCancel[F, Throwable]): RabbitClient[EitherT[F, Throwable, *]] = + def liftAttemptK(implicit F: MonadCancelThrow[F]): RabbitClient[EitherT[F, Throwable, *]] = imapK[EitherT[F, Throwable, *]](EitherT.liftAttemptK)( new (EitherT[F, Throwable, *] ~> F) { def apply[A](fa: EitherT[F, Throwable, A]): F[A] = fa.value.flatMap { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala index 05587718..a1423529 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala @@ -64,7 +64,7 @@ object model { object DeliveryTag extends (Long => DeliveryTag) { implicit val deliveryTagOrder: Order[DeliveryTag] = Order.by(_.value) implicit val deliveryTagCommutativeSemigroup: CommutativeSemigroup[DeliveryTag] = - CommutativeSemigroup.instance(_ max _) + CommutativeSemigroup.instance(deliveryTagOrder.max) } case class ConsumerTag(value: String) extends AnyVal object ConsumerTag extends (String => ConsumerTag) { @@ -484,12 +484,12 @@ object model { case class AmqpMessage[A](payload: A, properties: AmqpProperties) object AmqpEnvelope { - private def encoding[F[_]](implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, Option[Charset]] = + private def encoding[F[_]](implicit F: ApplicativeThrow[F]): EnvelopeDecoder[F, Option[Charset]] = Kleisli(_.properties.contentEncoding.traverse(n => F.catchNonFatal(Charset.forName(n)))) // usually this would go in the EnvelopeDecoder companion object, but since that's only a type alias, // we need to put it here for the compiler to find it during implicit search - implicit def stringDecoder[F[_]: ApplicativeError[*[_], Throwable]]: EnvelopeDecoder[F, String] = + implicit def stringDecoder[F[_]: ApplicativeThrow]: EnvelopeDecoder[F, String] = (EnvelopeDecoder.payload[F], encoding[F]).mapN((p, e) => new String(p, e.getOrElse(UTF_8))) implicit val amqpEnvelopeTraverse: Traverse[AmqpEnvelope] = new Traverse[AmqpEnvelope] { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala index 277ab374..7814df71 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala @@ -48,7 +48,7 @@ trait AckConsumingProgram[F[_]] with ConsumingStream[F] with Cancel[F] -case class WrapperAckConsumingProgram[F[_]: Sync] private ( +case class WrapperAckConsumingProgram[F[_]: Sync] private[program] ( ackingProgram: AckingProgram[F], consumingProgram: ConsumingProgram[F] ) extends AckConsumingProgram[F] { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala index 9903128f..f747d310 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala @@ -32,7 +32,7 @@ object AckingProgram { trait AckingProgram[F[_]] extends Acking[F] with Consume[F] -case class WrapperAckingProgram[F[_]: Sync] private ( +case class WrapperAckingProgram[F[_]: Sync] private[program] ( config: Fs2RabbitConfig, consume: Consume[F] ) extends AckingProgram[F] { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala index 712fd75a..58a7f7c3 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala @@ -32,7 +32,7 @@ object ConsumingProgram { trait ConsumingProgram[F[_]] extends ConsumingStream[F] with Consume[F] -case class WrapperConsumingProgram[F[_]: Sync] private ( +case class WrapperConsumingProgram[F[_]: Sync] private[program] ( internalQueue: InternalQueue[F], consume: Consume[F] ) extends ConsumingProgram[F] { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala index 0d05e6b6..df4b14b1 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala @@ -34,7 +34,7 @@ object PublishingProgram { trait PublishingProgram[F[_]] extends Publishing[F] with Publish[F] -case class WrapperPublishingProgram[F[_]: Sync] private ( +case class WrapperPublishingProgram[F[_]: Sync] private[program] ( publish: Publish[F] ) extends PublishingProgram[F] { override def createPublisher[A]( diff --git a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala index f7593363..3d89f85c 100644 --- a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala +++ b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala @@ -41,7 +41,7 @@ class AmqpFieldValueSpec extends AnyFlatSpecLike with Matchers with AmqpProperti AmqpFieldValue.unsafeFrom(arrayVal.toValueWriterCompatibleJava) should be(arrayVal) } it should "preserve the same value after a round-trip through impure and from" in { - forAll { amqpHeaderVal: AmqpFieldValue => + forAll { (amqpHeaderVal: AmqpFieldValue) => AmqpFieldValue.unsafeFrom(amqpHeaderVal.toValueWriterCompatibleJava) == amqpHeaderVal } } diff --git a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala index 3af8806c..ee3a8449 100644 --- a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala +++ b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala @@ -31,7 +31,7 @@ import scodec.bits.ByteVector class AmqpPropertiesSpec extends AnyFlatSpecLike with Matchers with AmqpPropertiesArbitraries { it should s"convert from and to Java AMQP.BasicProperties" in { - forAll { amqpProperties: AmqpProperties => + forAll { (amqpProperties: AmqpProperties) => val basicProps = amqpProperties.asBasicProps AmqpProperties.unsafeFrom(basicProps) should be(amqpProperties) } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala index f118448d..e5c9f49a 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala @@ -17,11 +17,11 @@ package dev.profunktor.fs2rabbit.examples import java.nio.charset.StandardCharsets.UTF_8 - import cats.data.Kleisli import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig} +import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.interpreter.RabbitClient import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack @@ -35,7 +35,7 @@ class AckerConsumerDemo[F[_]: Async](fs2Rabbit: RabbitClient[F]) { private val exchangeName = ExchangeName("testEX") private val routingKey = RoutingKey("testRK") - implicit val stringMessageEncoder = + implicit val stringMessageEncoder: MessageEncoder[F, AmqpMessage[String]] = Kleisli[F, AmqpMessage[String], AmqpMessage[Array[Byte]]](s => s.copy(payload = s.payload.getBytes(UTF_8)).pure[F]) def logPipe: Pipe[F, AmqpEnvelope[String], AckResult] = @@ -55,7 +55,8 @@ class AckerConsumerDemo[F[_]: Async](fs2Rabbit: RabbitClient[F]) { _ <- fs2Rabbit.declareQueue(DeclarationQueueConfig.default(queueName)) _ <- fs2Rabbit.declareExchange(DeclarationExchangeConfig.default(exchangeName, Topic)) _ <- fs2Rabbit.bindQueue(queueName, exchangeName, routingKey) - (acker, consumer) <- fs2Rabbit.createAckerConsumer[String](queueName) + ackerConsumer <- fs2Rabbit.createAckerConsumer[String](queueName) + (acker, consumer) = ackerConsumer publisher <- fs2Rabbit.createPublisherWithListener[AmqpMessage[String]]( exchangeName, routingKey, diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala index ab40edad..cefd6293 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala @@ -17,11 +17,11 @@ package dev.profunktor.fs2rabbit.examples import java.nio.charset.StandardCharsets.UTF_8 - import cats.data.Kleisli import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig +import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.interpreter.RabbitClient import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack @@ -34,7 +34,7 @@ class AutoAckConsumerDemo[F[_]: Async](R: RabbitClient[F]) { private val queueName = QueueName("testQ") private val exchangeName = ExchangeName("testEX") private val routingKey = RoutingKey("testRK") - implicit val stringMessageEncoder = + implicit val stringMessageEncoder: MessageEncoder[F, AmqpMessage[String]] = Kleisli[F, AmqpMessage[String], AmqpMessage[Array[Byte]]](s => s.copy(payload = s.payload.getBytes(UTF_8)).pure[F]) def logPipe: Pipe[F, AmqpEnvelope[String], AckResult] = _.evalMap { amqpMsg => diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala index f33e9154..5865fde9 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala @@ -17,7 +17,6 @@ package dev.profunktor.fs2rabbit.examples import java.nio.charset.StandardCharsets.UTF_8 - import cats.data.{Kleisli, NonEmptyList} import cats.effect.{IO, IOApp, Resource, Sync} import cats.implicits._ @@ -26,6 +25,7 @@ import com.codahale.metrics.jmx.JmxReporter import com.rabbitmq.client.impl.StandardMetricsCollector import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig} import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} +import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.interpreter.RabbitClient import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.ExchangeType.Topic @@ -56,7 +56,7 @@ object DropwizardMetricsDemo extends IOApp.Simple { val simpleMessage = AmqpMessage("Hey!", AmqpProperties.empty) - implicit val stringMessageEncoder = + implicit val stringMessageEncoder: MessageEncoder[IO, AmqpMessage[String]] = Kleisli[IO, AmqpMessage[String], AmqpMessage[Array[Byte]]] { s => s.copy(payload = s.payload.getBytes(UTF_8)).pure[IO] } @@ -80,7 +80,8 @@ object DropwizardMetricsDemo extends IOApp.Simple { _ <- client.declareQueue(DeclarationQueueConfig.default(queueName)) _ <- client.declareExchange(DeclarationExchangeConfig.default(exchangeName, Topic)) _ <- client.bindQueue(queueName, exchangeName, routingKey) - (acker, consumer) <- client.createAckerConsumer[String](queueName) + ackerConsumer <- client.createAckerConsumer[String](queueName) + (acker, consumer) = ackerConsumer publisher <- client.createPublisher[AmqpMessage[String]](exchangeName, routingKey) } yield (consumer, acker, publisher) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 52f0122c..37b71da7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,11 +16,10 @@ object Dependencies { val dropwizard = "4.2.0" val kindProjector = "0.13.0" - val betterMonadicFor = "0.3.1" val scalaTest = "3.2.9" val scalaCheck = "1.15.4" - val scalaTestPlusScalaCheck = "3.2.2.0" + val scalaTestPlusScalaCheck = "3.2.9.0" val disciplineScalaCheck = "2.1.5" } @@ -34,7 +33,6 @@ object Dependencies { // Compiler lazy val kindProjector = "org.typelevel" % "kind-projector" % Version.kindProjector cross CrossVersion.full - lazy val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % Version.betterMonadicFor // Examples lazy val logback = "ch.qos.logback" % "logback-classic" % Version.logback @@ -52,7 +50,7 @@ object Dependencies { // Scala test libraries lazy val scalaTest = "org.scalatest" %% "scalatest" % Version.scalaTest lazy val scalaCheck = "org.scalacheck" %% "scalacheck" % Version.scalaCheck - lazy val scalaTestPlusScalaCheck = "org.scalatestplus" %% "scalacheck-1-14" % Version.scalaTestPlusScalaCheck + lazy val scalaTestPlusScalaCheck = "org.scalatestplus" %% "scalacheck-1-15" % Version.scalaTestPlusScalaCheck lazy val disciplineScalaCheck = "org.typelevel" %% "discipline-scalatest" % Version.disciplineScalaCheck lazy val catsLaws = "org.typelevel" %% "cats-laws" % Version.cats lazy val catsKernelLaws = "org.typelevel" %% "cats-kernel-laws" % Version.cats diff --git a/site/docs/examples/client-metrics.md b/site/docs/examples/client-metrics.md index 998e479e..2a073a54 100644 --- a/site/docs/examples/client-metrics.md +++ b/site/docs/examples/client-metrics.md @@ -142,7 +142,8 @@ object DropwizardMetricsDemo extends IOApp { _ <- client.declareQueue(DeclarationQueueConfig.default(queueName)) _ <- client.declareExchange(DeclarationExchangeConfig.default(exchangeName, Topic)) _ <- client.bindQueue(queueName, exchangeName, routingKey) - (acker, consumer) <- client.createAckerConsumer[String](queueName) + ackerConsumer <- client.createAckerConsumer[String](queueName) + (acker, consumer) = ackerConsumer publisher <- client.createPublisher[AmqpMessage[String]](exchangeName, routingKey) } yield (consumer, acker, publisher) diff --git a/site/docs/examples/sample-acker.md b/site/docs/examples/sample-acker.md index 86fd914c..841c59ee 100644 --- a/site/docs/examples/sample-acker.md +++ b/site/docs/examples/sample-acker.md @@ -78,7 +78,8 @@ class AckerConsumerDemo[F[_]: Async](R: RabbitClient[F]) { routingKey, publishingFlag, publishingListener) - (acker, consumer) <- R.createAckerConsumer[String](queueName) + ackerConsumer <- R.createAckerConsumer[String](queueName) + (acker, consumer) = ackerConsumer result = new Flow[F, String](consumer, acker, logPipe, publisher).flow _ <- result.compile.drain } yield () diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala index d809900e..46479178 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala @@ -175,7 +175,8 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) @@ -190,13 +191,15 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) publisher <- createPublisher[String](x, rk) _ <- publisher("acker-test") - (acker, consumer) <- createAckerConsumer(q) + ackerConsumer <- createAckerConsumer(q) + (acker, consumer) = ackerConsumer _ <- consumer .take(1) .evalMap { msg => @@ -215,13 +218,15 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) publisher <- createPublisher[String](x, rk) _ <- publisher("acker-test") - (acker, consumer) <- createAckerConsumerWithMultipleFlag(q) + ackerConsumer <- createAckerConsumerWithMultipleFlag(q) + (acker, consumer) = ackerConsumer _ <- consumer .take(1) .evalMap { msg => @@ -239,13 +244,15 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) publisher <- createPublisher[String](x, rk) _ <- publisher("NAck-test") - (acker, consumer) <- createAckerConsumer(q) + ackerConsumer <- createAckerConsumer(q) + (acker, consumer) = ackerConsumer _ <- consumer .take(1) .evalMap { msg => @@ -264,13 +271,15 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) publisher <- createPublisher[String](x, rk) _ <- publisher("Reject-test") - (acker, consumer) <- createAckerConsumer(q) + ackerConsumer <- createAckerConsumer(q) + (acker, consumer) = ackerConsumer _ <- consumer .take(1) .evalMap { msg => @@ -288,7 +297,8 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk) @@ -311,7 +321,8 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk ct <- mkRandomString.map(ConsumerTag) _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) @@ -336,19 +347,21 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk ct <- mkRandomString.map(ConsumerTag) _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk) publisher <- createPublisher[String](x, rk) consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) - (acker, consumer) <- createAckerConsumer( - q, - BasicQos(prefetchSize = 0, prefetchCount = 10), - Some(consumerArgs) - ) - _ <- publisher("test") + ackerConsumer <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + (acker, consumer) = ackerConsumer + _ <- publisher("test") _ <- consumer .take(1) .evalTap { msg => @@ -381,7 +394,8 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, _) <- randomQueueData + qxrk <- randomQueueData + (q, x, _) = qxrk _ <- declareExchange(x, ExchangeType.Direct) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- deleteQueue(DeletionQueueConfig.default(q)) @@ -448,7 +462,8 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, _, rk) <- randomQueueData + qxrk <- randomQueueData + (q, _, rk) = qxrk sourceExchange <- mkRandomString.map(ExchangeName) destExchange <- mkRandomString.map(ExchangeName) consumerTag <- mkRandomString.map(ConsumerTag) @@ -459,12 +474,13 @@ trait Fs2RabbitSpec { self: BaseSpec => _ <- bindExchange(destExchange, sourceExchange, rk, ExchangeBindingArgs(Map.empty)) publisher <- createPublisher[String](sourceExchange, rk) consumerArgs = ConsumerArgs(consumerTag = consumerTag, noLocal = false, exclusive = true, args = Map.empty) - (acker, consumer) <- createAckerConsumer( - q, - BasicQos(prefetchSize = 0, prefetchCount = 10), - Some(consumerArgs) - ) - _ <- publisher("test") + ackerConsumer <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + (acker, consumer) = ackerConsumer + _ <- publisher("test") _ <- consumer .take(1) .evalTap { msg => @@ -499,13 +515,15 @@ trait Fs2RabbitSpec { self: BaseSpec => Stream.resource(createConnectionChannel).flatMap { implicit channel => for { - (q, x, rk) <- Stream.eval(randomQueueData) + qxrk <- Stream.eval(randomQueueData) + (q, x, rk) = qxrk _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) _ <- Stream.eval(bindQueue(q, x, rk, QueueBindingArgs(Map.empty))) publisher <- Stream.eval(createPublisher[String](x, rk)) _ <- Stream.eval(publisher("NAck-test")) - (acker, consumer) <- Stream.eval(createAckerConsumer(q)) + ackerConsumer <- Stream.eval(createAckerConsumer(q)) + (acker, consumer) = ackerConsumer result <- Stream.eval(consumer.take(1).compile.lastOrError) _ <- Stream.eval(acker(NAck(result.deliveryTag))) consumer <- Stream.eval(createAutoAckConsumer(q)) @@ -525,13 +543,15 @@ trait Fs2RabbitSpec { self: BaseSpec => Stream.resource(createConnectionChannel).flatMap { implicit channel => for { - (q, x, rk) <- Stream.eval(randomQueueData) + qxrk <- Stream.eval(randomQueueData) + (q, x, rk) = qxrk _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) _ <- Stream.eval(bindQueue(q, x, rk, QueueBindingArgs(Map.empty))) publisher <- Stream.eval(createPublisher[String](x, rk)) _ <- Stream.eval(publisher("Reject-test")) - (acker, consumer) <- Stream.eval(createAckerConsumer(q)) + ackerConsumer <- Stream.eval(createAckerConsumer(q)) + (acker, consumer) = ackerConsumer result <- Stream.eval(consumer.take(1).compile.lastOrError) _ <- Stream.eval(acker(Reject(result.deliveryTag))) consumer <- Stream.eval(createAutoAckConsumer(q)) @@ -551,7 +571,8 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk diffQ <- mkRandomString.map(QueueName) _ <- declareExchange(x, ExchangeType.Topic) publisher <- createPublisher[String](x, rk) @@ -588,7 +609,8 @@ trait Fs2RabbitSpec { self: BaseSpec => Stream.resource(createConnectionChannel).flatMap { implicit channel => for { - (q, x, rk) <- Stream.eval(randomQueueData) + qxrk <- Stream.eval(randomQueueData) + (q, x, rk) = qxrk _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) _ <- Stream.eval(bindQueue(q, x, rk)) @@ -613,7 +635,8 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData + qxrk <- randomQueueData + (q, x, rk) = qxrk _ <- declareExchange(x, ExchangeType.Topic) _ <- declareQueue(DeclarationQueueConfig.default(q)) _ <- bindQueue(q, x, rk) @@ -639,7 +662,8 @@ trait Fs2RabbitSpec { self: BaseSpec => Stream.resource(createConnectionChannel).flatMap { implicit channel => for { - (q, x, rk) <- Stream.eval(randomQueueData) + qxrk <- Stream.eval(randomQueueData) + (q, x, rk) = qxrk _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) _ <- Stream.eval(bindQueue(q, x, rk))