Skip to content

Commit

Permalink
Merge pull request #244 from profunktor/feature/publishing-blocker
Browse files Browse the repository at this point in the history
Creating an Fs2Rabbit client requires a Blocker
  • Loading branch information
gvolpe authored Sep 17, 2019
2 parents 0be2ffa + 9def362 commit 96d2f0e
Show file tree
Hide file tree
Showing 20 changed files with 240 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package dev.profunktor.fs2rabbit.algebra

import com.rabbitmq.client.Channel
import dev.profunktor.fs2rabbit.effects.MessageEncoder
import dev.profunktor.fs2rabbit.model._
import com.rabbitmq.client.Channel

trait Publishing[F[_]] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package dev.profunktor.fs2rabbit.interpreter

import cats.{Applicative, Functor}
import cats.effect.syntax.effect._
import cats.effect.{Effect, Sync}
import cats.effect._
import cats.syntax.flatMap._
import cats.syntax.functor._
import com.rabbitmq.client._
Expand All @@ -31,7 +31,7 @@ import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._
import dev.profunktor.fs2rabbit.model._
import scala.util.{Failure, Success, Try}

class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {
class AMQPClientEffect[F[_]: ContextShift: Effect](blocker: Blocker) extends AMQPClient[F] {

private[fs2rabbit] def defaultConsumer[A](
channel: Channel,
Expand Down Expand Up @@ -152,7 +152,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {
exchangeName: ExchangeName,
routingKey: RoutingKey,
msg: AmqpMessage[Array[Byte]]
): F[Unit] = Sync[F].delay {
): F[Unit] = blocker.delay {
channel.basicPublish(
exchangeName.value,
routingKey.value,
Expand All @@ -167,7 +167,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {
routingKey: RoutingKey,
flag: PublishingFlag,
msg: AmqpMessage[Array[Byte]]
): F[Unit] = Sync[F].delay {
): F[Unit] = blocker.delay {
channel.basicPublish(
exchangeName.value,
routingKey.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package dev.profunktor.fs2rabbit.interpreter

import cats.effect.{Concurrent, ConcurrentEffect, Resource}
import cats.effect._
import cats.syntax.functor._
import com.rabbitmq.client.{DefaultSaslConfig, SaslConfig}
import dev.profunktor.fs2rabbit.algebra._
Expand All @@ -30,16 +30,17 @@ import fs2.Stream
import javax.net.ssl.SSLContext

object Fs2Rabbit {
def apply[F[_]: ConcurrentEffect](
def apply[F[_]: ConcurrentEffect: ContextShift](
config: Fs2RabbitConfig,
blocker: Blocker,
sslContext: Option[SSLContext] = None,
// Unlike SSLContext, SaslConfig is not optional because it is always set
// by the underlying Java library, even if the user doesn't set it.
saslConfig: SaslConfig = DefaultSaslConfig.PLAIN
): F[Fs2Rabbit[F]] =
ConnectionEffect.mkConnectionFactory[F](config, sslContext, saslConfig).map {
case (factory, addresses) =>
val amqpClient = new AmqpClientEffect[F]
val amqpClient = new AMQPClientEffect[F](blocker)
val conn = new ConnectionEffect[F](factory, addresses)
val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500))
val acker = new AckingProgram[F](config, amqpClient)
Expand Down Expand Up @@ -72,8 +73,10 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] (
queueName: QueueName,
basicQos: BasicQos = BasicQos(prefetchSize = 0, prefetchCount = 1),
consumerArgs: Option[ConsumerArgs] = None
)(implicit channel: AMQPChannel,
decoder: EnvelopeDecoder[F, A]): F[(AckResult => F[Unit], Stream[F, AmqpEnvelope[A]])] =
)(
implicit channel: AMQPChannel,
decoder: EnvelopeDecoder[F, A]
): F[(AckResult => F[Unit], Stream[F, AmqpEnvelope[A]])] =
consumingProgram.createAckerConsumer(channel.value, queueName, basicQos, consumerArgs)

def createAutoAckConsumer[A](
Expand All @@ -85,7 +88,8 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] (

def createPublisher[A](exchangeName: ExchangeName, routingKey: RoutingKey)(
implicit channel: AMQPChannel,
encoder: MessageEncoder[F, A]): F[A => F[Unit]] =
encoder: MessageEncoder[F, A]
): F[A => F[Unit]] =
publishingProgram.createPublisher(channel.value, exchangeName, routingKey)

def createPublisherWithListener[A](
Expand All @@ -111,9 +115,9 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] (
): F[(ExchangeName, RoutingKey, A) => F[Unit]] =
publishingProgram.createBasicPublisherWithListener(channel.value, flag, listener)

def createRoutingPublisher[A](exchangeName: ExchangeName)(
implicit channel: AMQPChannel,
encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] =
def createRoutingPublisher[A](
exchangeName: ExchangeName
)(implicit channel: AMQPChannel, encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] =
publishingProgram.createRoutingPublisher(channel.value, exchangeName)

def createRoutingPublisherWithListener[A](
Expand All @@ -133,31 +137,38 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] (
amqpClient.basicCancel(channel.value, consumerTag)

def bindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
amqpClient.bindQueue(channel.value, queueName, exchangeName, routingKey)

def bindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
amqpClient.bindQueue(channel.value, queueName, exchangeName, routingKey, args)

def bindQueueNoWait(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
amqpClient.bindQueueNoWait(channel.value, queueName, exchangeName, routingKey, args)

def unbindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
unbindQueue(queueName, exchangeName, routingKey, QueueUnbindArgs(Map.empty))

def unbindQueue(queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueUnbindArgs)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
amqpClient.unbindQueue(channel.value, queueName, exchangeName, routingKey, args)

def bindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey, args: ExchangeBindingArgs)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
amqpClient.bindExchange(channel.value, destination, source, routingKey, args)

def bindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
bindExchange(destination, source, routingKey, ExchangeBindingArgs(Map.empty))

def bindExchangeNoWait(
Expand All @@ -169,11 +180,13 @@ class Fs2Rabbit[F[_]: Concurrent] private[fs2rabbit] (
amqpClient.bindExchangeNoWait(channel.value, destination, source, routingKey, args)

def unbindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey, args: ExchangeUnbindArgs)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
amqpClient.unbindExchange(channel.value, destination, source, routingKey, args)

def unbindExchange(destination: ExchangeName, source: ExchangeName, routingKey: RoutingKey)(
implicit channel: AMQPChannel): F[Unit] =
implicit channel: AMQPChannel
): F[Unit] =
unbindExchange(destination, source, routingKey, ExchangeUnbindArgs(Map.empty))

def declareExchange(exchangeName: ExchangeName, exchangeType: ExchangeType)(implicit channel: AMQPChannel): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class PublishingProgram[F[_]: Monad](AMQP: AMQPClient[F]) extends Publishing[F]
flag: PublishingFlag,
listener: PublishReturn => F[Unit]
)(implicit encoder: MessageEncoder[F, A]): F[RoutingKey => A => F[Unit]] =
createBasicPublisherWithListener(channel, flag, listener).map(pub => key => msg => pub(exchangeName, key, msg))
createBasicPublisherWithListener(channel, flag, listener).map(
pub => key => msg => pub(exchangeName, key, msg)
)

override def createBasicPublisher[A](
channel: Channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package dev.profunktor.fs2rabbit.examples
import java.nio.charset.StandardCharsets.UTF_8

import cats.data.Kleisli
import cats.effect.{Concurrent, Timer}
import cats.effect._
import cats.implicits._
import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit
import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder
import dev.profunktor.fs2rabbit.model.AckResult.Ack
import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal}
import dev.profunktor.fs2rabbit.model._
import fs2.{Pipe, Pure, Stream}
import fs2._

class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) {
private val queueName = QueueName("testQ")
Expand Down Expand Up @@ -61,6 +61,7 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](R: Fs2Rabbit[F]) {
_ <- new Flow[F, String](consumer, acker, logPipe, publisher).flow.compile.drain
} yield ()
}

}

class Flow[F[_]: Concurrent, A](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package dev.profunktor.fs2rabbit.examples
import java.nio.charset.StandardCharsets.UTF_8

import cats.data.Kleisli
import cats.effect.Concurrent
import cats.effect._
import cats.implicits._
import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit
Expand All @@ -41,18 +41,17 @@ class AutoAckConsumerDemo[F[_]: Concurrent](R: Fs2Rabbit[F]) {
putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag))
}

val program: F[Unit] = {
R.createConnectionChannel use { implicit channel =>
for {
_ <- R.declareQueue(DeclarationQueueConfig.default(queueName))
_ <- R.declareExchange(exchangeName, ExchangeType.Topic)
_ <- R.bindQueue(queueName, exchangeName, routingKey)
publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey)
consumer <- R.createAutoAckConsumer[String](queueName)
_ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain
} yield ()
}
val program: F[Unit] = R.createConnectionChannel.use { implicit channel =>
for {
_ <- R.declareQueue(DeclarationQueueConfig.default(queueName))
_ <- R.declareExchange(exchangeName, ExchangeType.Topic)
_ <- R.bindQueue(queueName, exchangeName, routingKey)
publisher <- R.createPublisher[AmqpMessage[String]](exchangeName, routingKey)
consumer <- R.createAutoAckConsumer[String](queueName)
_ <- new AutoAckFlow[F, String](consumer, logPipe, publisher).flow.compile.drain
} yield ()
}

}

class AutoAckFlow[F[_]: Concurrent, A](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package dev.profunktor.fs2rabbit.examples

import cats.data.NonEmptyList
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect._
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit
import dev.profunktor.fs2rabbit.resiliency.ResilientStream
import java.util.concurrent.Executors

object IOAckerConsumer extends IOApp {

Expand All @@ -42,11 +43,18 @@ object IOAckerConsumer extends IOApp {
automaticRecovery = true
)

val blockerResource =
Resource
.make(IO(Executors.newCachedThreadPool()))(es => IO(es.shutdown()))
.map(Blocker.liftExecutorService)

override def run(args: List[String]): IO[ExitCode] =
Fs2Rabbit[IO](config).flatMap { client =>
ResilientStream
.runF(new AckerConsumerDemo[IO](client).program)
.as(ExitCode.Success)
blockerResource.use { blocker =>
Fs2Rabbit[IO](config, blocker).flatMap { client =>
ResilientStream
.runF(new AckerConsumerDemo[IO](client).program)
.as(ExitCode.Success)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package dev.profunktor.fs2rabbit.examples

import cats.data.NonEmptyList
import cats.effect.ExitCode
import cats.effect._
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit
import dev.profunktor.fs2rabbit.resiliency.ResilientStream
import monix.eval.{Task, TaskApp}
import java.util.concurrent.Executors

object MonixAutoAckConsumer extends TaskApp {

Expand All @@ -43,11 +44,18 @@ object MonixAutoAckConsumer extends TaskApp {
automaticRecovery = true
)

val blockerResource =
Resource
.make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown()))
.map(Blocker.liftExecutorService)

override def run(args: List[String]): Task[ExitCode] =
Fs2Rabbit[Task](config).flatMap { client =>
ResilientStream
.runF(new AutoAckConsumerDemo[Task](client).program)
.as(ExitCode.Success)
blockerResource.use { blocker =>
Fs2Rabbit[Task](config, blocker).flatMap { client =>
ResilientStream
.runF(new AutoAckConsumerDemo[Task](client).program)
.as(ExitCode.Success)
}
}

}
Loading

0 comments on commit 96d2f0e

Please sign in to comment.