From 5ddf8307e25a3eb2061dd1911153a35c1b2d8489 Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Tue, 17 Sep 2019 12:58:16 -0500 Subject: [PATCH 1/4] Creating a publisher requires a cats.effect.Blocker from now on --- .../fs2rabbit/algebra/Publishing.scala | 21 +- .../fs2rabbit/interpreter/Fs2Rabbit.scala | 70 ++- .../fs2rabbit/program/PublishingProgram.scala | 29 +- .../examples/AckerConsumerDemo.scala | 42 +- .../examples/AutoAckConsumerDemo.scala | 19 +- .../examples/MonixAutoAckConsumer.scala | 0 .../fs2rabbit/examples/RPCDemo.scala | 44 +- .../examples/ZIOAutoAckConsumer.scala | 0 site/src/main/tut/examples/sample-acker.md | 37 +- site/src/main/tut/examples/sample-autoack.md | 18 +- .../tut/examples/sample-mult-connections.md | 18 +- .../tut/examples/sample-mult-consumers.md | 36 +- site/src/main/tut/publishers/index.md | 2 + .../tut/publishers/publisher-with-listener.md | 21 +- site/src/main/tut/publishers/publisher.md | 20 +- .../fs2rabbit/interpreter/Fs2RabbitSpec.scala | 485 ++++++++++-------- 16 files changed, 513 insertions(+), 349 deletions(-) rename examples/src/main/{scala-2.12 => scala}/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala (100%) rename examples/src/main/{scala-2.12 => scala}/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala (100%) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala index 2d3f8b2d..e5e35ef4 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala @@ -16,16 +16,18 @@ package dev.profunktor.fs2rabbit.algebra +import cats.effect.Blocker +import com.rabbitmq.client.Channel import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.model._ -import com.rabbitmq.client.Channel trait Publishing[F[_]] { def createPublisher[A]( channel: Channel, exchangeName: ExchangeName, - routingKey: RoutingKey + routingKey: RoutingKey, + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] def createPublisherWithListener[A]( @@ -33,29 +35,34 @@ trait Publishing[F[_]] { exchangeName: ExchangeName, routingKey: RoutingKey, flags: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] def createRoutingPublisher[A]( channel: Channel, - exchangeName: ExchangeName + exchangeName: ExchangeName, + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] def createRoutingPublisherWithListener[A]( channel: Channel, exchangeName: ExchangeName, flags: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] def createBasicPublisher[A]( - channel: Channel + channel: Channel, + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] def createBasicPublisherWithListener[A]( channel: Channel, flags: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala index c13291bd..ba49e8a2 100644 --- 
a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala @@ -16,7 +16,7 @@ package dev.profunktor.fs2rabbit.interpreter -import cats.effect.{Concurrent, ConcurrentEffect, Resource} +import cats.effect._ import cats.syntax.functor._ import com.rabbitmq.client.{DefaultSaslConfig, SaslConfig} import dev.profunktor.fs2rabbit.algebra._ @@ -72,8 +72,10 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( queueName: QueueName, basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1), consumerArgs: Option[ConsumerArgs] = None - )(implicit channel: AMQPChannel, - decoder: EnvelopeDecoder[F, A]): F[(AckResult => F[Unit], Stream[F, AmqpEnvelope[A]])] = + )( + implicit channel: AMQPChannel, + decoder: EnvelopeDecoder[F, A] + ): F[(AckResult => F[Unit], Stream[F, AmqpEnvelope[A]])] = consumingProgram.createAckerConsumer(channel.value, queueName, basicQos, consumerArgs) def createAutoAckConsumer[A]( @@ -83,45 +85,50 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( )(implicit channel: AMQPChannel, decoder: EnvelopeDecoder[F, A]): F[Stream[F, AmqpEnvelope[A]]] = consumingProgram.createAutoAckConsumer(channel.value, queueName, basicQos, consumerArgs) - def createPublisher[A](exchangeName: ExchangeName, routingKey: RoutingKey)( + def createPublisher[A](exchangeName: ExchangeName, routingKey: RoutingKey, blocker: Blocker)( implicit channel: AMQPChannel, - encoder: MessageEncoder[F, A]): F[A => F[Unit]] = - publishingProgram.createPublisher(channel.value, exchangeName, routingKey) + encoder: MessageEncoder[F, A] + ): F[A => F[Unit]] = + publishingProgram.createPublisher(channel.value, exchangeName, routingKey, blocker) def createPublisherWithListener[A]( exchangeName: ExchangeName, routingKey: RoutingKey, flags: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[A => F[Unit]] = - publishingProgram.createPublisherWithListener(channel.value, exchangeName, routingKey, flags, listener) + publishingProgram.createPublisherWithListener(channel.value, exchangeName, routingKey, flags, listener, blocker) - def createBasicPublisher[A]( + def createBasicPublisher[A](blocker: Blocker)( implicit channel: AMQPChannel, encoder: MessageEncoder[F, A] ): F[(ExchangeName, RoutingKey, A) => F[Unit]] = - publishingProgram.createBasicPublisher(channel.value) + publishingProgram.createBasicPublisher(channel.value, blocker) def createBasicPublisherWithListener[A]( flag: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )( implicit channel: AMQPChannel, encoder: MessageEncoder[F, A] ): F[(ExchangeName, RoutingKey, A) => F[Unit]] = - publishingProgram.createBasicPublisherWithListener(channel.value, flag, listener) + publishingProgram.createBasicPublisherWithListener(channel.value, flag, listener, blocker) - def createRoutingPublisher[A](exchangeName: ExchangeName)( - implicit channel: AMQPChannel, - encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - publishingProgram.createRoutingPublisher(channel.value, exchangeName) + def createRoutingPublisher[A]( + exchangeName: ExchangeName, + blocker: Blocker + )(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = + publishingProgram.createRoutingPublisher(channel.value, exchangeName, blocker) def createRoutingPublisherWithListener[A]( 
exchangeName: ExchangeName, flags: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - publishingProgram.createRoutingPublisherWithListener(channel.value, exchangeName, flags, listener) + publishingProgram.createRoutingPublisherWithListener(channel.value, exchangeName, flags, listener, blocker) def addPublishingListener(listener: PublishReturn => F[Unit])(implicit channel: AMQPChannel): F[Unit] = amqpClient.addPublishingListener(channel.value, listener) @@ -133,31 +140,38 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( amqpClient.basicCancel(channel.value, consumerTag) def bindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindQueue(channel.value, queueName, exchangeName, routingKey) def bindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindQueue(channel.value, queueName, exchangeName, routingKey, args) def bindQueueNoWait(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindQueueNoWait(channel.value, queueName, exchangeName, routingKey, args) def unbindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = unbindQueue(queueName, exchangeName, routingKey, QueueUnbindArgs(Map.empty)) def unbindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueUnbindArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.unbindQueue(channel.value, queueName, exchangeName, routingKey, args) def bindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey, args: ExchangeBindingArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.bindExchange(channel.value, destination, source, routingKey, args) def bindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = bindExchange(destination, source, routingKey, ExchangeBindingArgs(Map.empty)) def bindExchangeNoWait( @@ -169,11 +183,13 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( amqpClient.bindExchangeNoWait(channel.value, destination, source, routingKey, args) def unbindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey, args: ExchangeUnbindArgs)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = amqpClient.unbindExchange(channel.value, destination, source, routingKey, args) def unbindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey)( - implicit channel: AMQPChannel): F[Unit] = + implicit channel: AMQPChannel + ): F[Unit] = unbindExchange(destination, source, routingKey, ExchangeUnbindArgs(Map.empty)) def declareExchange(exchangeName: ExchangeName, exchangeType: ExchangeType)(implicit channel: AMQPChannel): F[Unit] = diff --git 
a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala index ad843309..233d49cc 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala @@ -17,6 +17,7 @@ package dev.profunktor.fs2rabbit.program import cats.{Applicative, Monad} +import cats.effect.Blocker import cats.implicits._ import dev.profunktor.fs2rabbit.algebra.{AMQPClient, Publishing} import dev.profunktor.fs2rabbit.effects.MessageEncoder @@ -28,35 +29,42 @@ class PublishingProgram[F[_]: Monad](AMQP: AMQPClient[F]) extends Publishing[F] override def createPublisher[A]( channel: Channel, exchangeName: ExchangeName, - routingKey: RoutingKey + routingKey: RoutingKey, + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] = - createRoutingPublisher(channel, exchangeName).map(_.apply(routingKey)) + createRoutingPublisher(channel, exchangeName, blocker).map(_.apply(routingKey)) override def createPublisherWithListener[A]( channel: Channel, exchangeName: ExchangeName, routingKey: RoutingKey, flag: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] = - createRoutingPublisherWithListener(channel, exchangeName, flag, listener).map(_.apply(routingKey)) + createRoutingPublisherWithListener(channel, exchangeName, flag, listener, blocker).map(_.apply(routingKey)) override def createRoutingPublisher[A]( channel: Channel, - exchangeName: ExchangeName + exchangeName: ExchangeName, + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - createBasicPublisher(channel).map(pub => key => msg => pub(exchangeName, key, msg)) + createBasicPublisher(channel, blocker).map(pub => key => msg => pub(exchangeName, key, msg)) override def createRoutingPublisherWithListener[A]( channel: Channel, exchangeName: ExchangeName, flag: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - createBasicPublisherWithListener(channel, flag, listener).map(pub => key => msg => pub(exchangeName, key, msg)) + createBasicPublisherWithListener(channel, flag, listener, blocker).map( + pub => key => msg => pub(exchangeName, key, msg) + ) override def createBasicPublisher[A]( - channel: Channel + channel: Channel, + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] = Applicative[F].pure { case (exchangeName: ExchangeName, routingKey: RoutingKey, msg: A @unchecked) => @@ -66,7 +74,8 @@ class PublishingProgram[F[_]: Monad](AMQP: AMQPClient[F]) extends Publishing[F] override def createBasicPublisherWithListener[A]( channel: Channel, flag: PublishingFlag, - listener: PublishReturn => F[Unit] + listener: PublishReturn => F[Unit], + blocker: Blocker )(implicit encoder: MessageEncoder[F, A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] = AMQP.addPublishingListener(channel, listener).as { case (exchangeName: ExchangeName, routingKey: RoutingKey, msg: A @unchecked) => diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala index bdae42ca..93e3fabf 100644 --- 
a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala @@ -19,7 +19,7 @@ package dev.profunktor.fs2rabbit.examples import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.{Concurrent, Timer} +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -27,7 +27,8 @@ import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ -import fs2.{Pipe, Pure, Stream} +import fs2._ +import java.util.concurrent.Executors class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { private val queueName = QueueName("testQ") @@ -46,21 +47,32 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { // Run when there's no consumer for the routing key specified by the publisher and the flag mandatory is true val publishingListener: PublishReturn => F[Unit] = pr => putStrLn(s"Publish listener: $pr") - val program: F[Unit] = R.createConnectionChannel.use { implicit channel => + val resources: Resource[F, (AMQPChannel, Blocker)] = for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - (acker, consumer) <- R.createAckerConsumer[String](queueName) - publisher <- R.createPublisherWithListener[AmqpMessage[String]]( - exchangeName, - routingKey, - publishingFlag, - publishingListener - ) - _ <- new Flow[F, String](consumer, acker, logPipe, publisher).flow.compile.drain - } yield () + channel <- R.createConnectionChannel + blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + + val program: F[Unit] = resources.use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + (acker, consumer) <- R.createAckerConsumer[String](queueName) + publisher <- R.createPublisherWithListener[AmqpMessage[String]]( + exchangeName, + routingKey, + publishingFlag, + publishingListener, + blocker + ) + _ <- new Flow[F, String](consumer, acker, logPipe, publisher).flow.compile.drain + } yield () } + } class Flow[F[_]: Concurrent, A]( diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala index 5266fa7a..c71918d5 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala @@ -19,7 +19,7 @@ package dev.profunktor.fs2rabbit.examples import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.Concurrent +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -29,6 +29,7 @@ import 
dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} import io.circe.Encoder +import java.util.concurrent.Executors class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { private val queueName = QueueName("testQ") @@ -41,18 +42,26 @@ class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag)) } - val program: F[Unit] = { - R.createConnectionChannel use { implicit channel => + val resources: Resource[F, (AMQPChannel, Blocker)] = + for { + channel <- R.createConnectionChannel + blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + + val program: F[Unit] = resources.use { + case (channel, blocker) => + implicit val rabbitChannel = channel for { _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) _ <- R.declareExchange(exchangeName, ExchangeType.Topic) _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) + publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey, blocker) consumer <- R.createAutoAckConsumer[String](queueName) _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain } yield () - } } + } class AutoAckFlow[F[_]: Concurrent, A]( diff --git a/examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala similarity index 100% rename from examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala rename to examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala index cf67370b..789fbccc 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala @@ -28,6 +28,7 @@ import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.model._ import fs2.Stream +import java.util.concurrent.Executors object RPCDemo extends IOApp { @@ -48,20 +49,24 @@ object RPCDemo extends IOApp { automaticRecovery = true ) + val publishingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { implicit fs2Rabbit => + Fs2Rabbit[IO](config).flatMap { implicit client => val queue = QueueName("rpc_queue") - runServer[IO](queue).concurrently(runClient[IO](queue)).compile.drain.as(ExitCode.Success) + publishingES.map(Blocker.liftExecutorService).use { blocker => + runServer[IO](queue, blocker).concurrently(runClient[IO](queue, blocker)).compile.drain.as(ExitCode.Success) + } } - def runServer[F[_]: Sync: LiftIO](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = + def runServer[F[_]: Sync: LiftIO](rpcQueue: QueueName, blocker: Blocker)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = Stream.resource(R.createConnectionChannel).flatMap { implicit channel => - new RPCServer[F](rpcQueue).serve + new RPCServer[F](rpcQueue, blocker).serve } - def runClient[F[_]: 
Concurrent](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = + def runClient[F[_]: Concurrent](rpcQueue: QueueName, blocker: Blocker)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = Stream.resource(R.createConnectionChannel).flatMap { implicit channel => - val client = new RPCClient[F](rpcQueue) + val client = new RPCClient[F](rpcQueue, blocker) Stream( client.call("Message 1"), @@ -72,7 +77,7 @@ object RPCDemo extends IOApp { } -class RPCClient[F[_]: Sync](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F], channel: AMQPChannel) { +class RPCClient[F[_]: Sync](rpcQueue: QueueName, blocker: Blocker)(implicit R: Fs2Rabbit[F], channel: AMQPChannel) { private val EmptyExchange = ExchangeName("") @@ -83,20 +88,25 @@ class RPCClient[F[_]: Sync](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F], chann val correlationId = UUID.randomUUID().toString for { - queue <- Stream.eval(R.declareQueue) - publisher <- Stream.eval(R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rpcQueue.value))) - _ <- Stream.eval(putStrLn(s"[Client] Message $body. ReplyTo queue $queue. Correlation $correlationId")) - message = AmqpMessage(body, AmqpProperties(replyTo = Some(queue.value), correlationId = Some(correlationId))) - _ <- Stream.eval(publisher(message)) - consumer <- Stream.eval(R.createAutoAckConsumer(queue)) - response <- consumer.filter(_.properties.correlationId.contains(correlationId)).take(1) - _ <- Stream.eval(putStrLn(s"[Client] Request $body. Received response [${response.payload}]")) + queue <- Stream.eval(R.declareQueue) + publisher <- Stream.eval( + R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rpcQueue.value), blocker) + ) + _ <- Stream.eval(putStrLn(s"[Client] Message $body. ReplyTo queue $queue. Correlation $correlationId")) + message = AmqpMessage(body, AmqpProperties(replyTo = Some(queue.value), correlationId = Some(correlationId))) + _ <- Stream.eval(publisher(message)) + consumer <- Stream.eval(R.createAutoAckConsumer(queue)) + response <- consumer.filter(_.properties.correlationId.contains(correlationId)).take(1) + _ <- Stream.eval(putStrLn(s"[Client] Request $body. Received response [${response.payload}]")) } yield response } } -class RPCServer[F[_]: Sync: LiftIO](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F], channel: AMQPChannel) { +class RPCServer[F[_]: Sync: LiftIO](rpcQueue: QueueName, blocker: Blocker)( + implicit R: Fs2Rabbit[F], + channel: AMQPChannel +) { private val EmptyExchange = ExchangeName("") @@ -118,7 +128,7 @@ class RPCServer[F[_]: Sync: LiftIO](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F for { rk <- IO.fromEither(replyTo).to[F] _ <- putStrLn(s"[Server] Received message [${e.payload}]. ReplyTo $rk. 
CorrelationId $correlationId") - publisher <- R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rk)) + publisher <- R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rk), blocker) response = AmqpMessage(s"Response for ${e.payload}", AmqpProperties(correlationId = correlationId)) _ <- publisher(response) } yield () diff --git a/examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala similarity index 100% rename from examples/src/main/scala-2.12/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala rename to examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala diff --git a/site/src/main/tut/examples/sample-acker.md b/site/src/main/tut/examples/sample-acker.md index 4fddb338..e0d29ef9 100644 --- a/site/src/main/tut/examples/sample-acker.md +++ b/site/src/main/tut/examples/sample-acker.md @@ -12,7 +12,7 @@ Here we create a single `AckerConsumer`, a single `Publisher` and finally we pub import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.{Concurrent, Timer, Sync} +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -21,6 +21,7 @@ import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} +import java.util.concurrent.Executors class Flow[F[_]: Concurrent, A]( consumer: Stream[F, AmqpEnvelope[A]], @@ -69,19 +70,29 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { // Run when there's no consumer for the routing key specified by the publisher and the flag mandatory is true val publishingListener: PublishReturn => F[Unit] = pr => Sync[F].delay(s"Publish listener: $pr") - val program: F[Unit] = R.createConnectionChannel use { implicit channel => + val resources: Resource[F, (AMQPChannel, Blocker)] = for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisherWithListener[AmqpMessage[String]](exchangeName, - routingKey, - publishingFlag, - publishingListener) - (acker, consumer) <- R.createAckerConsumer[String](queueName) - result = new Flow[F, String](consumer, acker, logPipe, publisher).flow - _ <- result.compile.drain - } yield () + channel <- R.createConnectionChannel + blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + + val program: F[Unit] = resources.use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + publisher <- R.createPublisherWithListener[AmqpMessage[String]](exchangeName, + routingKey, + publishingFlag, + publishingListener, + blocker) + (acker, consumer) <- R.createAckerConsumer[String](queueName) + result = new Flow[F, String](consumer, acker, logPipe, publisher).flow + _ <- result.compile.drain + } yield () } } diff --git a/site/src/main/tut/examples/sample-autoack.md 
b/site/src/main/tut/examples/sample-autoack.md index 9f26b3bc..3149fd94 100644 --- a/site/src/main/tut/examples/sample-autoack.md +++ b/site/src/main/tut/examples/sample-autoack.md @@ -12,7 +12,7 @@ Here we create a single `AutoAckConsumer`, a single `Publisher` and finally we p import java.nio.charset.StandardCharsets.UTF_8 import cats.data.Kleisli -import cats.effect.{Concurrent, Sync} +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit @@ -21,6 +21,7 @@ import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} +import java.util.concurrent.Executors class AutoAckFlow[F[_]: Concurrent, A]( consumer: Stream[F, AmqpEnvelope[A]], @@ -63,18 +64,25 @@ class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { Sync[F].delay(println(s"Consumed: $amqpMsg")).as(Ack(amqpMsg.deliveryTag)) } - val program: F[Unit] = { - R.createConnectionChannel use { implicit channel => + val resources: Resource[F, (AMQPChannel, Blocker)] = + for { + channel <- R.createConnectionChannel + blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + + val program: F[Unit] = resources.use { + case (channel, blocker) => + implicit val rabbitChannel = channel for { _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) _ <- R.declareExchange(exchangeName, ExchangeType.Topic) _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) + publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey, blocker) consumer <- R.createAutoAckConsumer[String](queueName) _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain } yield () } - } } ``` diff --git a/site/src/main/tut/examples/sample-mult-connections.md b/site/src/main/tut/examples/sample-mult-connections.md index d04ff726..2c436f04 100644 --- a/site/src/main/tut/examples/sample-mult-connections.md +++ b/site/src/main/tut/examples/sample-mult-connections.md @@ -17,12 +17,13 @@ We start by defining three different programs representing each connection, name We will be consuming messages from `c1` and `c2`, and publishing the result to `p3` concurrently. 
Thanks to `fs2` this becomes such a simple case: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.model._ import fs2._ +import java.util.concurrent.Executors import scala.concurrent.ExecutionContext @@ -60,10 +61,19 @@ def p2(R: Fs2Rabbit[IO]) = Here's our program `p3` creating a `Publisher` representing the third `Connection`: ```tut:book:silent +def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = + for { + channel <- client.createConnectionChannel + blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + def p3(R: Fs2Rabbit[IO]) = - R.createConnectionChannel use { implicit channel => - R.declareExchange(ex, ExchangeType.Topic) *> - R.createPublisher(ex, rk) + resources(R).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + R.declareExchange(ex, ExchangeType.Topic) *> + R.createPublisher(ex, rk, blocker) } ``` diff --git a/site/src/main/tut/examples/sample-mult-consumers.md b/site/src/main/tut/examples/sample-mult-consumers.md index 3dcb5814..aabc6674 100644 --- a/site/src/main/tut/examples/sample-mult-consumers.md +++ b/site/src/main/tut/examples/sample-mult-consumers.md @@ -9,11 +9,12 @@ number: 16 Given two `Consumers` bound to queues with different `RoutingKey`s `RKA` and `RKB` and a single `Publisher` bound to a single `RoutingKey` named `RKA` we will be publishing messages to both queues but expecting to only consume messages published to the `RKA`. The second consumer bound to `RKB` will not receive any messages: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.model._ import fs2._ +import java.util.concurrent.Executors import scala.concurrent.ExecutionContext @@ -35,19 +36,28 @@ def multipleConsumers(c1: Stream[IO, AmqpEnvelope[String]], c2: Stream[IO, AmqpE ).parJoin(3) } +def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = + for { + channel <- client.createConnectionChannel + blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + def program(R: Fs2Rabbit[IO]) = - R.createConnectionChannel.use { implicit channel => - for { - _ <- R.declareExchange(ex, ExchangeType.Topic) - _ <- R.declareQueue(DeclarationQueueConfig.default(q1)) - _ <- R.declareQueue(DeclarationQueueConfig.default(q2)) - _ <- R.bindQueue(q1, ex, rka) - _ <- R.bindQueue(q2, ex, rkb) - c1 <- R.createAutoAckConsumer[String](q1) - c2 <- R.createAutoAckConsumer[String](q2) - p <- R.createPublisher[String](ex, rka) - _ <- multipleConsumers(c1, c2, p).compile.drain - } yield () + resources(R).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + _ <- R.declareExchange(ex, ExchangeType.Topic) + _ <- R.declareQueue(DeclarationQueueConfig.default(q1)) + _ <- R.declareQueue(DeclarationQueueConfig.default(q2)) + _ <- R.bindQueue(q1, ex, rka) + _ <- R.bindQueue(q2, ex, rkb) + c1 <- R.createAutoAckConsumer[String](q1) + c2 <- R.createAutoAckConsumer[String](q2) + p <- R.createPublisher[String](ex, rka, blocker) + _ <- 
multipleConsumers(c1, c2, p).compile.drain + } yield () } ``` diff --git a/site/src/main/tut/publishers/index.md b/site/src/main/tut/publishers/index.md index 67748dd6..82d9f3d6 100644 --- a/site/src/main/tut/publishers/index.md +++ b/site/src/main/tut/publishers/index.md @@ -6,6 +6,8 @@ number: 10 # Publishers +Publishing is a blocking action in the underlying Java client, so you need to pass in a `cats.effect.Blocker` when creating a publisher. The examples demonstrate how to create one. + - **[Publisher](./publisher.html)**: A simple message publisher. - **[Publisher with Listener](./publisher-with-listener.html)**: A publisher with a listener for messages that can not be routed. - **[Publishing Json](./json.html)**: Publishing Json messages using the `fs2-rabbit-circe` module. diff --git a/site/src/main/tut/publishers/publisher-with-listener.md b/site/src/main/tut/publishers/publisher-with-listener.md index 4e0590fa..8ded8800 100644 --- a/site/src/main/tut/publishers/publisher-with-listener.md +++ b/site/src/main/tut/publishers/publisher-with-listener.md @@ -18,26 +18,35 @@ The server SHOULD implement the mandatory flag. ### Creating a Publisher with Listener -It is simply created by specifying `ExchangeName`, `RoutingKey`, `PublishingFlag` and a listener, i.e. a function from `PublishReturn` to `F[Unit]`: +It is simply created by specifying `ExchangeName`, `RoutingKey`, `PublishingFlag` and a listener, i.e. a function from `PublishReturn` to `F[Unit]`. In addition, you need to pass in a `cats.effect.Blocker` since publishing is a blocking action in the underlying Java client: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit +import java.util.concurrent.Executors val exchangeName = ExchangeName("testEX") val routingKey = RoutingKey("testRK") val publishingFlag: PublishingFlag = PublishingFlag(mandatory = true) -// Run when there's no consumer for the routing key specified by the publisher and the flag mandatory is true val publishingListener: PublishReturn => IO[Unit] = pr => IO(println(s"Publish listener: $pr")) def doSomething(publisher: String => IO[Unit]): IO[Unit] = IO.unit -def program(R: Fs2Rabbit[IO]) = - R.createConnectionChannel.use { implicit channel => - R.createPublisherWithListener[String](exchangeName, routingKey, publishingFlag, publishingListener).flatMap(doSomething) +def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = + for { + channel <- client.createConnectionChannel + blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + +def program(client: Fs2Rabbit[IO]) = + resources(client).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + client.createPublisherWithListener[String](exchangeName, routingKey, publishingFlag, publishingListener, blocker).flatMap(doSomething) } ``` diff --git a/site/src/main/tut/publishers/publisher.md b/site/src/main/tut/publishers/publisher.md index 24e7fb4f..c0bf1a89 100644 --- a/site/src/main/tut/publishers/publisher.md +++ b/site/src/main/tut/publishers/publisher.md @@ -6,21 +6,31 @@ number: 11 # Publisher -A `Publisher` is simply created by specifying an `ExchangeName` and a `RoutingKey`: +A `Publisher` is simply created by specifying an `ExchangeName` and a `RoutingKey`. 
In addition, you need to pass in a `cats.effect.Blocker` since publishing are blocking actions in the underlying Java client: ```tut:book:silent -import cats.effect.IO +import cats.effect._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit +import java.util.concurrent.Executors val exchangeName = ExchangeName("testEX") val routingKey = RoutingKey("testRK") def doSomething(publisher: String => IO[Unit]): IO[Unit] = IO.unit -def program(R: Fs2Rabbit[IO]) = - R.createConnectionChannel.use { implicit channel => - R.createPublisher[String](exchangeName, routingKey).flatMap(doSomething) +def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = + for { + channel <- client.createConnectionChannel + blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + +def program(client: Fs2Rabbit[IO]) = + resources(client).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + client.createPublisher[String](exchangeName, routingKey, blocker).flatMap(doSomething) } ``` diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala index 6c7adc87..668b83b1 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala @@ -17,7 +17,7 @@ package dev.profunktor.fs2rabbit.interpreter import cats.effect.concurrent.Deferred -import cats.effect.IO +import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig import dev.profunktor.fs2rabbit.config.declaration._ @@ -30,6 +30,7 @@ import org.scalatest.Assertion import scala.concurrent.duration._ import scala.util.Random import scala.concurrent.Future +import java.util.concurrent.Executors trait Fs2RabbitSpec { self: BaseSpec => @@ -37,6 +38,13 @@ trait Fs2RabbitSpec { self: BaseSpec => val emptyAssertion: Assertion = true shouldBe true + def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = + for { + channel <- client.createConnectionChannel + blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + blocker <- blockingES.map(Blocker.liftExecutorService) + } yield (channel, blocker) + it should "create a connection and a queue with default arguments" in withRabbit { interpreter => import interpreter._ @@ -190,122 +198,136 @@ trait Fs2RabbitSpec { self: BaseSpec => it should "create an acker consumer and verify both envelope and ack result" in withRabbit { interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) - publisher <- createPublisher[String](x, rk) - _ <- publisher("acker-test") - (acker, consumer) <- createAckerConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "acker-test")) *> - acker(Ack(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- randomQueueData + _ <- declareExchange(x, ExchangeType.Topic) + _ <- 
declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) + publisher <- createPublisher[String](x, rk, blocker) + _ <- publisher("acker-test") + (acker, consumer) <- createAckerConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "acker-test")) *> + acker(Ack(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } it should "NOT requeue a message in case of NAck when option 'requeueOnNack = false'" in withRabbit { interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) - publisher <- createPublisher[String](x, rk) - _ <- publisher("NAck-test") - (acker, consumer) <- createAckerConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "NAck-test")) *> - acker(NAck(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- randomQueueData + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) + publisher <- createPublisher[String](x, rk, blocker) + _ <- publisher("NAck-test") + (acker, consumer) <- createAckerConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "NAck-test")) *> + acker(NAck(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } it should "create a publisher, an auto-ack consumer, publish a message and consume it" in withRabbit { interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk) - _ <- publisher("test") - consumer <- createAutoAckConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) - } - .compile - .drain - } yield emptyAssertion + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- randomQueueData + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk, blocker) + _ <- publisher("test") + consumer <- createAutoAckConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) + } + .compile + .drain + } yield emptyAssertion } } it should "create an exclusive auto-ack consumer with specific BasicQos" in withRabbit { interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, x, rk) <- randomQueueData - ct <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk) - consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) - _ <- publisher("test") - consumer <- 
createAutoAckConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) - } - .compile - .drain - } yield emptyAssertion + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- randomQueueData + ct <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk, blocker) + consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) + _ <- publisher("test") + consumer <- createAutoAckConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) + } + .compile + .drain + } yield emptyAssertion } } it should "create an exclusive acker consumer with specific BasicQos" in withRabbit { interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, x, rk) <- randomQueueData - ct <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk) - consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) - (acker, consumer) <- createAckerConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) - _ <- publisher("test") - _ <- consumer - .take(1) - .evalTap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) *> - acker(Ack(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- randomQueueData + ct <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk, blocker) + consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) + (acker, consumer) <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + _ <- publisher("test") + _ <- consumer + .take(1) + .evalTap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) *> + acker(Ack(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } @@ -394,30 +416,36 @@ trait Fs2RabbitSpec { self: BaseSpec => it should "bind an exchange to another exchange" in withRabbit { interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, _, rk) <- randomQueueData - sourceExchange <- mkRandomString.map(ExchangeName) - destExchange <- mkRandomString.map(ExchangeName) - consumerTag <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(sourceExchange, ExchangeType.Direct) - _ <- declareExchange(destExchange, ExchangeType.Direct) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, destExchange, rk) - _ <- bindExchange(destExchange, sourceExchange, rk, ExchangeBindingArgs(Map.empty)) - publisher <- createPublisher[String](sourceExchange, rk) - consumerArgs = ConsumerArgs(consumerTag = consumerTag, noLocal = false, exclusive = true, 
args = Map.empty) - (acker, consumer) <- createAckerConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) - _ <- publisher("test") - _ <- consumer - .take(1) - .evalTap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, sourceExchange, rk, "test")) *> - acker(Ack(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, _, rk) <- randomQueueData + sourceExchange <- mkRandomString.map(ExchangeName) + destExchange <- mkRandomString.map(ExchangeName) + consumerTag <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(sourceExchange, ExchangeType.Direct) + _ <- declareExchange(destExchange, ExchangeType.Direct) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, destExchange, rk) + _ <- bindExchange(destExchange, sourceExchange, rk, ExchangeBindingArgs(Map.empty)) + publisher <- createPublisher[String](sourceExchange, rk, blocker) + consumerArgs = ConsumerArgs(consumerTag = consumerTag, noLocal = false, exclusive = true, args = Map.empty) + (acker, consumer) <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + _ <- publisher("test") + _ <- consumer + .take(1) + .evalTap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, sourceExchange, rk, "test")) *> + acker(Ack(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } @@ -441,25 +469,27 @@ trait Fs2RabbitSpec { self: BaseSpec => interpreter => import interpreter._ - Stream.resource(createConnectionChannel).flatMap { implicit channel => - for { - (q, x, rk) <- Stream.eval(randomQueueData) - _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) - _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) - _ <- Stream.eval(bindQueue(q, x, rk, QueueBindingArgs(Map.empty))) - publisher <- Stream.eval(createPublisher[String](x, rk)) - _ <- Stream.eval(publisher("NAck-test")) - (acker, consumer) <- Stream.eval(createAckerConsumer(q)) - result <- Stream.eval(consumer.take(1).compile.lastOrError) - _ <- Stream.eval(acker(NAck(result.deliveryTag))) - consumer <- Stream.eval(createAutoAckConsumer(q)) - result2 <- consumer.take(1) // Message will be re-queued - } yield { - val expected = expectedDelivery(result.deliveryTag, x, rk, "NAck-test") - - result shouldBe expected - result2 shouldBe expected.copy(deliveryTag = result2.deliveryTag, redelivered = true) - } + Stream.resource(resources(interpreter)).flatMap { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- Stream.eval(randomQueueData) + _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) + _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) + _ <- Stream.eval(bindQueue(q, x, rk, QueueBindingArgs(Map.empty))) + publisher <- Stream.eval(createPublisher[String](x, rk, blocker)) + _ <- Stream.eval(publisher("NAck-test")) + (acker, consumer) <- Stream.eval(createAckerConsumer(q)) + result <- Stream.eval(consumer.take(1).compile.lastOrError) + _ <- Stream.eval(acker(NAck(result.deliveryTag))) + consumer <- Stream.eval(createAutoAckConsumer(q)) + result2 <- consumer.take(1) // Message will be re-queued + } yield { + val expected = expectedDelivery(result.deliveryTag, x, rk, "NAck-test") + + result shouldBe expected + result2 shouldBe expected.copy(deliveryTag = result2.deliveryTag, redelivered = true) + } } } @@ -467,34 +497,36 @@ trait Fs2RabbitSpec { self: BaseSpec => 
interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, x, rk) <- randomQueueData - diffQ <- mkRandomString.map(QueueName) - _ <- declareExchange(x, ExchangeType.Topic) - publisher <- createPublisher[String](x, rk) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - _ <- declareQueue(DeclarationQueueConfig.default(diffQ)) - _ <- bindQueue(q, x, RoutingKey("diffRK")) - _ <- publisher("test") - consumer <- createAutoAckConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - createAutoAckConsumer(diffQ).flatMap { c2 => - c2.take(1) - .compile - .last - .timeout(1.second) - .attempt - .map(_ shouldBe a[Left[_, _]]) - .as(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- randomQueueData + diffQ <- mkRandomString.map(QueueName) + _ <- declareExchange(x, ExchangeType.Topic) + publisher <- createPublisher[String](x, rk, blocker) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + _ <- declareQueue(DeclarationQueueConfig.default(diffQ)) + _ <- bindQueue(q, x, RoutingKey("diffRK")) + _ <- publisher("test") + consumer <- createAutoAckConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + createAutoAckConsumer(diffQ).flatMap { c2 => + c2.take(1) + .compile + .last + .timeout(1.second) + .attempt + .map(_ shouldBe a[Left[_, _]]) + .as(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) + } } - } - .compile - .drain - } yield emptyAssertion + .compile + .drain + } yield emptyAssertion } } @@ -504,23 +536,26 @@ trait Fs2RabbitSpec { self: BaseSpec => val flag = PublishingFlag(mandatory = true) - Stream.resource(createConnectionChannel).flatMap { implicit channel => - for { - (q, x, rk) <- Stream.eval(randomQueueData) - _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) - _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) - _ <- Stream.eval(bindQueue(q, x, rk)) - promise <- Stream.eval(Deferred[IO, PublishReturn]) - publisher <- Stream.eval(createPublisherWithListener(x, RoutingKey("diff-rk"), flag, listener(promise))) - _ <- Stream.eval(publisher("test")) - callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone - consumer <- Stream.eval(createAutoAckConsumer(q)) - result <- takeWithTimeOut(consumer, 500.millis) - } yield { - result shouldBe None - callback.body.value shouldEqual "test".getBytes("UTF-8") - callback.routingKey shouldBe RoutingKey("diff-rk") - } + Stream.resource(resources(interpreter)).flatMap { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- Stream.eval(randomQueueData) + _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) + _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) + _ <- Stream.eval(bindQueue(q, x, rk)) + promise <- Stream.eval(Deferred[IO, PublishReturn]) + publisher <- Stream + .eval(createPublisherWithListener(x, RoutingKey("diff-rk"), flag, listener(promise), blocker)) + _ <- Stream.eval(publisher("test")) + callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone + consumer <- Stream.eval(createAutoAckConsumer(q)) + result <- takeWithTimeOut(consumer, 500.millis) + } yield { + result shouldBe None + callback.body.value shouldEqual "test".getBytes("UTF-8") + callback.routingKey shouldBe RoutingKey("diff-rk") + } } } @@ 
-528,23 +563,25 @@ trait Fs2RabbitSpec { self: BaseSpec => interpreter => import interpreter._ - createConnectionChannel.use { implicit channel => - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createRoutingPublisher[String](x) - _ <- publisher(rk).apply("test") - consumer <- createAutoAckConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) - } - .compile - .drain - } yield emptyAssertion + resources(interpreter).use { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- randomQueueData + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createRoutingPublisher[String](x, blocker) + _ <- publisher(rk).apply("test") + consumer <- createAutoAckConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) + } + .compile + .drain + } yield emptyAssertion } } @@ -554,23 +591,25 @@ trait Fs2RabbitSpec { self: BaseSpec => val flag = PublishingFlag(mandatory = true) - Stream.resource(createConnectionChannel).flatMap { implicit channel => - for { - (q, x, rk) <- Stream.eval(randomQueueData) - _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) - _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) - _ <- Stream.eval(bindQueue(q, x, rk)) - promise <- Stream.eval(Deferred[IO, PublishReturn]) - publisher <- Stream.eval(createRoutingPublisherWithListener[String](x, flag, listener(promise))) - _ <- Stream.eval(publisher(RoutingKey("diff-rk"))("test")) - callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone - consumer <- Stream.eval(createAutoAckConsumer(q)) - result <- takeWithTimeOut(consumer, 500.millis) - } yield { - result shouldBe None - callback.body.value shouldEqual "test".getBytes("UTF-8") - callback.routingKey shouldBe RoutingKey("diff-rk") - } + Stream.resource(resources(interpreter)).flatMap { + case (channel, blocker) => + implicit val rabbitChannel = channel + for { + (q, x, rk) <- Stream.eval(randomQueueData) + _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) + _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) + _ <- Stream.eval(bindQueue(q, x, rk)) + promise <- Stream.eval(Deferred[IO, PublishReturn]) + publisher <- Stream.eval(createRoutingPublisherWithListener[String](x, flag, listener(promise), blocker)) + _ <- Stream.eval(publisher(RoutingKey("diff-rk"))("test")) + callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone + consumer <- Stream.eval(createAutoAckConsumer(q)) + result <- takeWithTimeOut(consumer, 500.millis) + } yield { + result shouldBe None + callback.body.value shouldEqual "test".getBytes("UTF-8") + callback.routingKey shouldBe RoutingKey("diff-rk") + } } } @@ -591,14 +630,16 @@ trait Fs2RabbitSpec { self: BaseSpec => } def producer(q: QueueName, x: ExchangeName, rk: RoutingKey): IO[Unit] = - createConnectionChannel.use { implicit channel => - for { - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- declareExchange(x, ExchangeType.Topic) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk) - _ <- publisher(message) - } yield () + resources(interpreter).use { + case (channel, blocker) => + implicit 
val rabbitChannel = channel + for { + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- declareExchange(x, ExchangeType.Topic) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk, blocker) + _ <- publisher(message) + } yield () } randomQueueData From 2f4eeb31937e6f4b6667e33f3afe446acfeee5ca Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Tue, 17 Sep 2019 14:14:23 -0500 Subject: [PATCH 2/4] Blocker is now required to create an Fs2Rabbit client instead of requiring for creating a single publisher --- .../fs2rabbit/algebra/Publishing.scala | 19 +- .../interpreter/AMQPClientEffect.scala | 8 +- .../fs2rabbit/interpreter/Fs2Rabbit.scala | 33 +- .../fs2rabbit/program/PublishingProgram.scala | 27 +- .../examples/AckerConsumerDemo.scala | 37 +- .../examples/AutoAckConsumerDemo.scala | 26 +- .../fs2rabbit/examples/IOAckerConsumer.scala | 18 +- .../examples/MonixAutoAckConsumer.scala | 18 +- .../fs2rabbit/examples/RPCDemo.scala | 31 +- .../examples/ZIOAutoAckConsumer.scala | 17 +- site/src/main/tut/client.md | 24 +- site/src/main/tut/examples/index.md | 2 +- site/src/main/tut/examples/sample-acker.md | 53 +- site/src/main/tut/examples/sample-autoack.md | 48 +- .../tut/examples/sample-mult-connections.md | 16 +- .../tut/examples/sample-mult-consumers.md | 34 +- site/src/main/tut/publishers/index.md | 2 +- .../tut/publishers/publisher-with-listener.md | 16 +- site/src/main/tut/publishers/publisher.md | 18 +- .../fs2rabbit/interpreter/Fs2RabbitSpec.scala | 512 +++++++++--------- 20 files changed, 450 insertions(+), 509 deletions(-) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala index e5e35ef4..eab50152 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Publishing.scala @@ -16,7 +16,6 @@ package dev.profunktor.fs2rabbit.algebra -import cats.effect.Blocker import com.rabbitmq.client.Channel import dev.profunktor.fs2rabbit.effects.MessageEncoder import dev.profunktor.fs2rabbit.model._ @@ -26,8 +25,7 @@ trait Publishing[F[_]] { def createPublisher[A]( channel: Channel, exchangeName: ExchangeName, - routingKey: RoutingKey, - blocker: Blocker + routingKey: RoutingKey )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] def createPublisherWithListener[A]( @@ -35,34 +33,29 @@ trait Publishing[F[_]] { exchangeName: ExchangeName, routingKey: RoutingKey, flags: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] def createRoutingPublisher[A]( channel: Channel, - exchangeName: ExchangeName, - blocker: Blocker + exchangeName: ExchangeName )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] def createRoutingPublisherWithListener[A]( channel: Channel, exchangeName: ExchangeName, flags: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] def createBasicPublisher[A]( - channel: Channel, - blocker: Blocker + channel: Channel )(implicit encoder: MessageEncoder[F, A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] def createBasicPublisherWithListener[A]( channel: Channel, flags: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit encoder: MessageEncoder[F, 
A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala index 235a99a0..03cae491 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala @@ -18,7 +18,7 @@ package dev.profunktor.fs2rabbit.interpreter import cats.{Applicative, Functor} import cats.effect.syntax.effect._ -import cats.effect.{Effect, Sync} +import cats.effect._ import cats.syntax.flatMap._ import cats.syntax.functor._ import com.rabbitmq.client._ @@ -31,7 +31,7 @@ import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._ import dev.profunktor.fs2rabbit.model._ import scala.util.{Failure, Success, Try} -class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { +class AMQPClientEffect[F[_]: ContextShift: Effect](blocker: Blocker) extends AMQPClient[F] { private[fs2rabbit] def defaultConsumer[A]( channel: Channel, @@ -152,7 +152,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { exchangeName: ExchangeName, routingKey: RoutingKey, msg: AmqpMessage[Array[Byte]] - ): F[Unit] = Sync[F].delay { + ): F[Unit] = blocker.delay { channel.basicPublish( exchangeName.value, routingKey.value, @@ -167,7 +167,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { routingKey: RoutingKey, flag: PublishingFlag, msg: AmqpMessage[Array[Byte]] - ): F[Unit] = Sync[F].delay { + ): F[Unit] = blocker.delay { channel.basicPublish( exchangeName.value, routingKey.value, diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala index ba49e8a2..7172d5e2 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/Fs2Rabbit.scala @@ -30,8 +30,9 @@ import fs2.Stream import javax.net.ssl.SSLContext object Fs2Rabbit { - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: ConcurrentEffect: ContextShift]( config: Fs2RabbitConfig, + blocker: Blocker, sslContext: Option[SSLContext] = None, // Unlike SSLContext, SaslConfig is not optional because it is always set // by the underlying Java library, even if the user doesn't set it. 
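For context on the `blocker.delay` calls introduced above: in cats-effect 2, `Blocker#delay` evaluates its thunk on the blocker's dedicated thread pool and shifts execution back to the main pool afterwards, which is why `ContextShift[F]` now sits next to `Effect[F]` on `AMQPClientEffect` (and on `Fs2Rabbit.apply` below). A minimal sketch of that pattern, not part of the patch and with a made-up helper name:

```scala
import cats.effect.{Blocker, ContextShift, Sync}

// Run a blocking side effect (e.g. Channel#basicPublish) on the Blocker's
// thread pool, then shift back to the main pool when it completes.
def onBlockingPool[F[_]: Sync: ContextShift, A](blocker: Blocker)(thunk: => A): F[A] =
  blocker.delay[F, A](thunk)
```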
@@ -39,7 +40,7 @@ object Fs2Rabbit { ): F[Fs2Rabbit[F]] = ConnectionEffect.mkConnectionFactory[F](config, sslContext, saslConfig).map { case (factory, addresses) => - val amqpClient = new AmqpClientEffect[F] + val amqpClient = new AMQPClientEffect[F](blocker) val conn = new ConnectionEffect[F](factory, addresses) val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500)) val acker = new AckingProgram[F](config, amqpClient) @@ -85,50 +86,46 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] ( )(implicit channel: AMQPChannel, decoder: EnvelopeDecoder[F, A]): F[Stream[F, AmqpEnvelope[A]]] = consumingProgram.createAutoAckConsumer(channel.value, queueName, basicQos, consumerArgs) - def createPublisher[A](exchangeName: ExchangeName, routingKey: RoutingKey, blocker: Blocker)( + def createPublisher[A](exchangeName: ExchangeName, routingKey: RoutingKey)( implicit channel: AMQPChannel, encoder: MessageEncoder[F, A] ): F[A => F[Unit]] = - publishingProgram.createPublisher(channel.value, exchangeName, routingKey, blocker) + publishingProgram.createPublisher(channel.value, exchangeName, routingKey) def createPublisherWithListener[A]( exchangeName: ExchangeName, routingKey: RoutingKey, flags: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[A => F[Unit]] = - publishingProgram.createPublisherWithListener(channel.value, exchangeName, routingKey, flags, listener, blocker) + publishingProgram.createPublisherWithListener(channel.value, exchangeName, routingKey, flags, listener) - def createBasicPublisher[A](blocker: Blocker)( + def createBasicPublisher[A]( implicit channel: AMQPChannel, encoder: MessageEncoder[F, A] ): F[(ExchangeName, RoutingKey, A) => F[Unit]] = - publishingProgram.createBasicPublisher(channel.value, blocker) + publishingProgram.createBasicPublisher(channel.value) def createBasicPublisherWithListener[A]( flag: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )( implicit channel: AMQPChannel, encoder: MessageEncoder[F, A] ): F[(ExchangeName, RoutingKey, A) => F[Unit]] = - publishingProgram.createBasicPublisherWithListener(channel.value, flag, listener, blocker) + publishingProgram.createBasicPublisherWithListener(channel.value, flag, listener) def createRoutingPublisher[A]( - exchangeName: ExchangeName, - blocker: Blocker + exchangeName: ExchangeName )(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - publishingProgram.createRoutingPublisher(channel.value, exchangeName, blocker) + publishingProgram.createRoutingPublisher(channel.value, exchangeName) def createRoutingPublisherWithListener[A]( exchangeName: ExchangeName, flags: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - publishingProgram.createRoutingPublisherWithListener(channel.value, exchangeName, flags, listener, blocker) + publishingProgram.createRoutingPublisherWithListener(channel.value, exchangeName, flags, listener) def addPublishingListener(listener: PublishReturn => F[Unit])(implicit channel: AMQPChannel): F[Unit] = amqpClient.addPublishingListener(channel.value, listener) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala 
b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala index 233d49cc..a8b7b754 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/PublishingProgram.scala @@ -17,7 +17,6 @@ package dev.profunktor.fs2rabbit.program import cats.{Applicative, Monad} -import cats.effect.Blocker import cats.implicits._ import dev.profunktor.fs2rabbit.algebra.{AMQPClient, Publishing} import dev.profunktor.fs2rabbit.effects.MessageEncoder @@ -29,42 +28,37 @@ class PublishingProgram[F[_]: Monad](AMQP: AMQPClient[F]) extends Publishing[F] override def createPublisher[A]( channel: Channel, exchangeName: ExchangeName, - routingKey: RoutingKey, - blocker: Blocker + routingKey: RoutingKey )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] = - createRoutingPublisher(channel, exchangeName, blocker).map(_.apply(routingKey)) + createRoutingPublisher(channel, exchangeName).map(_.apply(routingKey)) override def createPublisherWithListener[A]( channel: Channel, exchangeName: ExchangeName, routingKey: RoutingKey, flag: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit encoder: MessageEncoder[F, A]): F[A => F[Unit]] = - createRoutingPublisherWithListener(channel, exchangeName, flag, listener, blocker).map(_.apply(routingKey)) + createRoutingPublisherWithListener(channel, exchangeName, flag, listener).map(_.apply(routingKey)) override def createRoutingPublisher[A]( channel: Channel, - exchangeName: ExchangeName, - blocker: Blocker + exchangeName: ExchangeName )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - createBasicPublisher(channel, blocker).map(pub => key => msg => pub(exchangeName, key, msg)) + createBasicPublisher(channel).map(pub => key => msg => pub(exchangeName, key, msg)) override def createRoutingPublisherWithListener[A]( channel: Channel, exchangeName: ExchangeName, flag: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] = - createBasicPublisherWithListener(channel, flag, listener, blocker).map( + createBasicPublisherWithListener(channel, flag, listener).map( pub => key => msg => pub(exchangeName, key, msg) ) override def createBasicPublisher[A]( - channel: Channel, - blocker: Blocker + channel: Channel )(implicit encoder: MessageEncoder[F, A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] = Applicative[F].pure { case (exchangeName: ExchangeName, routingKey: RoutingKey, msg: A @unchecked) => @@ -74,8 +68,7 @@ class PublishingProgram[F[_]: Monad](AMQP: AMQPClient[F]) extends Publishing[F] override def createBasicPublisherWithListener[A]( channel: Channel, flag: PublishingFlag, - listener: PublishReturn => F[Unit], - blocker: Blocker + listener: PublishReturn => F[Unit] )(implicit encoder: MessageEncoder[F, A]): F[(ExchangeName, RoutingKey, A) => F[Unit]] = AMQP.addPublishingListener(channel, listener).as { case (exchangeName: ExchangeName, routingKey: RoutingKey, msg: A @unchecked) => diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala index 93e3fabf..68eaacf0 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala +++ 
b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala @@ -28,7 +28,6 @@ import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2._ -import java.util.concurrent.Executors class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { private val queueName = QueueName("testQ") @@ -47,30 +46,20 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { // Run when there's no consumer for the routing key specified by the publisher and the flag mandatory is true val publishingListener: PublishReturn => F[Unit] = pr => putStrLn(s"Publish listener: $pr") - val resources: Resource[F, (AMQPChannel, Blocker)] = + val program: F[Unit] = R.createConnectionChannel.use { implicit channel => for { - channel <- R.createConnectionChannel - blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - - val program: F[Unit] = resources.use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - (acker, consumer) <- R.createAckerConsumer[String](queueName) - publisher <- R.createPublisherWithListener[AmqpMessage[String]]( - exchangeName, - routingKey, - publishingFlag, - publishingListener, - blocker - ) - _ <- new Flow[F, String](consumer, acker, logPipe, publisher).flow.compile.drain - } yield () + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + (acker, consumer) <- R.createAckerConsumer[String](queueName) + publisher <- R.createPublisherWithListener[AmqpMessage[String]]( + exchangeName, + routingKey, + publishingFlag, + publishingListener + ) + _ <- new Flow[F, String](consumer, acker, logPipe, publisher).flow.compile.drain + } yield () } } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala index c71918d5..3b7f26d3 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala @@ -29,7 +29,6 @@ import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} import io.circe.Encoder -import java.util.concurrent.Executors class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { private val queueName = QueueName("testQ") @@ -42,24 +41,15 @@ class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag)) } - val resources: Resource[F, (AMQPChannel, Blocker)] = + val program: F[Unit] = R.createConnectionChannel.use { implicit channel => for { - channel <- R.createConnectionChannel - blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - - val program: F[Unit] = resources.use { - case (channel, blocker) => - implicit val rabbitChannel = channel - 
for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey, blocker) - consumer <- R.createAutoAckConsumer[String](queueName) - _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain - } yield () + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) + consumer <- R.createAutoAckConsumer[String](queueName) + _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain + } yield () } } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala index 6cc08a85..ed92473f 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala @@ -17,11 +17,12 @@ package dev.profunktor.fs2rabbit.examples import cats.data.NonEmptyList -import cats.effect.{ExitCode, IO, IOApp} +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream +import java.util.concurrent.Executors object IOAckerConsumer extends IOApp { @@ -42,11 +43,18 @@ object IOAckerConsumer extends IOApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { client => - ResilientStream - .runF(new AckerConsumerDemo[IO](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { client => + ResilientStream + .runF(new AckerConsumerDemo[IO](client).program) + .as(ExitCode.Success) + } } } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala index 56735c1b..8dbd0739 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/MonixAutoAckConsumer.scala @@ -17,12 +17,13 @@ package dev.profunktor.fs2rabbit.examples import cats.data.NonEmptyList -import cats.effect.ExitCode +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream import monix.eval.{Task, TaskApp} +import java.util.concurrent.Executors object MonixAutoAckConsumer extends TaskApp { @@ -43,11 +44,18 @@ object MonixAutoAckConsumer extends TaskApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): Task[ExitCode] = - Fs2Rabbit[Task](config).flatMap { client => - ResilientStream - .runF(new 
AutoAckConsumerDemo[Task](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[Task](config, blocker).flatMap { client => + ResilientStream + .runF(new AutoAckConsumerDemo[Task](client).program) + .as(ExitCode.Success) + } } } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala index 789fbccc..c3b5b513 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala @@ -49,24 +49,27 @@ object RPCDemo extends IOApp { automaticRecovery = true ) - val publishingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { implicit client => - val queue = QueueName("rpc_queue") - publishingES.map(Blocker.liftExecutorService).use { blocker => - runServer[IO](queue, blocker).concurrently(runClient[IO](queue, blocker)).compile.drain.as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { implicit client => + val queue = QueueName("rpc_queue") + runServer[IO](queue).concurrently(runClient[IO](queue)).compile.drain.as(ExitCode.Success) } } - def runServer[F[_]: Sync: LiftIO](rpcQueue: QueueName, blocker: Blocker)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = + def runServer[F[_]: Sync](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = Stream.resource(R.createConnectionChannel).flatMap { implicit channel => - new RPCServer[F](rpcQueue, blocker).serve + new RPCServer[F](rpcQueue).serve } - def runClient[F[_]: Concurrent](rpcQueue: QueueName, blocker: Blocker)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = + def runClient[F[_]: Concurrent](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F]): Stream[F, Unit] = Stream.resource(R.createConnectionChannel).flatMap { implicit channel => - val client = new RPCClient[F](rpcQueue, blocker) + val client = new RPCClient[F](rpcQueue) Stream( client.call("Message 1"), @@ -77,7 +80,7 @@ object RPCDemo extends IOApp { } -class RPCClient[F[_]: Sync](rpcQueue: QueueName, blocker: Blocker)(implicit R: Fs2Rabbit[F], channel: AMQPChannel) { +class RPCClient[F[_]: Sync](rpcQueue: QueueName)(implicit R: Fs2Rabbit[F], channel: AMQPChannel) { private val EmptyExchange = ExchangeName("") @@ -90,7 +93,7 @@ class RPCClient[F[_]: Sync](rpcQueue: QueueName, blocker: Blocker)(implicit R: F for { queue <- Stream.eval(R.declareQueue) publisher <- Stream.eval( - R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rpcQueue.value), blocker) + R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rpcQueue.value)) ) _ <- Stream.eval(putStrLn(s"[Client] Message $body. ReplyTo queue $queue. 
Correlation $correlationId")) message = AmqpMessage(body, AmqpProperties(replyTo = Some(queue.value), correlationId = Some(correlationId))) @@ -103,7 +106,7 @@ class RPCClient[F[_]: Sync](rpcQueue: QueueName, blocker: Blocker)(implicit R: F } -class RPCServer[F[_]: Sync: LiftIO](rpcQueue: QueueName, blocker: Blocker)( +class RPCServer[F[_]: Sync](rpcQueue: QueueName)( implicit R: Fs2Rabbit[F], channel: AMQPChannel ) { @@ -126,9 +129,9 @@ class RPCServer[F[_]: Sync: LiftIO](rpcQueue: QueueName, blocker: Blocker)( val replyTo = e.properties.replyTo.toRight(new IllegalArgumentException("ReplyTo parameter is missing")) for { - rk <- IO.fromEither(replyTo).to[F] + rk <- replyTo.liftTo[F] _ <- putStrLn(s"[Server] Received message [${e.payload}]. ReplyTo $rk. CorrelationId $correlationId") - publisher <- R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rk), blocker) + publisher <- R.createPublisher[AmqpMessage[String]](EmptyExchange, RoutingKey(rk)) response = AmqpMessage(s"Response for ${e.payload}", AmqpProperties(correlationId = correlationId)) _ <- publisher(response) } yield () diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala index a1d45028..9a04b897 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/ZIOAutoAckConsumer.scala @@ -19,10 +19,12 @@ package dev.profunktor.fs2rabbit.examples import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit +import cats.effect.{Blocker, Resource} import zio._ import zio.interop.catz._ import zio.interop.catz.implicits._ import dev.profunktor.fs2rabbit.resiliency.ResilientStream +import java.util.concurrent.Executors object ZIOAutoAckConsumer extends CatsApp { @@ -38,11 +40,18 @@ object ZIOAutoAckConsumer extends CatsApp { internalQueueSize = Some(500) ) + val blockerResource = + Resource + .make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): UIO[Int] = - Fs2Rabbit[Task](config) - .flatMap { client => - ResilientStream - .runF(new AutoAckConsumerDemo[Task](client).program) + blockerResource + .use { blocker => + Fs2Rabbit[Task](config, blocker).flatMap { client => + ResilientStream + .runF(new AutoAckConsumerDemo[Task](client).program) + } } .run .map(_ => 0) diff --git a/site/src/main/tut/client.md b/site/src/main/tut/client.md index 838a6e3e..2a6f9edd 100644 --- a/site/src/main/tut/client.md +++ b/site/src/main/tut/client.md @@ -6,18 +6,21 @@ number: 2 # Fs2 Rabbit Client -`Fs2Rabbit` is the main client that wraps the communication with `RabbitMQ`. All it needs are a `Fs2RabbitConfig`, an optional `SSLContext` and an instance of `ConcurrentEffect[F]`. +`Fs2Rabbit` is the main client that wraps the communication with `RabbitMQ`. The mandatory arguments are a `Fs2RabbitConfig` and a `cats.effect.Blocker` used for publishing (this action is blocking in the underlying Java client). Optionally, you can pass in a custom `SSLContext` and `SaslConfig`. 
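The optional parameters can also be supplied explicitly alongside the mandatory ones, matching the signature shown next. A minimal sketch, not part of this patch; `SSLContext.getDefault` and `DefaultSaslConfig.EXTERNAL` are placeholder choices and `tlsClient` is a made-up name:

```scala
import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
import com.rabbitmq.client.DefaultSaslConfig
import javax.net.ssl.SSLContext
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit

// Pass a custom SSLContext and SaslConfig in addition to the config and blocker.
def tlsClient[F[_]: ConcurrentEffect: ContextShift](
    config: Fs2RabbitConfig,
    blocker: Blocker
): F[Fs2Rabbit[F]] =
  Fs2Rabbit[F](config, blocker, Some(SSLContext.getDefault), DefaultSaslConfig.EXTERNAL)
```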
```tut:book:silent import cats.effect._ +import com.rabbitmq.client.{DefaultSaslConfig, SaslConfig} import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import javax.net.ssl.SSLContext object Fs2Rabbit { - def apply[F[_]: ConcurrentEffect]( + def apply[F[_]: ConcurrentEffect: ContextShift]( config: Fs2RabbitConfig, - sslContext: Option[SSLContext] = None + blocker: Blocker, + sslContext: Option[SSLContext] = None, + saslConfig: SaslConfig = DefaultSaslConfig.PLAIN ): F[Fs2Rabbit[F]] = ??? } ``` @@ -25,10 +28,11 @@ object Fs2Rabbit { Its creation is effectful so you need to `flatMap` and pass it as an argument. For example: ```tut:book:silent -import cats.effect.{ExitCode, IOApp} +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit +import java.util.concurrent.Executors object Program { def foo[F[_]](client: Fs2Rabbit[F]): F[Unit] = ??? @@ -48,11 +52,17 @@ class Demo extends IOApp { internalQueueSize = Some(500) ) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { client => - Program.foo[IO](client).as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { client => + Program.foo[IO](client).as(ExitCode.Success) + } } } ``` - diff --git a/site/src/main/tut/examples/index.md b/site/src/main/tut/examples/index.md index 91962212..5f370a4b 100644 --- a/site/src/main/tut/examples/index.md +++ b/site/src/main/tut/examples/index.md @@ -7,7 +7,7 @@ position: 2 # Examples -The source code for some of the examples can be found [here](https://github.com/gvolpe/fs2-rabbit/tree/master/examples/src/main/scala/dev.profunktor/fs2rabbit/examples). +The source code for some of the examples can be found [here](https://github.com/profunktor/fs2-rabbit/tree/master/examples/src/main/scala/dev.profunktor/fs2rabbit/examples). ### Simple - **[Single AutoAckConsumer](./sample-autoack.html)**: Example of a single `AutoAckConsumer`, a `Publisher` and `Json` data manipulation. 
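As an aside on the `blockerResource` wiring repeated in the examples above: cats-effect also provides `Blocker[F]`, a `Resource` backed by its own cached thread pool, which could stand in for the hand-rolled `Executors` setup. A minimal sketch under that assumption, not part of this patch (`BlockerAlternative` and `useClient` are made-up names):

```scala
import cats.effect._
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit

object BlockerAlternative extends IOApp {

  val config: Fs2RabbitConfig = ??? // e.g. the same config value defined in the examples above

  def useClient(client: Fs2Rabbit[IO]): IO[Unit] = IO.unit // placeholder program

  override def run(args: List[String]): IO[ExitCode] =
    // Blocker[IO] allocates a cached thread pool and shuts it down on release.
    Blocker[IO].use { blocker =>
      Fs2Rabbit[IO](config, blocker).flatMap(useClient).as(ExitCode.Success)
    }
}
```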
diff --git a/site/src/main/tut/examples/sample-acker.md b/site/src/main/tut/examples/sample-acker.md index e0d29ef9..095d2fee 100644 --- a/site/src/main/tut/examples/sample-acker.md +++ b/site/src/main/tut/examples/sample-acker.md @@ -21,7 +21,6 @@ import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} -import java.util.concurrent.Executors class Flow[F[_]: Concurrent, A]( consumer: Stream[F, AmqpEnvelope[A]], @@ -70,29 +69,19 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) { // Run when there's no consumer for the routing key specified by the publisher and the flag mandatory is true val publishingListener: PublishReturn => F[Unit] = pr => Sync[F].delay(s"Publish listener: $pr") - val resources: Resource[F, (AMQPChannel, Blocker)] = + val program: F[Unit] = R.createConnectionChannel.use { implicit channel => for { - channel <- R.createConnectionChannel - blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - - val program: F[Unit] = resources.use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisherWithListener[AmqpMessage[String]](exchangeName, - routingKey, - publishingFlag, - publishingListener, - blocker) - (acker, consumer) <- R.createAckerConsumer[String](queueName) - result = new Flow[F, String](consumer, acker, logPipe, publisher).flow - _ <- result.compile.drain - } yield () + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + publisher <- R.createPublisherWithListener[AmqpMessage[String]](exchangeName, + routingKey, + publishingFlag, + publishingListener) + (acker, consumer) <- R.createAckerConsumer[String](queueName) + result = new Flow[F, String](consumer, acker, logPipe, publisher).flow + _ <- result.compile.drain + } yield () } } @@ -102,11 +91,12 @@ At the edge of out program we define our effect, `cats.effect.IO` in this case, ```tut:book:silent import cats.data.NonEmptyList -import cats.effect.{ExitCode, IO, IOApp} +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream +import java.util.concurrent.Executors object IOAckerConsumer extends IOApp { @@ -127,11 +117,18 @@ object IOAckerConsumer extends IOApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): IO[ExitCode] = - Fs2Rabbit[IO](config).flatMap { client => - ResilientStream - .runF(new AckerConsumerDemo[IO](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[IO](config, blocker).flatMap { client => + ResilientStream + .runF(new AckerConsumerDemo[IO](client).program) + .as(ExitCode.Success) + } } } diff --git a/site/src/main/tut/examples/sample-autoack.md 
b/site/src/main/tut/examples/sample-autoack.md index 3149fd94..e4906bd9 100644 --- a/site/src/main/tut/examples/sample-autoack.md +++ b/site/src/main/tut/examples/sample-autoack.md @@ -20,8 +20,7 @@ import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ -import fs2.{Pipe, Pure, Stream} -import java.util.concurrent.Executors +import fs2._ class AutoAckFlow[F[_]: Concurrent, A]( consumer: Stream[F, AmqpEnvelope[A]], @@ -64,25 +63,16 @@ class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) { Sync[F].delay(println(s"Consumed: $amqpMsg")).as(Ack(amqpMsg.deliveryTag)) } - val resources: Resource[F, (AMQPChannel, Blocker)] = + val program: F[Unit] = R.createConnectionChannel.use { implicit channel => for { - channel <- R.createConnectionChannel - blockingES = Resource.make(Sync[F].delay(Executors.newCachedThreadPool()))(es => Sync[F].delay(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - - val program: F[Unit] = resources.use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) - _ <- R.declareExchange(exchangeName, ExchangeType.Topic) - _ <- R.bindQueue(queueName, exchangeName, routingKey) - publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey, blocker) - consumer <- R.createAutoAckConsumer[String](queueName) - _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain - } yield () - } + _ <- R.declareQueue(DeclarationQueueConfig.default(queueName)) + _ <- R.declareExchange(exchangeName, ExchangeType.Topic) + _ <- R.bindQueue(queueName, exchangeName, routingKey) + publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey) + consumer <- R.createAutoAckConsumer[String](queueName) + _ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain + } yield () + } } ``` @@ -90,12 +80,13 @@ At the edge of out program we define our effect, `monix.eval.Task` in this case, ```tut:book:silent import cats.data.NonEmptyList -import cats.effect.ExitCode +import cats.effect._ import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.resiliency.ResilientStream import monix.eval.{Task, TaskApp} +import java.util.concurrent.Executors object MonixAutoAckConsumer extends TaskApp { @@ -116,11 +107,18 @@ object MonixAutoAckConsumer extends TaskApp { automaticRecovery = true ) + val blockerResource = + Resource + .make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown())) + .map(Blocker.liftExecutorService) + override def run(args: List[String]): Task[ExitCode] = - Fs2Rabbit[Task](config).flatMap { client => - ResilientStream - .runF(new AutoAckConsumerDemo[Task](client).program) - .as(ExitCode.Success) + blockerResource.use { blocker => + Fs2Rabbit[Task](config, blocker).flatMap { client => + ResilientStream + .runF(new AutoAckConsumerDemo[Task](client).program) + .as(ExitCode.Success) + } } } diff --git a/site/src/main/tut/examples/sample-mult-connections.md b/site/src/main/tut/examples/sample-mult-connections.md index 2c436f04..de98ac30 100644 --- a/site/src/main/tut/examples/sample-mult-connections.md +++ b/site/src/main/tut/examples/sample-mult-connections.md @@ -23,7 +23,6 @@ 
import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.model._ import fs2._ -import java.util.concurrent.Executors import scala.concurrent.ExecutionContext @@ -61,19 +60,10 @@ def p2(R: Fs2Rabbit[IO]) = Here's our program `p3` creating a `Publisher` representing the third `Connection`: ```tut:book:silent -def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = - for { - channel <- client.createConnectionChannel - blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - def p3(R: Fs2Rabbit[IO]) = - resources(R).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - R.declareExchange(ex, ExchangeType.Topic) *> - R.createPublisher(ex, rk, blocker) + R.createConnectionChannel use { implicit channel => + R.declareExchange(ex, ExchangeType.Topic) *> + R.createPublisher(ex, rk) } ``` diff --git a/site/src/main/tut/examples/sample-mult-consumers.md b/site/src/main/tut/examples/sample-mult-consumers.md index aabc6674..b8dfc7c1 100644 --- a/site/src/main/tut/examples/sample-mult-consumers.md +++ b/site/src/main/tut/examples/sample-mult-consumers.md @@ -14,7 +14,6 @@ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.model._ import fs2._ -import java.util.concurrent.Executors import scala.concurrent.ExecutionContext @@ -36,28 +35,19 @@ def multipleConsumers(c1: Stream[IO, AmqpEnvelope[String]], c2: Stream[IO, AmqpE ).parJoin(3) } -def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = - for { - channel <- client.createConnectionChannel - blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - def program(R: Fs2Rabbit[IO]) = - resources(R).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - _ <- R.declareExchange(ex, ExchangeType.Topic) - _ <- R.declareQueue(DeclarationQueueConfig.default(q1)) - _ <- R.declareQueue(DeclarationQueueConfig.default(q2)) - _ <- R.bindQueue(q1, ex, rka) - _ <- R.bindQueue(q2, ex, rkb) - c1 <- R.createAutoAckConsumer[String](q1) - c2 <- R.createAutoAckConsumer[String](q2) - p <- R.createPublisher[String](ex, rka, blocker) - _ <- multipleConsumers(c1, c2, p).compile.drain - } yield () + R.createConnectionChannel.use { implicit channel => + for { + _ <- R.declareExchange(ex, ExchangeType.Topic) + _ <- R.declareQueue(DeclarationQueueConfig.default(q1)) + _ <- R.declareQueue(DeclarationQueueConfig.default(q2)) + _ <- R.bindQueue(q1, ex, rka) + _ <- R.bindQueue(q2, ex, rkb) + c1 <- R.createAutoAckConsumer[String](q1) + c2 <- R.createAutoAckConsumer[String](q2) + p <- R.createPublisher[String](ex, rka) + _ <- multipleConsumers(c1, c2, p).compile.drain + } yield () } ``` diff --git a/site/src/main/tut/publishers/index.md b/site/src/main/tut/publishers/index.md index 82d9f3d6..e86061ed 100644 --- a/site/src/main/tut/publishers/index.md +++ b/site/src/main/tut/publishers/index.md @@ -6,7 +6,7 @@ number: 10 # Publishers -Publishing are blocking actions in the underlying Java client so you need to pass in a `cats.effect.Blocker` when creating a publisher. The examples demonstrate how to create one. 
+Publishing are blocking actions in the underlying Java client so you need to pass in a `cats.effect.Blocker` when creating the `Fs2Rabbit` client. - **[Publisher](./publisher.html)**: A simple message publisher. - **[Publisher with Listener](./publisher-with-listener.html)**: A publisher with a listener for messages that can not be routed. diff --git a/site/src/main/tut/publishers/publisher-with-listener.md b/site/src/main/tut/publishers/publisher-with-listener.md index 8ded8800..e90ab4ee 100644 --- a/site/src/main/tut/publishers/publisher-with-listener.md +++ b/site/src/main/tut/publishers/publisher-with-listener.md @@ -24,7 +24,6 @@ It is simply created by specifying `ExchangeName`, `RoutingKey`, `PublishingFlag import cats.effect._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit -import java.util.concurrent.Executors val exchangeName = ExchangeName("testEX") val routingKey = RoutingKey("testRK") @@ -35,18 +34,9 @@ val publishingListener: PublishReturn => IO[Unit] = pr => IO(println(s"Publish l def doSomething(publisher: String => IO[Unit]): IO[Unit] = IO.unit -def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = - for { - channel <- client.createConnectionChannel - blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - -def program(client: Fs2Rabbit[IO]) = - resources(client).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - client.createPublisherWithListener[String](exchangeName, routingKey, publishingFlag, publishingListener, blocker).flatMap(doSomething) +def program(R: Fs2Rabbit[IO]) = + R.createConnectionChannel.use { implicit channel => + R.createPublisherWithListener[String](exchangeName, routingKey, publishingFlag, publishingListener).flatMap(doSomething) } ``` diff --git a/site/src/main/tut/publishers/publisher.md b/site/src/main/tut/publishers/publisher.md index c0bf1a89..54515248 100644 --- a/site/src/main/tut/publishers/publisher.md +++ b/site/src/main/tut/publishers/publisher.md @@ -6,31 +6,21 @@ number: 11 # Publisher -A `Publisher` is simply created by specifying an `ExchangeName` and a `RoutingKey`. 
In addition, you need to pass in a `cats.effect.Blocker` since publishing are blocking actions in the underlying Java client: +A `Publisher` is simply created by specifying an `ExchangeName` and a `RoutingKey`: ```tut:book:silent import cats.effect._ import dev.profunktor.fs2rabbit.model._ import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit -import java.util.concurrent.Executors val exchangeName = ExchangeName("testEX") val routingKey = RoutingKey("testRK") def doSomething(publisher: String => IO[Unit]): IO[Unit] = IO.unit -def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = - for { - channel <- client.createConnectionChannel - blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) - -def program(client: Fs2Rabbit[IO]) = - resources(client).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - client.createPublisher[String](exchangeName, routingKey, blocker).flatMap(doSomething) +def program(R: Fs2Rabbit[IO]) = + R.createConnectionChannel.use { implicit channel => + R.createPublisher[String](exchangeName, routingKey).flatMap(doSomething) } ``` diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala index 668b83b1..a01d2247 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala @@ -38,12 +38,10 @@ trait Fs2RabbitSpec { self: BaseSpec => val emptyAssertion: Assertion = true shouldBe true - def resources(client: Fs2Rabbit[IO]): Resource[IO, (AMQPChannel, Blocker)] = - for { - channel <- client.createConnectionChannel - blockingES = Resource.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) - blocker <- blockingES.map(Blocker.liftExecutorService) - } yield (channel, blocker) + val blockerResource = + Resource + .make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown())) + .map(Blocker.liftExecutorService) it should "create a connection and a queue with default arguments" in withRabbit { interpreter => import interpreter._ @@ -198,136 +196,126 @@ trait Fs2RabbitSpec { self: BaseSpec => it should "create an acker consumer and verify both envelope and ack result" in withRabbit { interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) - publisher <- createPublisher[String](x, rk, blocker) - _ <- publisher("acker-test") - (acker, consumer) <- createAckerConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "acker-test")) *> - acker(Ack(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + createConnectionChannel.use { implicit channel => + for { + (q, x, rk) <- randomQueueData + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) + publisher <- createPublisher[String](x, rk) + _ <- publisher("acker-test") + (acker, consumer) <- createAckerConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe 
expectedDelivery(msg.deliveryTag, x, rk, "acker-test")) *> + acker(Ack(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } it should "NOT requeue a message in case of NAck when option 'requeueOnNack = false'" in withRabbit { interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) - publisher <- createPublisher[String](x, rk, blocker) - _ <- publisher("NAck-test") - (acker, consumer) <- createAckerConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "NAck-test")) *> - acker(NAck(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + createConnectionChannel.use { implicit channel => + for { + (q, x, rk) <- randomQueueData + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty)) + publisher <- createPublisher[String](x, rk) + _ <- publisher("NAck-test") + (acker, consumer) <- createAckerConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "NAck-test")) *> + acker(NAck(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } it should "create a publisher, an auto-ack consumer, publish a message and consume it" in withRabbit { interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk, blocker) - _ <- publisher("test") - consumer <- createAutoAckConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) - } - .compile - .drain - } yield emptyAssertion + createConnectionChannel.use { implicit channel => + for { + (q, x, rk) <- randomQueueData + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk) + _ <- publisher("test") + consumer <- createAutoAckConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) + } + .compile + .drain + } yield emptyAssertion } } it should "create an exclusive auto-ack consumer with specific BasicQos" in withRabbit { interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- randomQueueData - ct <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk, blocker) - consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) - _ <- publisher("test") - consumer <- createAutoAckConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) - _ <- consumer - .take(1) - .evalMap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) - } - .compile - .drain - } yield 
emptyAssertion + createConnectionChannel.use { implicit channel => + for { + (q, x, rk) <- randomQueueData + ct <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk) + consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) + _ <- publisher("test") + consumer <- createAutoAckConsumer(q, BasicQos(prefetchSize = 0, prefetchCount = 10), Some(consumerArgs)) + _ <- consumer + .take(1) + .evalMap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) + } + .compile + .drain + } yield emptyAssertion } } it should "create an exclusive acker consumer with specific BasicQos" in withRabbit { interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- randomQueueData - ct <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(x, ExchangeType.Topic) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - publisher <- createPublisher[String](x, rk, blocker) - consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) - (acker, consumer) <- createAckerConsumer( - q, - BasicQos(prefetchSize = 0, prefetchCount = 10), - Some(consumerArgs) - ) - _ <- publisher("test") - _ <- consumer - .take(1) - .evalTap { msg => - IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) *> - acker(Ack(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + createConnectionChannel.use { implicit channel => + for { + (q, x, rk) <- randomQueueData + ct <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(x, ExchangeType.Topic) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + publisher <- createPublisher[String](x, rk) + consumerArgs = ConsumerArgs(consumerTag = ct, noLocal = false, exclusive = true, args = Map.empty) + (acker, consumer) <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + _ <- publisher("test") + _ <- consumer + .take(1) + .evalTap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) *> + acker(Ack(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } @@ -416,36 +404,34 @@ trait Fs2RabbitSpec { self: BaseSpec => it should "bind an exchange to another exchange" in withRabbit { interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, _, rk) <- randomQueueData - sourceExchange <- mkRandomString.map(ExchangeName) - destExchange <- mkRandomString.map(ExchangeName) - consumerTag <- mkRandomString.map(ConsumerTag) - _ <- declareExchange(sourceExchange, ExchangeType.Direct) - _ <- declareExchange(destExchange, ExchangeType.Direct) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, destExchange, rk) - _ <- bindExchange(destExchange, sourceExchange, rk, ExchangeBindingArgs(Map.empty)) - publisher <- createPublisher[String](sourceExchange, rk, blocker) - consumerArgs = ConsumerArgs(consumerTag = consumerTag, noLocal = false, exclusive = true, args = Map.empty) - (acker, consumer) <- createAckerConsumer( - q, - BasicQos(prefetchSize = 0, prefetchCount = 10), - Some(consumerArgs) - ) - _ <- publisher("test") - _ <- consumer - .take(1) - .evalTap { msg => - IO(msg shouldBe 
expectedDelivery(msg.deliveryTag, sourceExchange, rk, "test")) *> - acker(Ack(msg.deliveryTag)) - } - .compile - .drain - } yield emptyAssertion + createConnectionChannel.use { implicit channel => + for { + (q, _, rk) <- randomQueueData + sourceExchange <- mkRandomString.map(ExchangeName) + destExchange <- mkRandomString.map(ExchangeName) + consumerTag <- mkRandomString.map(ConsumerTag) + _ <- declareExchange(sourceExchange, ExchangeType.Direct) + _ <- declareExchange(destExchange, ExchangeType.Direct) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, destExchange, rk) + _ <- bindExchange(destExchange, sourceExchange, rk, ExchangeBindingArgs(Map.empty)) + publisher <- createPublisher[String](sourceExchange, rk) + consumerArgs = ConsumerArgs(consumerTag = consumerTag, noLocal = false, exclusive = true, args = Map.empty) + (acker, consumer) <- createAckerConsumer( + q, + BasicQos(prefetchSize = 0, prefetchCount = 10), + Some(consumerArgs) + ) + _ <- publisher("test") + _ <- consumer + .take(1) + .evalTap { msg => + IO(msg shouldBe expectedDelivery(msg.deliveryTag, sourceExchange, rk, "test")) *> + acker(Ack(msg.deliveryTag)) + } + .compile + .drain + } yield emptyAssertion } } @@ -469,27 +455,25 @@ trait Fs2RabbitSpec { self: BaseSpec => interpreter => import interpreter._ - Stream.resource(resources(interpreter)).flatMap { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- Stream.eval(randomQueueData) - _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) - _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) - _ <- Stream.eval(bindQueue(q, x, rk, QueueBindingArgs(Map.empty))) - publisher <- Stream.eval(createPublisher[String](x, rk, blocker)) - _ <- Stream.eval(publisher("NAck-test")) - (acker, consumer) <- Stream.eval(createAckerConsumer(q)) - result <- Stream.eval(consumer.take(1).compile.lastOrError) - _ <- Stream.eval(acker(NAck(result.deliveryTag))) - consumer <- Stream.eval(createAutoAckConsumer(q)) - result2 <- consumer.take(1) // Message will be re-queued - } yield { - val expected = expectedDelivery(result.deliveryTag, x, rk, "NAck-test") - - result shouldBe expected - result2 shouldBe expected.copy(deliveryTag = result2.deliveryTag, redelivered = true) - } + Stream.resource(createConnectionChannel).flatMap { implicit channel => + for { + (q, x, rk) <- Stream.eval(randomQueueData) + _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) + _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) + _ <- Stream.eval(bindQueue(q, x, rk, QueueBindingArgs(Map.empty))) + publisher <- Stream.eval(createPublisher[String](x, rk)) + _ <- Stream.eval(publisher("NAck-test")) + (acker, consumer) <- Stream.eval(createAckerConsumer(q)) + result <- Stream.eval(consumer.take(1).compile.lastOrError) + _ <- Stream.eval(acker(NAck(result.deliveryTag))) + consumer <- Stream.eval(createAutoAckConsumer(q)) + result2 <- consumer.take(1) // Message will be re-queued + } yield { + val expected = expectedDelivery(result.deliveryTag, x, rk, "NAck-test") + + result shouldBe expected + result2 shouldBe expected.copy(deliveryTag = result2.deliveryTag, redelivered = true) + } } } @@ -497,36 +481,34 @@ trait Fs2RabbitSpec { self: BaseSpec => interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- randomQueueData - diffQ <- mkRandomString.map(QueueName) - _ <- declareExchange(x, ExchangeType.Topic) - publisher <- 
createPublisher[String](x, rk, blocker) - _ <- declareQueue(DeclarationQueueConfig.default(q)) - _ <- bindQueue(q, x, rk) - _ <- declareQueue(DeclarationQueueConfig.default(diffQ)) - _ <- bindQueue(q, x, RoutingKey("diffRK")) - _ <- publisher("test") - consumer <- createAutoAckConsumer(q) - _ <- consumer - .take(1) - .evalMap { msg => - createAutoAckConsumer(diffQ).flatMap { c2 => - c2.take(1) - .compile - .last - .timeout(1.second) - .attempt - .map(_ shouldBe a[Left[_, _]]) - .as(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) - } + createConnectionChannel.use { implicit channel => + for { + (q, x, rk) <- randomQueueData + diffQ <- mkRandomString.map(QueueName) + _ <- declareExchange(x, ExchangeType.Topic) + publisher <- createPublisher[String](x, rk) + _ <- declareQueue(DeclarationQueueConfig.default(q)) + _ <- bindQueue(q, x, rk) + _ <- declareQueue(DeclarationQueueConfig.default(diffQ)) + _ <- bindQueue(q, x, RoutingKey("diffRK")) + _ <- publisher("test") + consumer <- createAutoAckConsumer(q) + _ <- consumer + .take(1) + .evalMap { msg => + createAutoAckConsumer(diffQ).flatMap { c2 => + c2.take(1) + .compile + .last + .timeout(1.second) + .attempt + .map(_ shouldBe a[Left[_, _]]) + .as(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test")) } - .compile - .drain - } yield emptyAssertion + } + .compile + .drain + } yield emptyAssertion } } @@ -536,26 +518,24 @@ trait Fs2RabbitSpec { self: BaseSpec => val flag = PublishingFlag(mandatory = true) - Stream.resource(resources(interpreter)).flatMap { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- Stream.eval(randomQueueData) - _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) - _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) - _ <- Stream.eval(bindQueue(q, x, rk)) - promise <- Stream.eval(Deferred[IO, PublishReturn]) - publisher <- Stream - .eval(createPublisherWithListener(x, RoutingKey("diff-rk"), flag, listener(promise), blocker)) - _ <- Stream.eval(publisher("test")) - callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone - consumer <- Stream.eval(createAutoAckConsumer(q)) - result <- takeWithTimeOut(consumer, 500.millis) - } yield { - result shouldBe None - callback.body.value shouldEqual "test".getBytes("UTF-8") - callback.routingKey shouldBe RoutingKey("diff-rk") - } + Stream.resource(createConnectionChannel).flatMap { implicit channel => + for { + (q, x, rk) <- Stream.eval(randomQueueData) + _ <- Stream.eval(declareExchange(x, ExchangeType.Topic)) + _ <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q))) + _ <- Stream.eval(bindQueue(q, x, rk)) + promise <- Stream.eval(Deferred[IO, PublishReturn]) + publisher <- Stream + .eval(createPublisherWithListener(x, RoutingKey("diff-rk"), flag, listener(promise))) + _ <- Stream.eval(publisher("test")) + callback <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone + consumer <- Stream.eval(createAutoAckConsumer(q)) + result <- takeWithTimeOut(consumer, 500.millis) + } yield { + result shouldBe None + callback.body.value shouldEqual "test".getBytes("UTF-8") + callback.routingKey shouldBe RoutingKey("diff-rk") + } } } @@ -563,25 +543,23 @@ trait Fs2RabbitSpec { self: BaseSpec => interpreter => import interpreter._ - resources(interpreter).use { - case (channel, blocker) => - implicit val rabbitChannel = channel - for { - (q, x, rk) <- randomQueueData - _ <- declareExchange(x, ExchangeType.Topic) 
-            _          <- declareQueue(DeclarationQueueConfig.default(q))
-            _          <- bindQueue(q, x, rk)
-            publisher  <- createRoutingPublisher[String](x, blocker)
-            _          <- publisher(rk).apply("test")
-            consumer   <- createAutoAckConsumer(q)
-            _ <- consumer
-                  .take(1)
-                  .evalMap { msg =>
-                    IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test"))
-                  }
-                  .compile
-                  .drain
-          } yield emptyAssertion
+      createConnectionChannel.use { implicit channel =>
+        for {
+          (q, x, rk) <- randomQueueData
+          _          <- declareExchange(x, ExchangeType.Topic)
+          _          <- declareQueue(DeclarationQueueConfig.default(q))
+          _          <- bindQueue(q, x, rk)
+          publisher  <- createRoutingPublisher[String](x)
+          _          <- publisher(rk).apply("test")
+          consumer   <- createAutoAckConsumer(q)
+          _ <- consumer
+                .take(1)
+                .evalMap { msg =>
+                  IO(msg shouldBe expectedDelivery(msg.deliveryTag, x, rk, "test"))
+                }
+                .compile
+                .drain
+        } yield emptyAssertion
       }
   }

@@ -591,25 +569,23 @@ trait Fs2RabbitSpec { self: BaseSpec =>

      val flag = PublishingFlag(mandatory = true)

-      Stream.resource(resources(interpreter)).flatMap {
-        case (channel, blocker) =>
-          implicit val rabbitChannel = channel
-          for {
-            (q, x, rk) <- Stream.eval(randomQueueData)
-            _          <- Stream.eval(declareExchange(x, ExchangeType.Topic))
-            _          <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q)))
-            _          <- Stream.eval(bindQueue(q, x, rk))
-            promise    <- Stream.eval(Deferred[IO, PublishReturn])
-            publisher  <- Stream.eval(createRoutingPublisherWithListener[String](x, flag, listener(promise), blocker))
-            _          <- Stream.eval(publisher(RoutingKey("diff-rk"))("test"))
-            callback   <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone
-            consumer   <- Stream.eval(createAutoAckConsumer(q))
-            result     <- takeWithTimeOut(consumer, 500.millis)
-          } yield {
-            result shouldBe None
-            callback.body.value shouldEqual "test".getBytes("UTF-8")
-            callback.routingKey shouldBe RoutingKey("diff-rk")
-          }
+      Stream.resource(createConnectionChannel).flatMap { implicit channel =>
+        for {
+          (q, x, rk) <- Stream.eval(randomQueueData)
+          _          <- Stream.eval(declareExchange(x, ExchangeType.Topic))
+          _          <- Stream.eval(declareQueue(DeclarationQueueConfig.default(q)))
+          _          <- Stream.eval(bindQueue(q, x, rk))
+          promise    <- Stream.eval(Deferred[IO, PublishReturn])
+          publisher  <- Stream.eval(createRoutingPublisherWithListener[String](x, flag, listener(promise)))
+          _          <- Stream.eval(publisher(RoutingKey("diff-rk"))("test"))
+          callback   <- Stream.eval(promise.get.map(_.some).timeoutTo(500.millis, IO.pure(none[PublishReturn]))).unNone
+          consumer   <- Stream.eval(createAutoAckConsumer(q))
+          result     <- takeWithTimeOut(consumer, 500.millis)
+        } yield {
+          result shouldBe None
+          callback.body.value shouldEqual "test".getBytes("UTF-8")
+          callback.routingKey shouldBe RoutingKey("diff-rk")
+        }
       }
   }

@@ -630,16 +606,14 @@ trait Fs2RabbitSpec { self: BaseSpec =>
       }

      def producer(q: QueueName, x: ExchangeName, rk: RoutingKey): IO[Unit] =
-        resources(interpreter).use {
-          case (channel, blocker) =>
-            implicit val rabbitChannel = channel
-            for {
-              _         <- declareQueue(DeclarationQueueConfig.default(q))
-              _         <- declareExchange(x, ExchangeType.Topic)
-              _         <- bindQueue(q, x, rk)
-              publisher <- createPublisher[String](x, rk, blocker)
-              _         <- publisher(message)
-            } yield ()
+        createConnectionChannel.use { implicit channel =>
+          for {
+            _         <- declareQueue(DeclarationQueueConfig.default(q))
+            _         <- declareExchange(x, ExchangeType.Topic)
+            _         <- bindQueue(q, x, rk)
+            publisher <- createPublisher[String](x, rk)
+            _         <- publisher(message)
+          } yield ()
         }

      randomQueueData
@@ -655,13 +629,25 @@ trait Fs2RabbitSpec { self: BaseSpec =>
  }

  private def withStreamRabbit[A](fa: Fs2Rabbit[IO] => Stream[IO, A]): Future[Assertion] =
-    Fs2Rabbit[IO](config).flatMap(r => fa(r).compile.drain).as(emptyAssertion).unsafeToFuture
+    blockerResource
+      .use { blocker =>
+        Fs2Rabbit[IO](config, blocker).flatMap(r => fa(r).compile.drain)
+      }
+      .as(emptyAssertion)
+      .unsafeToFuture

  private def withStreamNackRabbit[A](fa: Fs2Rabbit[IO] => Stream[IO, A]): Future[Assertion] =
-    Fs2Rabbit[IO](config.copy(requeueOnNack = true)).flatMap(r => fa(r).compile.drain).as(emptyAssertion).unsafeToFuture
+    blockerResource
+      .use { blocker =>
+        Fs2Rabbit[IO](config.copy(requeueOnNack = true), blocker).flatMap(r => fa(r).compile.drain)
+      }
+      .as(emptyAssertion)
+      .unsafeToFuture

  private def withRabbit[A](fa: Fs2Rabbit[IO] => IO[A]): Future[A] =
-    Fs2Rabbit[IO](config).flatMap(r => fa(r)).unsafeToFuture
+    blockerResource.use { blocker =>
+      Fs2Rabbit[IO](config, blocker).flatMap(r => fa(r))
+    }.unsafeToFuture

  private def randomQueueData: IO[(QueueName, ExchangeName, RoutingKey)] =
    (mkRandomString, mkRandomString, mkRandomString).mapN {

From 32f9dce8b246972380fabceaaf4551deca880818 Mon Sep 17 00:00:00 2001
From: Gabriel Volpe
Date: Tue, 17 Sep 2019 14:22:40 -0500
Subject: [PATCH 3/4] Updating docs

---
 site/src/main/tut/publishers/publisher-with-listener.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/site/src/main/tut/publishers/publisher-with-listener.md b/site/src/main/tut/publishers/publisher-with-listener.md
index e90ab4ee..86cd6b33 100644
--- a/site/src/main/tut/publishers/publisher-with-listener.md
+++ b/site/src/main/tut/publishers/publisher-with-listener.md
@@ -18,7 +18,7 @@ The server SHOULD implement the mandatory flag.

 ### Creating a Publisher with Listener

-It is simply created by specifying `ExchangeName`, `RoutingKey`, `PublishingFlag` and a listener, i.e. a function from `PublishReturn` to `F[Unit]`. In addition, you need to pass in a `cats.effect.Blocker` since publishing are blocking actions in the underlying Java client:
+It is simply created by specifying `ExchangeName`, `RoutingKey`, `PublishingFlag` and a listener, i.e. a function from `PublishReturn` to `F[Unit]`:

 ```tut:book:silent
 import cats.effect._

From 9def3622f33099b553afbb92eb38f66e81aa0307 Mon Sep 17 00:00:00 2001
From: Gabriel Volpe
Date: Tue, 17 Sep 2019 16:13:17 -0500
Subject: [PATCH 4/4] Update site/src/main/tut/publishers/index.md

Co-Authored-By: Gavin Bisesi
---
 site/src/main/tut/publishers/index.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/site/src/main/tut/publishers/index.md b/site/src/main/tut/publishers/index.md
index e86061ed..fb59da6a 100644
--- a/site/src/main/tut/publishers/index.md
+++ b/site/src/main/tut/publishers/index.md
@@ -6,7 +6,7 @@ number: 10

 # Publishers

-Publishing are blocking actions in the underlying Java client so you need to pass in a `cats.effect.Blocker` when creating the `Fs2Rabbit` client.
+Publishing is a blocking action in the underlying Java client, so you need to pass in a [`cats.effect.Blocker`](https://typelevel.org/cats-effect/concurrency/basics.html#choosing-thread-pool) when creating the `Fs2Rabbit` client.

 - **[Publisher](./publisher.html)**: A simple message publisher.
 - **[Publisher with Listener](./publisher-with-listener.html)**: A publisher with a listener for messages that can not be routed.
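For orientation, here is a minimal sketch of how publishing looks once the whole series is applied: the `Blocker` is handed over once, when the `Fs2Rabbit` client is created, and `createPublisher` no longer takes one. This is not code from the patches above — the object name, the `"demo"` exchange and routing key, the placeholder `config`, and the `String` encoder are illustrative assumptions.

```scala
import java.nio.charset.StandardCharsets.UTF_8

import cats.data.Kleisli
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.effects.MessageEncoder
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit
import dev.profunktor.fs2rabbit.model._

object BlockerPublisherSketch extends IOApp {

  // Placeholder: point this at your broker (host, port, credentials, ...).
  def config: Fs2RabbitConfig = ???

  // Assumed String encoder: a MessageEncoder[F, A] maps A to an AmqpMessage[Array[Byte]].
  implicit val stringEncoder: MessageEncoder[IO, String] =
    Kleisli(s => IO.pure(AmqpMessage(s.getBytes(UTF_8), AmqpProperties.empty)))

  override def run(args: List[String]): IO[ExitCode] =
    Blocker[IO].use { blocker =>                          // dedicated pool for the blocking AMQP calls
      Fs2Rabbit[IO](config, blocker).flatMap { client =>  // the Blocker is passed once, to the client
        client.createConnectionChannel.use { implicit channel =>
          for {
            _         <- client.declareExchange(ExchangeName("demo"), ExchangeType.Topic)
            publisher <- client.createPublisher[String](ExchangeName("demo"), RoutingKey("demo.rk"))
            _         <- publisher("hello")               // publishers themselves no longer take a Blocker
          } yield ()
        }
      }
    }.as(ExitCode.Success)
}
```

Compared with the intermediate API from the first patch, the trade-off is that a single `Blocker` is shared by everything the client does instead of being threaded through every `createPublisher*` call.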