From a013411801045555c8fbe7516b99973485550aae Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Fri, 12 Jul 2019 16:38:52 -0500 Subject: [PATCH] Polishing code for new release --- .../fs2rabbit/config/declaration.scala | 14 ++-- .../profunktor/fs2rabbit/effects/Log.scala | 8 +- .../interpreter/AMQPClientEffect.scala | 77 ++++++++++--------- .../dev/profunktor/fs2rabbit/model.scala | 14 ++-- .../resiliency/ResilientStream.scala | 2 +- .../dev/profunktor/fs2rabbit/BaseSpec.scala | 4 +- 6 files changed, 62 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala index a14bb722..48011edd 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/config/declaration.scala @@ -46,12 +46,14 @@ object declaration { case object AutoDelete extends AutoDeleteCfg case object NonAutoDelete extends AutoDeleteCfg - final case class DeclarationExchangeConfig(exchangeName: ExchangeName, - exchangeType: ExchangeType, - durable: DurableCfg, - autoDelete: AutoDeleteCfg, - internal: InternalCfg, - arguments: Arguments) + final case class DeclarationExchangeConfig( + exchangeName: ExchangeName, + exchangeType: ExchangeType, + durable: DurableCfg, + autoDelete: AutoDeleteCfg, + internal: InternalCfg, + arguments: Arguments + ) object DeclarationExchangeConfig { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/effects/Log.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/effects/Log.scala index c4365a8a..70c97b76 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/effects/Log.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/effects/Log.scala @@ -20,8 +20,8 @@ import cats.effect.Sync import org.slf4j.LoggerFactory trait Log[F[_]] { - def info(value: String): F[Unit] - def error(error: Throwable): F[Unit] + def info(value: => String): F[Unit] + def error(value: => String): F[Unit] } object Log { @@ -31,7 +31,7 @@ object Log { implicit def syncLogInstance[F[_]](implicit F: Sync[F]): Log[F] = new Log[F] { - override def error(error: Throwable): F[Unit] = F.delay(logger.error(error.getMessage, error)) - override def info(value: String): F[Unit] = F.delay(logger.info(value)) + override def error(value: => String): F[Unit] = F.delay(logger.error(value)) + override def info(value: => String): F[Unit] = F.delay(logger.info(value)) } } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala index 4bc49ac9..235a99a0 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala @@ -16,7 +16,7 @@ package dev.profunktor.fs2rabbit.interpreter -import cats.Applicative +import cats.{Applicative, Functor} import cats.effect.syntax.effect._ import cats.effect.{Effect, Sync} import cats.syntax.flatMap._ @@ -29,6 +29,7 @@ import dev.profunktor.fs2rabbit.config.deletion import dev.profunktor.fs2rabbit.config.deletion.DeletionQueueConfig import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._ import dev.profunktor.fs2rabbit.model._ +import scala.util.{Failure, Success, Try} class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { @@ -52,44 +53,42 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { properties: AMQP.BasicProperties, body: Array[Byte] ): Unit = { - // This should not go wrong (if it does it is an indication of a bug in - // unsafeFrom)! - // However, I'm not entirely confident I've nailed down unsafeFrom ( - // since it requires a pretty intricate understanding of the underlying - // Java library) so just in case, we're wrapping it in a Try so that a - // bug here doesn't bring down our entire queue. - val amqpPropertiesOrErr = scala.util.Try(AmqpProperties.unsafeFrom(properties)) match { - // toEither is not supported by Scala 2.11 so we have a manual match - case scala.util.Success(amqpProperties) => Right(amqpProperties) - case scala.util.Failure(err) => - val rewrappedError = new Exception( - "You've stumbled across a bug in the interface between the underlying " + - "RabbitMQ Java library and fs2-rabbit! Please report this bug and " + - "include this stack trace and message.\nThe BasicProperties instance " + - s"that caused this error was:\n$properties\n", + def rewrappedError(err: Throwable) = + Left( + new Exception( + s""" + You've stumbled across a bug in the interface between the underlying + RabbitMQ Java library and fs2-rabbit! Please report this bug and + include this stack trace and message.\n + + The BasicProperties instance that caused this error was:\n + + $properties + """, err ) - Left(rewrappedError) - } - // Calling the Functor instance manually is because of three annoying things: - // 1. Scala 2.11 doesn't have right-biased Either so .map doesn't work, - // 2. Scala 2.13 deprecates .right so .right.map doesn't work either - // (since we have fatal warnings). - // 3. import cats.implicits._ doesn't work because it warns about an - // unused import for Scala 2.12 and Scala 2.13 - // So we invoke the Either Functor instance manually - import cats.instances.either.catsStdInstancesForEither - - val envelopeOrErr = catsStdInstancesForEither.map(amqpPropertiesOrErr) { props => - val tag = envelope.getDeliveryTag - val routingKey = RoutingKey(envelope.getRoutingKey) - val exchange = ExchangeName(envelope.getExchange) - val redelivered = envelope.isRedeliver + ) + + val amqpPropertiesOrErr = + Try(AmqpProperties.unsafeFrom(properties)) match { + case Success(p) => Right(p) + case Failure(t) => rewrappedError(t) + } + + val tag = envelope.getDeliveryTag + val routingKey = RoutingKey(envelope.getRoutingKey) + val exchange = ExchangeName(envelope.getExchange) + val redelivered = envelope.isRedeliver + + // Calling the Functor instance manually for compatibility + import cats.instances.either._ + + val envelopeOrErr = Functor[Either[Throwable, ?]].map(amqpPropertiesOrErr) { props => AmqpEnvelope(DeliveryTag(tag), body, props, exchange, routingKey, redelivered) } internals.queue - .fold(Applicative[F].pure(())) { internalQ => + .fold(Applicative[F].unit) { internalQ => internalQ.enqueue1(envelopeOrErr) } .toIO @@ -224,11 +223,13 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { channel.queueBind(queueName.value, exchangeName.value, routingKey.value) }.void - override def bindQueue(channel: Channel, - queueName: QueueName, - exchangeName: ExchangeName, - routingKey: RoutingKey, - args: QueueBindingArgs): F[Unit] = + override def bindQueue( + channel: Channel, + queueName: QueueName, + exchangeName: ExchangeName, + routingKey: RoutingKey, + args: QueueBindingArgs + ): F[Unit] = Sync[F].delay { channel.queueBind(queueName.value, exchangeName.value, routingKey.value, args.value) }.void diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala index 50b6c486..5eb95950 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala @@ -364,12 +364,14 @@ object model { } } - case class AmqpEnvelope[A](deliveryTag: DeliveryTag, - payload: A, - properties: AmqpProperties, - exchangeName: ExchangeName, - routingKey: RoutingKey, - redelivered: Boolean) + case class AmqpEnvelope[A]( + deliveryTag: DeliveryTag, + payload: A, + properties: AmqpProperties, + exchangeName: ExchangeName, + routingKey: RoutingKey, + redelivered: Boolean + ) case class AmqpMessage[A](payload: A, properties: AmqpProperties) object AmqpEnvelope { diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/resiliency/ResilientStream.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/resiliency/ResilientStream.scala index 20ed4364..1161191f 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/resiliency/ResilientStream.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/resiliency/ResilientStream.scala @@ -53,7 +53,7 @@ object ResilientStream { ): Stream[F, Unit] = program.handleErrorWith { case NonFatal(err) => - Stream.eval(Log[F].error(err) *> Log[F].info(s"Restarting in ${retry.toSeconds * count}...")) >> + Stream.eval(Log[F].error(err.getMessage) *> Log[F].info(s"Restarting in ${retry.toSeconds * count}...")) >> loop[F](Stream.sleep(retry) >> program, retry, count + 1) } diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/BaseSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/BaseSpec.scala index b22c7ab2..120f0d85 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/BaseSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/BaseSpec.scala @@ -28,7 +28,7 @@ trait BaseSpec extends AsyncFlatSpecLike with Matchers with EitherValues { implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) implicit val logger: Log[IO] = new Log[IO] { - override def info(value: String): IO[Unit] = putStrLn(value) - override def error(error: Throwable): IO[Unit] = putStrLn(error.getMessage) + override def info(value: => String): IO[Unit] = putStrLn(value) + override def error(error: => String): IO[Unit] = putStrLn(error) } }