Skip to content

Commit

Permalink
Add headers type (#956)
Browse files Browse the repository at this point in the history
Add Headers type
  • Loading branch information
geirolz authored Sep 6, 2024
1 parent 11024d0 commit c8a1c57
Show file tree
Hide file tree
Showing 36 changed files with 2,127 additions and 780 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.{Applicative, Functor}
import com.rabbitmq.client.{AMQP, Consumer, DefaultConsumer, Envelope, ShutdownSignalException}
import dev.profunktor.fs2rabbit.arguments.{Arguments, _}
import dev.profunktor.fs2rabbit.arguments._
import dev.profunktor.fs2rabbit.model._

import scala.util.{Failure, Success, Try}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package dev.profunktor.fs2rabbit.config

import java.util.concurrent.TimeUnit

import cats.data.NonEmptyList
import com.rabbitmq.client.ConnectionFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

case class Fs2RabbitNodeConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/

package dev.profunktor.fs2rabbit.effects
import cats.{Applicative, ApplicativeError, ApplicativeThrow, MonadError}
import cats.data.Kleisli
import dev.profunktor.fs2rabbit.model.{AmqpFieldValue, AmqpProperties, ExchangeName, RoutingKey}
import dev.profunktor.fs2rabbit.model.AmqpFieldValue._
import cats.implicits._
import cats.{Applicative, ApplicativeError, ApplicativeThrow}
import dev.profunktor.fs2rabbit.model.codec.AmqpFieldDecoder
import dev.profunktor.fs2rabbit.model.{AmqpFieldValue, AmqpProperties, ExchangeName, HeaderKey, Headers, RoutingKey}

object EnvelopeDecoder extends EnvelopeDecoderInstances {

Expand All @@ -40,47 +40,53 @@ object EnvelopeDecoder extends EnvelopeDecoderInstances {
def redelivered[F[_]: Applicative]: EnvelopeDecoder[F, Boolean] =
Kleisli(e => e.redelivered.pure[F])

def header[F[_]](name: String)(implicit F: ApplicativeThrow[F]): EnvelopeDecoder[F, AmqpFieldValue] =
Kleisli(e => F.catchNonFatal(e.properties.headers(name)))
// header
def headers[F[_]: ApplicativeThrow]: EnvelopeDecoder[F, Headers] =
Kleisli(_.properties.headers.pure[F])

def optHeader[F[_]: Applicative](name: String): EnvelopeDecoder[F, Option[AmqpFieldValue]] =
Kleisli(_.properties.headers.get(name).pure[F])
def header[F[_]: ApplicativeThrow](key: String): EnvelopeDecoder[F, AmqpFieldValue] =
Kleisli(_.properties.headers.get[F](key))

def headerAs[F[_]: ApplicativeThrow, T: AmqpFieldDecoder](key: HeaderKey): EnvelopeDecoder[F, T] =
Kleisli(_.properties.headers.getAs[F, T](key))

def optHeader[F[_]: Applicative](key: HeaderKey): EnvelopeDecoder[F, Option[AmqpFieldValue]] =
Kleisli(_.properties.headers.getOpt(key).pure[F])

def optHeaderAs[F[_]: ApplicativeThrow, T: AmqpFieldDecoder](key: HeaderKey): EnvelopeDecoder[F, Option[T]] =
Kleisli(_.properties.headers.getOptAsF[F, T](key))

@deprecated("Use headerAs[F, String] instead", "5.3.0")
def stringHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, String] =
headerPF[F, String](name) { case StringVal(a) => a }
headerAs[F, String](name)

@deprecated("Use headerAs[F, Int] instead", "5.3.0")
def intHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Int] =
headerPF[F, Int](name) { case IntVal(a) => a }
headerAs[F, Int](name)

@deprecated("Use headerAs[F, Long] instead", "5.3.0")
def longHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Long] =
headerPF[F, Long](name) { case LongVal(a) => a }

def arrayHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, collection.Seq[Any]] =
headerPF[F, collection.Seq[Any]](name) { case ArrayVal(a) => a }
headerAs[F, Long](name)

@deprecated("Use optHeaderAs[F, String] instead", "5.3.0")
def optStringHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[String]] =
optHeaderPF[F, String](name) { case StringVal(a) => a }
optHeaderAs[F, String](name)

@deprecated("Use optHeaderAs[F, Int] instead", "5.3.0")
def optIntHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[Int]] =
optHeaderPF[F, Int](name) { case IntVal(a) => a }
optHeaderAs[F, Int](name)

@deprecated("Use optHeaderAs[F, Long] instead", "5.3.0")
def optLongHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[Long]] =
optHeaderPF[F, Long](name) { case LongVal(a) => a }
optHeaderAs[F, Long](name)

def optArrayHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[collection.Seq[Any]]] =
optHeaderPF[F, collection.Seq[Any]](name) { case ArrayVal(a) => a }

private def headerPF[F[_], A](
name: String
)(pf: PartialFunction[AmqpFieldValue, A])(implicit F: ApplicativeThrow[F]): EnvelopeDecoder[F, A] =
Kleisli { env =>
F.catchNonFatal(pf(env.properties.headers(name)))
}
@deprecated("Use headerAs[F, collection.Seq[Any]] instead", "5.3.0")
def arrayHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, collection.Seq[Any]] =
headerAs[F, collection.Seq[Any]](name)(ApplicativeThrow[F], AmqpFieldDecoder.collectionSeqDecoder[Any])

private def optHeaderPF[F[_], A](name: String)(pf: PartialFunction[AmqpFieldValue, A])(implicit
F: ApplicativeThrow[F]
): EnvelopeDecoder[F, Option[A]] =
Kleisli(_.properties.headers.get(name).traverse(h => F.catchNonFatal(pf(h))))
@deprecated("Use optHeaderAs[F, collection.Seq[Any]] instead", "5.3.0")
def optArrayHeader[F[_]: ApplicativeThrow](name: String): EnvelopeDecoder[F, Option[collection.Seq[Any]]] =
optHeaderAs[F, collection.Seq[Any]](name)(ApplicativeThrow[F], AmqpFieldDecoder.collectionSeqDecoder[Any])
}

sealed trait EnvelopeDecoderInstances {
Expand Down
Loading

0 comments on commit c8a1c57

Please sign in to comment.