Skip to content

Commit

Permalink
Merge branch 'master' into allow-custom-connection-factory
Browse files Browse the repository at this point in the history
  • Loading branch information
geirolz authored Oct 26, 2024
2 parents 44982bb + 8283b0b commit 97c1fc1
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 17 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import Dependencies._
import microsites.ExtraMdFileConfig

ThisBuild / name := "fs2-rabbit"
ThisBuild / scalaVersion := "2.13.14"
ThisBuild / crossScalaVersions := List("2.12.20", "2.13.14", "3.3.3", "3.5.1")
ThisBuild / scalaVersion := "2.13.15"
ThisBuild / crossScalaVersions := List("2.12.20", "2.13.15", "3.3.4", "3.5.2")
ThisBuild / versionScheme := Some("semver-spec")
ThisBuild / organization := "dev.profunktor"
ThisBuild / homepage := Some(url("https://fs2-rabbit.profunktor.dev/"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package dev.profunktor.fs2rabbit.config

import dev.profunktor.fs2rabbit.arguments.Arguments
import dev.profunktor.fs2rabbit.arguments.{Arguments, SafeArg}
import dev.profunktor.fs2rabbit.model.{ExchangeName, ExchangeType, QueueName, QueueType}

object declaration {

// ----- Queue Config -----
final case class DeclarationQueueConfig(
queueName: QueueName,
durable: DurableCfg,
Expand All @@ -43,6 +44,35 @@ object declaration {
case None =>
Right(arguments)
}

// arguments
def withArguments(arguments: Arguments): DeclarationQueueConfig =
copy(arguments = arguments)

def withArguments(arguments: (String, SafeArg)*): DeclarationQueueConfig =
withArguments(arguments.toMap)

// durable
def withDurable: DeclarationQueueConfig =
copy(durable = Durable)

def withNonDurable: DeclarationQueueConfig =
copy(durable = NonDurable)

// autoDelete
def withAutoDelete: DeclarationQueueConfig =
copy(autoDelete = AutoDelete)

def withNonAutoDelete: DeclarationQueueConfig =
copy(autoDelete = NonAutoDelete)

// exclusive
def withExclusive: DeclarationQueueConfig =
copy(exclusive = Exclusive)

def withNonExclusive: DeclarationQueueConfig =
copy(exclusive = NonExclusive)

}
object DeclarationQueueConfig {

Expand Down Expand Up @@ -78,19 +108,56 @@ object declaration {
case object AutoDelete extends AutoDeleteCfg
case object NonAutoDelete extends AutoDeleteCfg

// ----- Exchange Config -----
final case class DeclarationExchangeConfig(
exchangeName: ExchangeName,
exchangeType: ExchangeType,
durable: DurableCfg,
autoDelete: AutoDeleteCfg,
internal: InternalCfg,
arguments: Arguments
)
) {

// arguments
def withArguments(arguments: Arguments): DeclarationExchangeConfig =
copy(arguments = arguments)

def withArguments(arguments: (String, SafeArg)*): DeclarationExchangeConfig =
withArguments(arguments.toMap)

// durable
def withDurable: DeclarationExchangeConfig =
copy(durable = Durable)

def withNonDurable: DeclarationExchangeConfig =
copy(durable = NonDurable)

// autoDelete
def withAutoDelete: DeclarationExchangeConfig =
copy(autoDelete = AutoDelete)

def withNonAutoDelete: DeclarationExchangeConfig =
copy(autoDelete = NonAutoDelete)

// internal
def withInternal: DeclarationExchangeConfig =
copy(internal = Internal)

def withNonInternal: DeclarationExchangeConfig =
copy(internal = NonInternal)
}

object DeclarationExchangeConfig {

def default(exchangeName: ExchangeName, exchangeType: ExchangeType): DeclarationExchangeConfig =
DeclarationExchangeConfig(exchangeName, exchangeType, NonDurable, NonAutoDelete, NonInternal, Map.empty)
DeclarationExchangeConfig(
exchangeName = exchangeName,
exchangeType = exchangeType,
durable = NonDurable,
autoDelete = NonAutoDelete,
internal = NonInternal,
arguments = Map.empty
)
}

sealed trait InternalCfg extends Product with Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,33 @@ object deletion {
)

object DeletionQueueConfig {

@deprecated("Use ifUnusedAndEmpty instead", "5.3.0")
def default(queueName: QueueName): DeletionQueueConfig =
onlyIfUnusedAndEmpty(queueName)

def onlyIfUnusedAndEmpty(queueName: QueueName): DeletionQueueConfig =
DeletionQueueConfig(queueName, Unused, Empty)

def evenIfUsedButEmpty(queueName: QueueName): DeletionQueueConfig =
DeletionQueueConfig(queueName, Used, Empty)

def evenIfUsedAndNonEmpty(queueName: QueueName): DeletionQueueConfig =
DeletionQueueConfig(queueName, Used, NonEmpty)
}

final case class DeletionExchangeConfig(exchangeName: ExchangeName, ifUnused: IfUnusedCfg)

object DeletionExchangeConfig {

@deprecated("Use ifUnused instead", "5.3.0")
def default(exchangeName: ExchangeName): DeletionExchangeConfig =
onlyIfUnused(exchangeName)

def onlyIfUnused(exchangeName: ExchangeName): DeletionExchangeConfig =
DeletionExchangeConfig(exchangeName, Unused)

def evenIfUsed(exchangeName: ExchangeName): DeletionExchangeConfig =
DeletionExchangeConfig(exchangeName, Used)
}

sealed trait IfEmptyCfg extends Product with Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ object AmqpProperties {
contentType = Option(basicProps.getContentType),
contentEncoding = Option(basicProps.getContentEncoding),
priority = Option[Integer](basicProps.getPriority).map(Int.unbox),
deliveryMode = Option(basicProps.getDeliveryMode).map(DeliveryMode.from(_)),
deliveryMode = Option(basicProps.getDeliveryMode).map(DeliveryMode.unsafeFromInt(_)),
correlationId = Option(basicProps.getCorrelationId),
messageId = Option(basicProps.getMessageId),
`type` = Option(basicProps.getType),
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/dev/profunktor/fs2rabbit/model/values.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,18 @@ object DeliveryMode {
case object NonPersistent extends DeliveryMode(1)
case object Persistent extends DeliveryMode(2)

def from(value: Int): DeliveryMode = value match {
case 1 => NonPersistent
case 2 => Persistent
@deprecated("Use fromInt or unsafeFromInt", "5.3.0")
def from(value: Int): DeliveryMode =
unsafeFromInt(value)

def unsafeFromInt(value: Int): DeliveryMode =
fromInt(value)
.getOrElse(throw new IllegalArgumentException(s"Invalid delivery mode from Int: $value"))

def fromInt(value: Int): Option[DeliveryMode] = value match {
case 1 => Some(NonPersistent)
case 2 => Some(Persistent)
case _ => None
}

implicit val deliveryModeOrder: Order[DeliveryMode] = Order.by(_.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ object AmqpPropertiesArbs {
contentType,
contentEncoding,
priority,
deliveryMode.map(DeliveryMode.from),
deliveryMode.map(DeliveryMode.unsafeFromInt),
correlationId,
messageId,
messageType,
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val fs2 = "3.11.0"
val circe = "0.14.10"
val amqpClient = "5.22.0"
val logback = "1.5.8"
val logback = "1.5.11"
val monix = "3.3.0"
val zio = "1.0.18"
val zioCats = "3.2.9.1"
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.10.2
sbt.version=1.10.3
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
resolvers += Classpaths.sbtPluginReleases
resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"

addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.7.0")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.9.0")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
addSbtPlugin("com.47deg" % "sbt-microsites" % "1.4.4")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.6.1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ trait Fs2RabbitSpec { self: BaseSpec =>
(q, x, _) = qxrk
_ <- declareExchange(x, ExchangeType.Direct)
_ <- declareQueue(DeclarationQueueConfig.default(q))
_ <- deleteQueue(DeletionQueueConfig.default(q))
_ <- deleteQueueNoWait(DeletionQueueConfig.default(q))
_ <- deleteQueue(DeletionQueueConfig.onlyIfUnusedAndEmpty(q))
_ <- deleteQueueNoWait(DeletionQueueConfig.onlyIfUnusedAndEmpty(q))
consumer <- createAutoAckConsumer(q)
_ <- consumer.attempt
.take(1)
Expand All @@ -415,7 +415,7 @@ trait Fs2RabbitSpec { self: BaseSpec =>
createConnectionChannel.use { implicit channel =>
mkRandomString.map(ExchangeName).flatMap { exchange =>
declareExchange(exchange, ExchangeType.Direct) *>
deleteExchangeNoWait(DeletionExchangeConfig.default(exchange)).attempt.map(_ shouldBe a[Right[_, _]])
deleteExchangeNoWait(DeletionExchangeConfig.onlyIfUnused(exchange)).attempt.map(_ shouldBe a[Right[_, _]])
}
}
}
Expand Down

0 comments on commit 97c1fc1

Please sign in to comment.