diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala index 2d3f8b2d..eab50152 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala @@ -16,9 +16,9 @@ package dev.profunktor.fs2rabbit.algebra +import com.rabbitmq.client.Channel import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.model._ -import com.rabbitmq.client.Channel trait Publishing[F[_]] { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala index 235a99a0..03cae491 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala @@ -18,7 +18,7 @@ package dev.profunktor.fs2rabbit.interpreter import cats.{Applicative, Functor} import cats.effect.syntax.effect._ -import cats.effect.{Effect, Sync} +import cats.effect._ import cats.syntax.flatMap._ import cats.syntax.functor._ import com.rabbitmq.client._ @@ -31,7 +31,7 @@ import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._ import dev.profunktor.fs2rabbit.model._ import scala.util.{Failure, Success, Try} -class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { +class AMQPClientEffect[F[_]: ContextShift: Effect](blocker: Blocker) extends AMQPClient[F] { private[fs2rabbit] def defaultConsumer[A]( channel: Channel, @@ -152,7 +152,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { exchangeName: ExchangeName, routingKey: RoutingKey, msg: AmqpMessage[Array[Byte]] - ): F[Unit] = Sync[F].delay { + ): F[Unit] = blocker.delay { channel.basicPublish( exchangeName.value, routingKey.value, @@ -167,7 +167,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { routingKey: RoutingKey, flag: PublishingFlag, msg: AmqpMessage[Array[Byte]] - ): F[Unit] = Sync[F].delay { + ): F[Unit] = blocker.delay { channel.basicPublish( exchangeName.value, routingKey.value, diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala index c13291bd..7172d5e2 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala @@ -16,7 +16,7 @@ package dev.profunktor.fs2rabbit.interpreter -import cats.effect.{Concurrent, ConcurrentEffect, Resource} +import cats.effect._ import cats.syntax.functor._ import com.rabbitmq.client.{DefaultSaslConfig, SaslConfig} import dev.profunktor.fs2rabbit.algebra._ @@ -30,8 +30,9 @@ import fs2.Stream import javax.net.ssl.SSLContext object Fs2Rabbit { - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: ConcurrentEffect: ContextShift]( config: Fs2RabbitConfig, + blocker: Blocker, sslContext: Option[SSLContext] = None, // Unlike SSLContext, SaslConfig is not optional because it is always set // by the underlying Java library, even if the user doesn't set it. 
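The hunks above are the heart of the change: `AMQPClientEffect` now takes a `cats.effect.Blocker` and wraps `channel.basicPublish` in `blocker.delay`, so the blocking publish calls run on a dedicated pool, and `Fs2Rabbit.apply` gains a `ContextShift[F]` constraint plus a `blocker` argument to thread it through. Below is a minimal call-site sketch, assuming cats-effect 2.x; `BlockerClientSketch` and `makeClient` are illustrative names, not part of the library, and the cached-thread-pool `Blocker` mirrors the `blockerResource` values added to the example apps further down in this diff.

```scala
import java.util.concurrent.Executors

import cats.effect.{Blocker, ContextShift, IO, Resource}

import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit

object BlockerClientSketch {

  // A Blocker backed by a cached thread pool, shut down when the Resource is released;
  // the same acquire/release pattern the example apps in this diff use.
  val blockerResource: Resource[IO, Blocker] =
    Resource
      .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown()))
      .map(Blocker.liftExecutorService)

  // Illustrative helper (not part of the library): acquire the Blocker and build the
  // client with the new `Fs2Rabbit[F](config, blocker)` signature.
  def makeClient(config: Fs2RabbitConfig)(implicit cs: ContextShift[IO]): Resource[IO, Fs2Rabbit[IO]] =
    blockerResource.flatMap(blocker => Resource.liftF(Fs2Rabbit[IO](config, blocker)))
}
```

Keeping the pool dedicated means the slow `basicPublish` calls never occupy the compute pool, and `ContextShift` shifts execution back after each publish.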
@@ -39,7 +40,7 @@ object Fs2Rabbit { ): F[Fs2Rabbit[F]] = ConnectionEffect.mkConnectionFactory[F](config, sslContext, saslConfig).map { case (factory, addresses) => - val amqpClient = new AmqpClientEffect[F] + val amqpClient = new AMQPClientEffect[F](blocker) val conn = new ConnectionEffect[F](factory, addresses) val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500)) val acker = new AckingProgram[F](config, amqpClient) @@ -72,8 +73,10 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( queueName: QueueName, basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1), consumerArgs: Option[ConsumerArgs] = None - )(implicit channel: AMQPChannel, - decoder: EnvelopeDecoder[F, A]): F[(AckResult => F[Unit], Stream[F, AmqpEnvelope[A]])] = + )( + implicit channel: AMQPChannel, + decoder: EnvelopeDecoder[F, A] + ): F[(AckResult => F[Unit], Stream[F, AmqpEnvelope[A]])] = consumingProgram.createAckerConsumer(channel.value, queueName, basicQos, consumerArgs) def createAutoAckConsumer[A]( @@ -85,7 +88,8 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( def createPublisher[A](exchangeName: ExchangeName, routingKey: RoutingKey)( implicit channel: AMQPChannel, - encoder: MessageEncoder[F, A]): F[A => F[Unit]] = + encoder: MessageEncoder[F, A] + ): F[A => F[Unit]] = publishingProgram.createPublisher(channel.value, exchangeName, routingKey) def createPublisherWithListener[A]( @@ -111,9 +115,9 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( ): F[(ExchangeName, RoutingKey, A) => F[Unit]] = publishingProgram.createBasicPublisherWithListener(channel.value, flag, listener) - def createRoutingPublisher[A](exchangeName: ExchangeName)( - implicit channel: AMQPChannel, - encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = + def createRoutingPublisher[A]( + exchangeName: ExchangeName + )(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = publishingProgram.createRoutingPublisher(channel.value, exchangeName) def createRoutingPublisherWithListener[A]( @@ -133,31 +137,38 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( amqpClient.basicCancel(channel.value, consumerTag) def bindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindQueue(channel.value, queueName, exchangeName, routingKey) def bindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindQueue(channel.value, queueName, exchangeName, routingKey, args) def bindQueueNoWait(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindQueueNoWait(channel.value, queueName, exchangeName, routingKey, args) def unbindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = unbindQueue(queueName, exchangeName, routingKey, QueueUnbindArgs(Map.empty)) def unbindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueUnbindArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.unbindQueue(channel.value, queueName, exchangeName, 
routingKey, args) def bindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey, args: ExchangeBindingArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindExchange(channel.value, destination, source, routingKey, args) def bindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = bindExchange(destination, source, routingKey, ExchangeBindingArgs(Map.empty)) def bindExchangeNoWait( @@ -169,11 +180,13 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( amqpClient.bindExchangeNoWait(channel.value, destination, source, routingKey, args) def unbindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey, args: ExchangeUnbindArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.unbindExchange(channel.value, destination, source, routingKey, args) def unbindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = unbindExchange(destination, source, routingKey, ExchangeUnbindArgs(Map.empty)) def declareExchange(exchangeName: ExchangeName, exchangeType: ExchangeType)(implicit channel: AMQPChannel): F[Unit] = diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala index ad843309..a8b7b754 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala @@ -53,7 +53,9 @@ class PublishingProgram[F[_]: Monad](AMQP: AMQPClient[F]) extends Publishing[F] flag: PublishingFlag, listener: PublishReturn => F[Unit] )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - createBasicPublisherWithListener(channel, flag, listener).map(pub => key => msg => pub(exchangeName, key, msg)) + createBasicPublisherWithListener(channel, flag, listener).map( + pub => key => msg => pub(exchangeName, key, msg) + ) override def createBasicPublisher[A]( channel: Channel diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala index bdae42ca..68eaacf0 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala @@ -19,7 +19,7 @@ package dev.profunktor.fs2rabbit.examples import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.{Concurrent, Timer} +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -27,7 +27,7 @@ import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ -import fs2.{Pipe, Pure, Stream} +import fs2._ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { private val queueName = QueueName("testQ") @@ -61,6 +61,7 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { _ <- new Flow[F, String](consumer, 
acker, logPipe, publisher).flow.compile.drain } yield () } + } class Flow[F[_]: Concurrent, A]( diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala index 5266fa7a..3b7f26d3 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala @@ -19,7 +19,7 @@ package dev.profunktor.fs2rabbit.examples import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.Concurrent +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -41,18 +41,17 @@ class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag)) } - val program: F[Unit] = { - R.createConnectionChannel use { implicit channel => - for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) - consumer <- R.createAutoAckConsumer[String](queueName) - _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain - } yield () - } + val program: F[Unit] = R.createConnectionChannel.use { implicit channel => + for { + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) + consumer <- R.createAutoAckConsumer[String](queueName) + _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain + } yield () } + } class AutoAckFlow[F[_]: Concurrent, A]( diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala index 6cc08a85..ed92473f 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala @@ -17,11 +17,12 @@ package dev.profunktor.fs2rabbit.examples import cats.data.NonEmptyList -import cats.effect.{ExitCode, IO, IOApp} +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream +import java.util.concurrent.Executors object IOAckerConsumer extends IOApp { @@ -42,11 +43,18 @@ object IOAckerConsumer extends IOApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { client => - ResilientStream - .runF(new AckerConsumerDemo[IO](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { client => + ResilientStream + .runF(new AckerConsumerDemo[IO](client).program) + .as(ExitCode.Success) + } } } diff --git 
a/examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala similarity index 76% rename from examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala rename to examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala index 56735c1b..8dbd0739 100644 --- a/examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala @@ -17,12 +17,13 @@ package dev.profunktor.fs2rabbit.examples import cats.data.NonEmptyList -import cats.effect.ExitCode +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream import monix.eval.{Task, TaskApp} +import java.util.concurrent.Executors object MonixAutoAckConsumer extends TaskApp { @@ -43,11 +44,18 @@ object MonixAutoAckConsumer extends TaskApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): Task[ExitCode] = - Fs2Rabbit[Task](config).flatMap { client => - ResilientStream - .runF(new AutoAckConsumerDemo[Task](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[Task](config, blocker).flatMap { client => + ResilientStream + .runF(new AutoAckConsumerDemo[Task](client).program) + .as(ExitCode.Success) + } } } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala index cf67370b..c3b5b513 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala @@ -28,6 +28,7 @@ import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.model._ import fs2.Stream +import java.util.concurrent.Executors object RPCDemo extends IOApp { @@ -48,13 +49,20 @@ object RPCDemo extends IOApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { implicit fs2Rabbit => - val queue = QueueName("rpc_queue") - runServer[IO](queue).concurrently(runClient[IO](queue)).compile.drain.as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { implicit client => + val queue = QueueName("rpc_queue") + runServer[IO](queue).concurrently(runClient[IO](queue)).compile.drain.as(ExitCode.Success) + } } - def runServer[F[_]: Sync: LiftIO](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = + def runServer[F[_]: Sync](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = Stream.resource(R.createConnectionChannel).flatMap { implicit channel => new RPCServer[F](rpcQueue).serve } @@ -83,20 +91,25 @@ class RPCClient[F[_]: Sync](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F], chann val correlationId = UUID.randomUUID().toString for { - queue <- Stream.eval(R.declareQueue) - publisher <- 
Stream.eval(R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rpcQueue.value))) - _ <- Stream.eval(putStrLn(s"[Client] Message $body. ReplyTo queue $queue. Correlation $correlationId")) - message = AmqpMessage(body, AmqpProperties(replyTo = Some(queue.value), correlationId = Some(correlationId))) - _ <- Stream.eval(publisher(message)) - consumer <- Stream.eval(R.createAutoAckConsumer(queue)) - response <- consumer.filter(_.properties.correlationId.contains(correlationId)).take(1) - _ <- Stream.eval(putStrLn(s"[Client] Request $body. Received response [${response.payload}]")) + queue <- Stream.eval(R.declareQueue) + publisher <- Stream.eval( + R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rpcQueue.value)) + ) + _ <- Stream.eval(putStrLn(s"[Client] Message $body. ReplyTo queue $queue. Correlation $correlationId")) + message = AmqpMessage(body, AmqpProperties(replyTo = Some(queue.value), correlationId = Some(correlationId))) + _ <- Stream.eval(publisher(message)) + consumer <- Stream.eval(R.createAutoAckConsumer(queue)) + response <- consumer.filter(_.properties.correlationId.contains(correlationId)).take(1) + _ <- Stream.eval(putStrLn(s"[Client] Request $body. Received response [${response.payload}]")) } yield response } } -class RPCServer[F[_]: Sync: LiftIO](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F], channel: AMQPChannel) { +class RPCServer[F[_]: Sync](rpcQueue: QueueName)( + implicit R: Fs2Rabbit[F], + channel: AMQPChannel +) { private val EmptyExchange = ExchangeName("") @@ -116,7 +129,7 @@ class RPCServer[F[_]: Sync: LiftIO](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F val replyTo = e.properties.replyTo.toRight(new IllegalArgumentException("ReplyTo parameter is missing")) for { - rk <- IO.fromEither(replyTo).to[F] + rk <- replyTo.liftTo[F] _ <- putStrLn(s"[Server] Received message [${e.payload}]. ReplyTo $rk. 
CorrelationId $correlationId") publisher <- R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rk)) response = AmqpMessage(s"Response for ${e.payload}", AmqpProperties(correlationId = correlationId)) diff --git a/examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala similarity index 74% rename from examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala rename to examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala index a1d45028..9a04b897 100644 --- a/examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala @@ -19,10 +19,12 @@ package dev.profunktor.fs2rabbit.examples import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit +import cats.effect.{Blocker, Resource} import zio._ import zio.interop.catz._ import zio.interop.catz.implicits._ import dev.profunktor.fs2rabbit.resiliency.ResilientStream +import java.util.concurrent.Executors object ZIOAutoAckConsumer extends CatsApp { @@ -38,11 +40,18 @@ object ZIOAutoAckConsumer extends CatsApp { internalQueueSize = Some(500) ) + val blockerResource = + Resource + .make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): UIO[Int] = - Fs2Rabbit[Task](config) - .flatMap { client => - ResilientStream - .runF(new AutoAckConsumerDemo[Task](client).program) + blockerResource + .use { blocker => + Fs2Rabbit[Task](config, blocker).flatMap { client => + ResilientStream + .runF(new AutoAckConsumerDemo[Task](client).program) + } } .run .map(_ => 0) diff --git a/site/src/main/tut/client.md b/site/src/main/tut/client.md index 838a6e3e..2a6f9edd 100644 --- a/site/src/main/tut/client.md +++ b/site/src/main/tut/client.md @@ -6,18 +6,21 @@ number: 2 # Fs2 Rabbit Client -`Fs2Rabbit` is the main client that wraps the communication with `RabbitMQ`. All it needs are a `Fs2RabbitConfig`, an optional `SSLContext` and an instance of `ConcurrentEffect[F]`. +`Fs2Rabbit` is the main client that wraps the communication with `RabbitMQ`. The mandatory arguments are a `Fs2RabbitConfig` and a `cats.effect.Blocker` used for publishing (this action is blocking in the underlying Java client). Optionally, you can pass in a custom `SSLContext` and `SaslConfig`. ```tut:book:silent import cats.effect._ +import com.rabbitmq.client.{DefaultSaslConfig, SaslConfig} import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import javax.net.ssl.SSLContext object Fs2Rabbit { - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: ConcurrentEffect: ContextShift]( config: Fs2RabbitConfig, - sslContext: Option[SSLContext] = None + blocker: Blocker, + sslContext: Option[SSLContext] = None, + saslConfig: SaslConfig = DefaultSaslConfig.PLAIN ): F[Fs2Rabbit[F]] = ??? } ``` @@ -25,10 +28,11 @@ object Fs2Rabbit { Its creation is effectful so you need to `flatMap` and pass it as an argument. 
For example: ```tut:book:silent -import cats.effect.{ExitCode, IOApp} +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit +import java.util.concurrent.Executors object Program { def foo[F[_]](client: Fs2Rabbit[F]): F[Unit] = ??? @@ -48,11 +52,17 @@ class Demo extends IOApp { internalQueueSize = Some(500) ) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { client => - Program.foo[IO](client).as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { client => + Program.foo[IO](client).as(ExitCode.Success) + } } } ``` - diff --git a/site/src/main/tut/examples/index.md b/site/src/main/tut/examples/index.md index 91962212..5f370a4b 100644 --- a/site/src/main/tut/examples/index.md +++ b/site/src/main/tut/examples/index.md @@ -7,7 +7,7 @@ position: 2 # Examples -The source code for some of the examples can be found [here](https://github.com/gvolpe/fs2-rabbit/tree/master/examples/src/main/scala/dev.profunktor/fs2rabbit/examples). +The source code for some of the examples can be found [here](https://github.com/profunktor/fs2-rabbit/tree/master/examples/src/main/scala/dev.profunktor/fs2rabbit/examples). ### Simple - **[Single AutoAckConsumer](./sample-autoack.html)**: Example of a single `AutoAckConsumer`, a `Publisher` and `Json` data manipulation. diff --git a/site/src/main/tut/examples/sample-acker.md b/site/src/main/tut/examples/sample-acker.md index 4fddb338..095d2fee 100644 --- a/site/src/main/tut/examples/sample-acker.md +++ b/site/src/main/tut/examples/sample-acker.md @@ -12,7 +12,7 @@ Here we create a single `AckerConsumer`, a single `Publisher` and finally we pub import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.{Concurrent, Timer, Sync} +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -69,7 +69,7 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { // Run when there's no consumer for the routing key specified by the publisher and the flag mandatory is true val publishingListener: PublishReturn => F[Unit] = pr => Sync[F].delay(s"Publish listener: $pr") - val program: F[Unit] = R.createConnectionChannel use { implicit channel => + val program: F[Unit] = R.createConnectionChannel.use { implicit channel => for { _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) _ <- R.declareExchange(exchangeName, ExchangeType.Topic) @@ -91,11 +91,12 @@ At the edge of out program we define our effect, `cats.effect.IO` in this case, ```tut:book:silent import cats.data.NonEmptyList -import cats.effect.{ExitCode, IO, IOApp} +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream +import java.util.concurrent.Executors object IOAckerConsumer extends IOApp { @@ -116,11 +117,18 @@ object IOAckerConsumer extends IOApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): 
IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { client => - ResilientStream - .runF(new AckerConsumerDemo[IO](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { client => + ResilientStream + .runF(new AckerConsumerDemo[IO](client).program) + .as(ExitCode.Success) + } } } diff --git a/site/src/main/tut/examples/sample-autoack.md b/site/src/main/tut/examples/sample-autoack.md index 9f26b3bc..e4906bd9 100644 --- a/site/src/main/tut/examples/sample-autoack.md +++ b/site/src/main/tut/examples/sample-autoack.md @@ -12,7 +12,7 @@ Here we create a single `AutoAckConsumer`, a single `Publisher` and finally we p import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.{Concurrent, Sync} +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -20,7 +20,7 @@ import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ -import fs2.{Pipe, Pure, Stream} +import fs2._ class AutoAckFlow[F[_]: Concurrent, A]( consumer: Stream[F, AmqpEnvelope[A]], @@ -63,17 +63,15 @@ class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { Sync[F].delay(println(s"Consumed: $amqpMsg")).as(Ack(amqpMsg.deliveryTag)) } - val program: F[Unit] = { - R.createConnectionChannel use { implicit channel => - for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) - consumer <- R.createAutoAckConsumer[String](queueName) - _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain - } yield () - } + val program: F[Unit] = R.createConnectionChannel.use { implicit channel => + for { + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) + consumer <- R.createAutoAckConsumer[String](queueName) + _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain + } yield () } } ``` @@ -82,12 +80,13 @@ At the edge of out program we define our effect, `monix.eval.Task` in this case, ```tut:book:silent import cats.data.NonEmptyList -import cats.effect.ExitCode +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream import monix.eval.{Task, TaskApp} +import java.util.concurrent.Executors object MonixAutoAckConsumer extends TaskApp { @@ -108,11 +107,18 @@ object MonixAutoAckConsumer extends TaskApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): Task[ExitCode] = - Fs2Rabbit[Task](config).flatMap { client => - ResilientStream - .runF(new AutoAckConsumerDemo[Task](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[Task](config, blocker).flatMap 
{ client => + ResilientStream + .runF(new AutoAckConsumerDemo[Task](client).program) + .as(ExitCode.Success) + } } } diff --git a/site/src/main/tut/examples/sample-mult-connections.md b/site/src/main/tut/examples/sample-mult-connections.md index d04ff726..de98ac30 100644 --- a/site/src/main/tut/examples/sample-mult-connections.md +++ b/site/src/main/tut/examples/sample-mult-connections.md @@ -17,7 +17,7 @@ We start by defining three different programs representing each connection, name We will be consuming messages from `c1` and `c2`, and publishing the result to `p3` concurrently. Thanks to `fs2` this becomes such a simple case: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit diff --git a/site/src/main/tut/examples/sample-mult-consumers.md b/site/src/main/tut/examples/sample-mult-consumers.md index 3dcb5814..b8dfc7c1 100644 --- a/site/src/main/tut/examples/sample-mult-consumers.md +++ b/site/src/main/tut/examples/sample-mult-consumers.md @@ -9,7 +9,7 @@ number: 16 Given two `Consumers` bound to queues with different `RoutingKey`s `RKA` and `RKB` and a single `Publisher` bound to a single `RoutingKey` named `RKA` we will be publishing messages to both queues but expecting to only consume messages published to the `RKA`. The second consumer bound to `RKB` will not receive any messages: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.model._ diff --git a/site/src/main/tut/publishers/index.md b/site/src/main/tut/publishers/index.md index 67748dd6..fb59da6a 100644 --- a/site/src/main/tut/publishers/index.md +++ b/site/src/main/tut/publishers/index.md @@ -6,6 +6,8 @@ number: 10 # Publishers +Publishing is a blocking action in the underlying Java client, so you need to pass in a [`cats.effect.Blocker`](https://typelevel.org/cats-effect/concurrency/basics.html#choosing-thread-pool) when creating the `Fs2Rabbit` client. + - **[Publisher](./publisher.html)**: A simple message publisher. - **[Publisher with Listener](./publisher-with-listener.html)**: A publisher with a listener for messages that can not be routed. - **[Publishing Json](./json.html)**: Publishing Json messages using the `fs2-rabbit-circe` module. diff --git a/site/src/main/tut/publishers/publisher-with-listener.md b/site/src/main/tut/publishers/publisher-with-listener.md index 4e0590fa..86cd6b33 100644 --- a/site/src/main/tut/publishers/publisher-with-listener.md +++ b/site/src/main/tut/publishers/publisher-with-listener.md @@ -21,7 +21,7 @@ The server SHOULD implement the mandatory flag. It is simply created by specifying `ExchangeName`, `RoutingKey`, `PublishingFlag` and a listener, i.e.
a function from `PublishReturn` to `F[Unit]`: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -30,7 +30,6 @@ val routingKey = RoutingKey("testRK") val publishingFlag: PublishingFlag = PublishingFlag(mandatory = true) -// Run when there's no consumer for the routing key specified by the publisher and the flag mandatory is true val publishingListener: PublishReturn => IO[Unit] = pr => IO(println(s"Publish listener: $pr")) def doSomething(publisher: String => IO[Unit]): IO[Unit] = IO.unit diff --git a/site/src/main/tut/publishers/publisher.md b/site/src/main/tut/publishers/publisher.md index 24e7fb4f..54515248 100644 --- a/site/src/main/tut/publishers/publisher.md +++ b/site/src/main/tut/publishers/publisher.md @@ -9,7 +9,7 @@ number: 11 A `Publisher` is simply created by specifying an `ExchangeName` and a `RoutingKey`: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala index 6c7adc87..a01d2247 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala @@ -17,7 +17,7 @@ package dev.profunktor.fs2rabbit.interpreter import cats.effect.concurrent.Deferred -import cats.effect.IO +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig import dev.profunktor.fs2rabbit.config.declaration._ @@ -30,6 +30,7 @@ import org.scalatest.Assertion import scala.concurrent.duration._ import scala.util.Random import scala.concurrent.Future +import java.util.concurrent.Executors trait Fs2RabbitSpec { self: BaseSpec => @@ -37,6 +38,11 @@ trait Fs2RabbitSpec { self: BaseSpec => val emptyAssertion: Assertion = true shouldBe true + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + it should "create a connection and a queue with default arguments" in withRabbit { interpreter => import interpreter._ @@ -288,15 +294,19 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => for { - (q, x, rk) <- randomQueueData - ct <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk) - consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) - (acker, consumer) <- createAckerConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) - _ <- publisher("test") + (q, x, rk) <- randomQueueData + ct <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk) + consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) + (acker, consumer) <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + _ <- publisher("test") _ <- consumer .take(1) .evalTap { msg => @@ -396,19 +406,23 @@ trait Fs2RabbitSpec { self: BaseSpec => 
createConnectionChannel.use { implicit channel => for { - (q, _, rk) <- randomQueueData - sourceExchange <- mkRandomString.map(ExchangeName) - destExchange <- mkRandomString.map(ExchangeName) - consumerTag <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(sourceExchange, ExchangeType.Direct) - _ <- declareExchange(destExchange, ExchangeType.Direct) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, destExchange, rk) - _ <- bindExchange(destExchange, sourceExchange, rk, ExchangeBindingArgs(Map.empty)) - publisher <- createPublisher[String](sourceExchange, rk) - consumerArgs = ConsumerArgs(consumerTag = consumerTag, noLocal = false, exclusive = true, args = Map.empty) - (acker, consumer) <- createAckerConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) - _ <- publisher("test") + (q, _, rk) <- randomQueueData + sourceExchange <- mkRandomString.map(ExchangeName) + destExchange <- mkRandomString.map(ExchangeName) + consumerTag <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(sourceExchange, ExchangeType.Direct) + _ <- declareExchange(destExchange, ExchangeType.Direct) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, destExchange, rk) + _ <- bindExchange(destExchange, sourceExchange, rk, ExchangeBindingArgs(Map.empty)) + publisher <- createPublisher[String](sourceExchange, rk) + consumerArgs = ConsumerArgs(consumerTag = consumerTag, noLocal = false, exclusive = true, args = Map.empty) + (acker, consumer) <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + _ <- publisher("test") _ <- consumer .take(1) .evalTap { msg => @@ -511,11 +525,12 @@ trait Fs2RabbitSpec { self: BaseSpec => _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) _ <- Stream.eval(bindQueue(q, x, rk)) promise <- Stream.eval(Deferred[IO, PublishReturn]) - publisher <- Stream.eval(createPublisherWithListener(x, RoutingKey("diff-rk"), flag, listener(promise))) - _ <- Stream.eval(publisher("test")) - callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone - consumer <- Stream.eval(createAutoAckConsumer(q)) - result <- takeWithTimeOut(consumer, 500.millis) + publisher <- Stream + .eval(createPublisherWithListener(x, RoutingKey("diff-rk"), flag, listener(promise))) + _ <- Stream.eval(publisher("test")) + callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone + consumer <- Stream.eval(createAutoAckConsumer(q)) + result <- takeWithTimeOut(consumer, 500.millis) } yield { result shouldBe None callback.body.value shouldEqual "test".getBytes("UTF-8") @@ -614,13 +629,25 @@ trait Fs2RabbitSpec { self: BaseSpec => } private def withStreamRabbit[A](fa: Fs2Rabbit[IO] => Stream[IO, A]): Future[Assertion] = - Fs2Rabbit[IO](config).flatMap(r => fa(r).compile.drain).as(emptyAssertion).unsafeToFuture + blockerResource + .use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap(r => fa(r).compile.drain) + } + .as(emptyAssertion) + .unsafeToFuture private def withStreamNackRabbit[A](fa: Fs2Rabbit[IO] => Stream[IO, A]): Future[Assertion] = - Fs2Rabbit[IO](config.copy(requeueOnNack = true)).flatMap(r => fa(r).compile.drain).as(emptyAssertion).unsafeToFuture + blockerResource + .use { blocker => + Fs2Rabbit[IO](config.copy(requeueOnNack = true), blocker).flatMap(r => fa(r).compile.drain) + } + .as(emptyAssertion) + .unsafeToFuture private def withRabbit[A](fa: Fs2Rabbit[IO] => IO[A]): 
Future[A] = - Fs2Rabbit[IO](config).flatMap(r => fa(r)).unsafeToFuture + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap(r => fa(r)) + }.unsafeToFuture private def randomQueueData: IO[(QueueName, ExchangeName, RoutingKey)] = (mkRandomString, mkRandomString, mkRandomString).mapN {