Skip to content

Commit

Permalink
Polishing code for new release
Browse files Browse the repository at this point in the history
  • Loading branch information
gvolpe committed Jul 12, 2019
1 parent e156915 commit a013411
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ object declaration {
case object AutoDelete extends AutoDeleteCfg
case object NonAutoDelete extends AutoDeleteCfg

final case class DeclarationExchangeConfig(exchangeName: ExchangeName,
exchangeType: ExchangeType,
durable: DurableCfg,
autoDelete: AutoDeleteCfg,
internal: InternalCfg,
arguments: Arguments)
final case class DeclarationExchangeConfig(
exchangeName: ExchangeName,
exchangeType: ExchangeType,
durable: DurableCfg,
autoDelete: AutoDeleteCfg,
internal: InternalCfg,
arguments: Arguments
)

object DeclarationExchangeConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import cats.effect.Sync
import org.slf4j.LoggerFactory

trait Log[F[_]] {
def info(value: String): F[Unit]
def error(error: Throwable): F[Unit]
def info(value: => String): F[Unit]
def error(value: => String): F[Unit]
}

object Log {
Expand All @@ -31,7 +31,7 @@ object Log {

implicit def syncLogInstance[F[_]](implicit F: Sync[F]): Log[F] =
new Log[F] {
override def error(error: Throwable): F[Unit] = F.delay(logger.error(error.getMessage, error))
override def info(value: String): F[Unit] = F.delay(logger.info(value))
override def error(value: => String): F[Unit] = F.delay(logger.error(value))
override def info(value: => String): F[Unit] = F.delay(logger.info(value))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package dev.profunktor.fs2rabbit.interpreter

import cats.Applicative
import cats.{Applicative, Functor}
import cats.effect.syntax.effect._
import cats.effect.{Effect, Sync}
import cats.syntax.flatMap._
Expand All @@ -29,6 +29,7 @@ import dev.profunktor.fs2rabbit.config.deletion
import dev.profunktor.fs2rabbit.config.deletion.DeletionQueueConfig
import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._
import dev.profunktor.fs2rabbit.model._
import scala.util.{Failure, Success, Try}

class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {

Expand All @@ -52,44 +53,42 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {
properties: AMQP.BasicProperties,
body: Array[Byte]
): Unit = {
// This should not go wrong (if it does it is an indication of a bug in
// unsafeFrom)!
// However, I'm not entirely confident I've nailed down unsafeFrom (
// since it requires a pretty intricate understanding of the underlying
// Java library) so just in case, we're wrapping it in a Try so that a
// bug here doesn't bring down our entire queue.
val amqpPropertiesOrErr = scala.util.Try(AmqpProperties.unsafeFrom(properties)) match {
// toEither is not supported by Scala 2.11 so we have a manual match
case scala.util.Success(amqpProperties) => Right(amqpProperties)
case scala.util.Failure(err) =>
val rewrappedError = new Exception(
"You've stumbled across a bug in the interface between the underlying " +
"RabbitMQ Java library and fs2-rabbit! Please report this bug and " +
"include this stack trace and message.\nThe BasicProperties instance " +
s"that caused this error was:\n$properties\n",
def rewrappedError(err: Throwable) =
Left(
new Exception(
s"""
You've stumbled across a bug in the interface between the underlying
RabbitMQ Java library and fs2-rabbit! Please report this bug and
include this stack trace and message.\n

The BasicProperties instance that caused this error was:\n

$properties
""",
err
)
Left(rewrappedError)
}
// Calling the Functor instance manually is because of three annoying things:
// 1. Scala 2.11 doesn't have right-biased Either so .map doesn't work,
// 2. Scala 2.13 deprecates .right so .right.map doesn't work either
// (since we have fatal warnings).
// 3. import cats.implicits._ doesn't work because it warns about an
// unused import for Scala 2.12 and Scala 2.13
// So we invoke the Either Functor instance manually
import cats.instances.either.catsStdInstancesForEither

val envelopeOrErr = catsStdInstancesForEither.map(amqpPropertiesOrErr) { props =>
val tag = envelope.getDeliveryTag
val routingKey = RoutingKey(envelope.getRoutingKey)
val exchange = ExchangeName(envelope.getExchange)
val redelivered = envelope.isRedeliver
)

val amqpPropertiesOrErr =
Try(AmqpProperties.unsafeFrom(properties)) match {
case Success(p) => Right(p)
case Failure(t) => rewrappedError(t)
}

val tag = envelope.getDeliveryTag
val routingKey = RoutingKey(envelope.getRoutingKey)
val exchange = ExchangeName(envelope.getExchange)
val redelivered = envelope.isRedeliver

// Calling the Functor instance manually for compatibility
import cats.instances.either._

val envelopeOrErr = Functor[Either[Throwable, ?]].map(amqpPropertiesOrErr) { props =>
AmqpEnvelope(DeliveryTag(tag), body, props, exchange, routingKey, redelivered)
}

internals.queue
.fold(Applicative[F].pure(())) { internalQ =>
.fold(Applicative[F].unit) { internalQ =>
internalQ.enqueue1(envelopeOrErr)
}
.toIO
Expand Down Expand Up @@ -224,11 +223,13 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {
channel.queueBind(queueName.value, exchangeName.value, routingKey.value)
}.void

override def bindQueue(channel: Channel,
queueName: QueueName,
exchangeName: ExchangeName,
routingKey: RoutingKey,
args: QueueBindingArgs): F[Unit] =
override def bindQueue(
channel: Channel,
queueName: QueueName,
exchangeName: ExchangeName,
routingKey: RoutingKey,
args: QueueBindingArgs
): F[Unit] =
Sync[F].delay {
channel.queueBind(queueName.value, exchangeName.value, routingKey.value, args.value)
}.void
Expand Down
14 changes: 8 additions & 6 deletions core/src/main/scala/dev/profunktor/fs2rabbit/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,14 @@ object model {
}
}

case class AmqpEnvelope[A](deliveryTag: DeliveryTag,
payload: A,
properties: AmqpProperties,
exchangeName: ExchangeName,
routingKey: RoutingKey,
redelivered: Boolean)
case class AmqpEnvelope[A](
deliveryTag: DeliveryTag,
payload: A,
properties: AmqpProperties,
exchangeName: ExchangeName,
routingKey: RoutingKey,
redelivered: Boolean
)
case class AmqpMessage[A](payload: A, properties: AmqpProperties)

object AmqpEnvelope {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object ResilientStream {
): Stream[F, Unit] =
program.handleErrorWith {
case NonFatal(err) =>
Stream.eval(Log[F].error(err) *> Log[F].info(s"Restarting in ${retry.toSeconds * count}...")) >>
Stream.eval(Log[F].error(err.getMessage) *> Log[F].info(s"Restarting in ${retry.toSeconds * count}...")) >>
loop[F](Stream.sleep(retry) >> program, retry, count + 1)
}

Expand Down
4 changes: 2 additions & 2 deletions tests/src/test/scala/dev/profunktor/fs2rabbit/BaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait BaseSpec extends AsyncFlatSpecLike with Matchers with EitherValues {
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

implicit val logger: Log[IO] = new Log[IO] {
override def info(value: String): IO[Unit] = putStrLn(value)
override def error(error: Throwable): IO[Unit] = putStrLn(error.getMessage)
override def info(value: => String): IO[Unit] = putStrLn(value)
override def error(error: => String): IO[Unit] = putStrLn(error)
}
}

0 comments on commit a013411

Please sign in to comment.