Fix mdoc
geirolz committed Oct 11, 2024
1 parent 7fd0803 commit 44982bb
Showing 3 changed files with 69 additions and 51 deletions.
20 changes: 14 additions & 6 deletions site/docs/config.md
@@ -1,12 +1,14 @@
---
layout: docs
title: "Configuration"
title: "Configuration"
number: 1
---

# Configuration

The main `RabbitMQ` configuration should be defined as `Fs2RabbitConfig`. You can choose how to obtain this information: from an `application.conf` file, from the environment, or from an external system. A popular option that fits the tech stack well is [Pure Config](https://pureconfig.github.io/).

```scala mdoc:silent
import cats.data.NonEmptyList
@@ -32,10 +34,16 @@ val config = Fs2RabbitConfig(
requestedHeartbeat = 30.seconds,
automaticRecovery = true,
automaticTopologyRecovery = true,
clientProvidedConnectionName = Some("app:rabbit")
maxInboundMessageBodySizeBytes = Fs2RabbitConfig.defaultMaxInboundMessageBodySizeBytes,
clientProvidedConnectionName = Some("app:rabbit"),
connectionFactory = None
)
```

The `internalQueueSize` indicates the size of fs2's bounded queue used internally to communicate with the AMQP Java driver.
The `automaticRecovery` flag indicates whether the AMQP Java driver should try to [recover broken connections](https://www.rabbitmq.com/api-guide.html#recovery).
The `requestedHeartbeat` indicates the [heartbeat timeout](https://www.rabbitmq.com/heartbeats.html#using-heartbeats-in-java); it should be non-zero and lower than 60 seconds.
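
As a sketch of the Pure Config approach mentioned above, the connection details could be read from `application.conf` and then plugged into the `Fs2RabbitConfig` shown earlier. The `RabbitSettings` case class and the `rabbit` path are illustrative only and not part of fs2-rabbit; adapt them to your own configuration layout (Scala 2 shown, using automatic `ConfigReader` derivation).

```scala
// A minimal, hypothetical PureConfig setup. RabbitSettings and the "rabbit"
// path are illustrative, not part of fs2-rabbit.
import pureconfig._
import pureconfig.generic.auto._

final case class RabbitSettings(
  host: String,        // e.g. "localhost"
  port: Int,           // e.g. 5672
  virtualHost: String, // e.g. "/"
  username: String,
  password: String
)

// Reads the `rabbit { ... }` block from application.conf, failing fast on errors.
val settings: RabbitSettings =
  ConfigSource.default.at("rabbit").loadOrThrow[RabbitSettings]
```

The loaded values can then be used to fill in the corresponding fields of the `Fs2RabbitConfig` above.
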
49 changes: 27 additions & 22 deletions site/docs/examples/client-metrics.md
@@ -1,16 +1,18 @@
---
layout: docs
title: "Client Metrics"
title: "Client Metrics"
number: 18
---

# Client Metrics

RabbitMQ Java Client supports metrics collection via [Dropwizard](https://www.rabbitmq.com/blog/2016/11/30/metrics-support-in-rabbitmq-java-client-4-0/) or [Micrometer](https://www.rabbitmq.com/blog/2018/04/10/rabbitmq-java-client-metrics-with-micrometer-and-datadog/).
At the time of writing, both providers ship with `amqp-client` 5.9.0. You can instantiate one as shown below.

```scala
val registry = new MetricRegistry
val dropwizardCollector = new StandardMetricsCollector(registry)
```
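
For the Micrometer route mentioned above, a rough sketch looks like this; it assumes `micrometer-core` is on the classpath, and the in-memory `SimpleMeterRegistry` is used purely for illustration.

```scala
// Sketch of the Micrometer alternative shipped with amqp-client. In practice a
// Prometheus, Datadog, etc. registry would replace SimpleMeterRegistry.
import com.rabbitmq.client.impl.MicrometerMetricsCollector
import io.micrometer.core.instrument.simple.SimpleMeterRegistry

val meterRegistry       = new SimpleMeterRegistry
val micrometerCollector = new MicrometerMetricsCollector(meterRegistry)
```

Either collector can then be handed to `withMetricsCollector`, as shown below for the Dropwizard one.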

@@ -23,19 +25,19 @@ RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resou
## Expose via JMX

JMX provides a standard way to access performance metrics of an application. Dropwizard can report metrics
via JMX through its `metrics-jmx` module. Please add it to the list of dependencies.

```
libraryDependencies += "io.dropwizard.metrics" % "metrics-core" % "4.1.5"
libraryDependencies += "io.dropwizard.metrics" % "metrics-jmx" % "4.1.5"
```

It provides a `JmxReporter` for the metrics registry. The reporter is a resource, so it can be wrapped with the acquire-release pattern for ease of use.

```scala
object JmxReporterResource {
def make[F[_] : Sync](registry: MetricRegistry): Resource[F, JmxReporter] = {
val acquire = Sync[F].delay {
val reporter = JmxReporter.forRegistry(registry).inDomain("com.rabbitmq.client.jmx").build
reporter.start()
@@ -53,18 +55,19 @@ Let's initialise the FS2 RabbitMQ client and AMQP channel with metrics.

```scala
val resources = for {
_ <- JmxReporterResource.make[IO](registry)
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
channel <- client.createConnection.flatMap(client.createChannel)
} yield (channel, client)

val program = resources.use {
case (channel, client) =>
// Let's publish and consume and see the counters go up
}
```

The app will now expose the following metrics under `com.rabbitmq.client.jmx`:

* Acknowledged messages
* Channels count
* Connections count
@@ -111,12 +114,14 @@ object DropwizardMetricsDemo extends IOApp {
requestedHeartbeat = 60.seconds,
automaticRecovery = true,
automaticTopologyRecovery = true,
clientProvidedConnectionName = Some("app:rabbit")
maxInboundMessageBodySizeBytes = Fs2RabbitConfig.defaultMaxInboundMessageBodySizeBytes,
clientProvidedConnectionName = Some("app:rabbit"),
connectionFactory = None
)

private val queueName = QueueName("testQ")
private val exchangeName = ExchangeName("testEX")
private val routingKey = RoutingKey("testRK")

val simpleMessage = AmqpMessage("Hey!", AmqpProperties.empty)

@@ -126,12 +131,12 @@ object DropwizardMetricsDemo extends IOApp {
}

override def run(args: List[String]): IO[ExitCode] = {
val registry = new MetricRegistry
val dropwizardCollector = new StandardMetricsCollector(registry)

val resources = for {
_ <- JmxReporterResource.make[IO](registry)
client <- RabbitClient.default[IO](config).withMetricsCollector(dropwizardCollector).resource
channel <- client.createConnection.flatMap(client.createChannel)
} yield (channel, client)

@@ -141,12 +146,12 @@ object DropwizardMetricsDemo extends IOApp {

val setup =
for {
_ <- client.declareQueue(DeclarationQueueConfig.default(queueName))
_ <- client.declareExchange(DeclarationExchangeConfig.default(exchangeName, Topic))
_ <- client.bindQueue(queueName, exchangeName, routingKey)
ackerConsumer <- client.createAckerConsumer[String](queueName)
(acker, consumer) = ackerConsumer
publisher <- client.createPublisher[AmqpMessage[String]](exchangeName, routingKey)
} yield (consumer, acker, publisher)

Stream
@@ -165,15 +170,15 @@ object DropwizardMetricsDemo extends IOApp {
program.as(ExitCode.Success)
}

def logPipe[F[_] : Sync]: Pipe[F, AmqpEnvelope[String], AckResult] =
_.evalMap { amqpMsg =>
putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag))
}

}

object JmxReporterResource {
def make[F[_] : Sync](registry: MetricRegistry): Resource[F, JmxReporter] = {
val acquire = Sync[F].delay {
val reporter = JmxReporter.forRegistry(registry).inDomain("com.rabbitmq.client.jmx").build
reporter.start()
51 changes: 28 additions & 23 deletions site/docs/examples/sample-acker.md
@@ -1,12 +1,13 @@
---
layout: docs
title: "Single AckerConsumer"
title: "Single AckerConsumer"
number: 16
---

# Single AckerConsumer

Here we create a single `AckerConsumer` and a single `Publisher`, and finally we publish two messages: a simple `String` message and a `Json` message, using the `fs2-rabbit-circe` extension.

```scala mdoc:silent
import java.nio.charset.StandardCharsets.UTF_8
@@ -22,19 +23,21 @@ import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal}
import dev.profunktor.fs2rabbit.model._
import fs2.{Pipe, Pure, Stream}

class Flow[F[_] : Concurrent, A](
consumer: Stream[F, AmqpEnvelope[A]],
acker: AckResult => F[Unit],
logger: Pipe[F, AmqpEnvelope[A], AckResult],
publisher: AmqpMessage[String] => F[Unit]
) {

import io.circe.generic.auto._

case class Address(number: Int, streetName: String)

case class Person(id: Long, name: String, address: Address)

private val jsonEncoder = new Fs2JsonEncoder

import jsonEncoder.jsonEncode

val jsonPipe: Pipe[Pure, AmqpMessage[Person], AmqpMessage[String]] = _.map(jsonEncode[Person])
@@ -52,11 +55,11 @@ class Flow[F[_]: Concurrent, A](

}

class AckerConsumerDemo[F[_] : Async](R: RabbitClient[F]) {

private val queueName = QueueName("testQ")
private val exchangeName = ExchangeName("testEX")
private val routingKey = RoutingKey("testRK")
implicit val stringMessageEncoder =
Kleisli[F, AmqpMessage[String], AmqpMessage[Array[Byte]]](s => s.copy(payload = s.payload.getBytes(UTF_8)).pure[F])

@@ -75,13 +78,13 @@ class AckerConsumerDemo[F[_]: Async](R: RabbitClient[F]) {
_ <- R.declareExchange(exchangeName, ExchangeType.Topic)
_ <- R.bindQueue(queueName, exchangeName, routingKey)
publisher <- R.createPublisherWithListener[AmqpMessage[String]](exchangeName,
routingKey,
publishingFlag,
publishingListener)
ackerConsumer <- R.createAckerConsumer[String](queueName)
(acker, consumer) = ackerConsumer
result = new Flow[F, String](consumer, acker, logPipe, publisher).flow
_ <- result.compile.drain
} yield ()
}
}
@@ -119,15 +122,17 @@ object IOAckerConsumer extends IOApp {
requestedHeartbeat = 60.seconds,
automaticRecovery = true,
automaticTopologyRecovery = true,
clientProvidedConnectionName = Some("app:rabbit")
maxInboundMessageBodySizeBytes = Fs2RabbitConfig.defaultMaxInboundMessageBodySizeBytes,
clientProvidedConnectionName = Some("app:rabbit"),
connectionFactory = None
)

override def run(args: List[String]): IO[ExitCode] =
RabbitClient.default[IO](config).resource.use { client =>
ResilientStream
.runF(new AckerConsumerDemo[IO](client).program)
.as(ExitCode.Success)
}

}
```
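
The example above only encodes `Person` to JSON before publishing. As a rough sketch of the consuming side, `fs2-rabbit-circe` also provides an `Fs2JsonDecoder`; its `jsonDecode` maps an `AmqpEnvelope[String]` to a decoding result together with the delivery tag (exact signatures may differ between versions), which makes it easy to ack or nack depending on whether decoding succeeded.

```scala
// Hypothetical consuming-side counterpart using fs2-rabbit-circe's Fs2JsonDecoder:
// decode each payload into a Person and ack on success, nack on failure.
// Person/Address mirror the case classes defined in Flow above; treat this as a
// sketch and check the signatures against your fs2-rabbit version.
import dev.profunktor.fs2rabbit.json.Fs2JsonDecoder
import dev.profunktor.fs2rabbit.model.{AckResult, AmqpEnvelope}
import fs2.Stream
import io.circe.generic.auto._

object JsonConsumerSketch {
  case class Address(number: Int, streetName: String)
  case class Person(id: Long, name: String, address: Address)

  private val jsonDecoder = new Fs2JsonDecoder
  import jsonDecoder.jsonDecode

  def decodeAndAck[F[_]](
      consumer: Stream[F, AmqpEnvelope[String]],
      acker: AckResult => F[Unit]
  ): Stream[F, Unit] =
    consumer.map(jsonDecode[Person]).evalMap {
      case (Right(_), tag) => acker(AckResult.Ack(tag))  // decoded fine
      case (Left(_), tag)  => acker(AckResult.NAck(tag)) // reject undecodable payloads
    }
}
```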
