From 0e0e75853c10dbfeef56a1c6787c1c21213f9a30 Mon Sep 17 00:00:00 2001 From: Jan Gerlinger Date: Wed, 16 Jan 2019 17:13:51 +0800 Subject: [PATCH] Provide an option to configure multiple nodes of a cluster --- .../fs2rabbit/config/Fs2RabbitConfig.scala | 38 +++++++++++++++++-- .../interpreter/ConnectionStream.scala | 25 +++++++----- .../fs2rabbit/interpreter/Fs2Rabbit.scala | 15 ++++---- .../fs2rabbit/examples/IOAckerConsumer.scala | 14 +++++-- .../examples/MonixAutoAckConsumer.scala | 14 +++++-- site/src/main/tut/config.md | 15 ++++++-- version.sbt | 2 +- 7 files changed, 91 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/com/github/gvolpe/fs2rabbit/config/Fs2RabbitConfig.scala b/core/src/main/scala/com/github/gvolpe/fs2rabbit/config/Fs2RabbitConfig.scala index 28bf9de9..cee8edcd 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2rabbit/config/Fs2RabbitConfig.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2rabbit/config/Fs2RabbitConfig.scala @@ -16,14 +16,46 @@ package com.github.gvolpe.fs2rabbit.config -case class Fs2RabbitConfig( +import cats.data.NonEmptyList + +case class Fs2RabbitNodeConfig( host: String, - port: Int, + port: Int +) + +case class Fs2RabbitConfig( + nodes: NonEmptyList[Fs2RabbitNodeConfig], virtualHost: String, connectionTimeout: Int, ssl: Boolean, username: Option[String], password: Option[String], requeueOnNack: Boolean, - internalQueueSize: Option[Int] + internalQueueSize: Option[Int], + automaticRecovery: Boolean ) + +object Fs2RabbitConfig { + def apply( + host: String, + port: Int, + virtualHost: String, + connectionTimeout: Int, + ssl: Boolean, + username: Option[String], + password: Option[String], + requeueOnNack: Boolean, + internalQueueSize: Option[Int], + automaticRecovery: Boolean = true + ): Fs2RabbitConfig = Fs2RabbitConfig( + nodes = NonEmptyList.one(Fs2RabbitNodeConfig(host, port)), + virtualHost = virtualHost, + connectionTimeout = connectionTimeout, + ssl = ssl, + username = username, + password = password, + requeueOnNack = requeueOnNack, + internalQueueSize = internalQueueSize, + automaticRecovery = automaticRecovery + ) +} diff --git a/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/ConnectionStream.scala b/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/ConnectionStream.scala index 2cba134c..a159b54d 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/ConnectionStream.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/ConnectionStream.scala @@ -17,7 +17,7 @@ package com.github.gvolpe.fs2rabbit.interpreter import javax.net.ssl.SSLContext - +import cats.data.NonEmptyList import cats.effect.Sync import cats.syntax.apply._ import cats.syntax.flatMap._ @@ -26,15 +26,19 @@ import com.github.gvolpe.fs2rabbit.algebra.Connection import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig import com.github.gvolpe.fs2rabbit.model.{AMQPChannel, RabbitChannel} import com.github.gvolpe.fs2rabbit.effects.Log -import com.rabbitmq.client.{ConnectionFactory, Connection => RabbitMQConnection} +import com.rabbitmq.client.{Address, ConnectionFactory, Connection => RabbitMQConnection} import fs2.Stream +import scala.collection.JavaConverters._ -class ConnectionStream[F[_]](factory: ConnectionFactory)(implicit F: Sync[F], L: Log[F]) +class ConnectionStream[F[_]]( + factory: ConnectionFactory, + addresses: NonEmptyList[Address] +)(implicit F: Sync[F], L: Log[F]) extends Connection[Stream[F, ?]] { private[fs2rabbit] val acquireConnection: F[(RabbitMQConnection, AMQPChannel)] = for { - conn <- F.delay(factory.newConnection) + conn <- F.delay(factory.newConnection(addresses.toList.asJava)) channel <- F.delay(conn.createChannel) } yield (conn, RabbitChannel(channel)) @@ -59,19 +63,22 @@ object ConnectionStream { private[fs2rabbit] def mkConnectionFactory[F[_]: Sync]( config: Fs2RabbitConfig, sslContext: Option[SSLContext] - ): F[ConnectionFactory] = + ): F[(ConnectionFactory, NonEmptyList[Address])] = Sync[F].delay { - val factory = new ConnectionFactory() - factory.setHost(config.host) - factory.setPort(config.port) + val factory = new ConnectionFactory() + val firstNode = config.nodes.head + factory.setHost(firstNode.host) + factory.setPort(firstNode.port) factory.setVirtualHost(config.virtualHost) factory.setConnectionTimeout(config.connectionTimeout) + factory.setAutomaticRecoveryEnabled(config.automaticRecovery) if (config.ssl) { sslContext.fold(factory.useSslProtocol())(factory.useSslProtocol) } config.username.foreach(factory.setUsername) config.password.foreach(factory.setPassword) - factory + val addresses = config.nodes.map(node => new Address(node.host, node.port)) + (factory, addresses) } } diff --git a/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/Fs2Rabbit.scala b/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/Fs2Rabbit.scala index ab220468..d5176543 100644 --- a/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/Fs2Rabbit.scala +++ b/core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/Fs2Rabbit.scala @@ -34,13 +34,14 @@ object Fs2Rabbit { config: Fs2RabbitConfig, sslContext: Option[SSLContext] = None ): F[Fs2Rabbit[F]] = - ConnectionStream.mkConnectionFactory[F](config, sslContext).map { factory => - val amqpClient = new AMQPClientStream[F] - val connStream = new ConnectionStream[F](factory) - val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500)) - val acker = new AckingProgram[F](config, amqpClient) - val consumer = new ConsumingProgram[F](amqpClient, internalQ) - new Fs2Rabbit[F](config, connStream, amqpClient, acker, consumer) + ConnectionStream.mkConnectionFactory[F](config, sslContext).map { + case (factory, addresses) => + val amqpClient = new AMQPClientStream[F] + val connStream = new ConnectionStream[F](factory, addresses) + val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500)) + val acker = new AckingProgram[F](config, amqpClient) + val consumer = new ConsumingProgram[F](amqpClient, internalQ) + new Fs2Rabbit[F](config, connStream, amqpClient, acker, consumer) } } // $COVERAGE-ON$ diff --git a/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/IOAckerConsumer.scala b/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/IOAckerConsumer.scala index 8cf2f1c7..b8a31069 100644 --- a/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/IOAckerConsumer.scala +++ b/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/IOAckerConsumer.scala @@ -16,9 +16,10 @@ package com.github.gvolpe.fs2rabbit.examples +import cats.data.NonEmptyList import cats.effect.{ExitCode, IO, IOApp} import cats.syntax.functor._ -import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig +import com.github.gvolpe.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit import com.github.gvolpe.fs2rabbit.resiliency.ResilientStream @@ -26,14 +27,19 @@ object IOAckerConsumer extends IOApp { private val config: Fs2RabbitConfig = Fs2RabbitConfig( virtualHost = "/", - host = "127.0.0.1", + nodes = NonEmptyList.one( + Fs2RabbitNodeConfig( + host = "127.0.0.1", + port = 5672 + ) + ), username = Some("guest"), password = Some("guest"), - port = 5672, ssl = false, connectionTimeout = 3, requeueOnNack = false, - internalQueueSize = Some(500) + internalQueueSize = Some(500), + automaticRecovery = true ) override def run(args: List[String]): IO[ExitCode] = diff --git a/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/MonixAutoAckConsumer.scala b/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/MonixAutoAckConsumer.scala index e53b4cf2..c7746055 100644 --- a/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/MonixAutoAckConsumer.scala +++ b/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples/MonixAutoAckConsumer.scala @@ -16,9 +16,10 @@ package com.github.gvolpe.fs2rabbit.examples +import cats.data.NonEmptyList import cats.effect.ExitCode import cats.syntax.functor._ -import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig +import com.github.gvolpe.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit import com.github.gvolpe.fs2rabbit.resiliency.ResilientStream import monix.eval.{Task, TaskApp} @@ -27,14 +28,19 @@ object MonixAutoAckConsumer extends TaskApp { private val config: Fs2RabbitConfig = Fs2RabbitConfig( virtualHost = "/", - host = "127.0.0.1", + nodes = NonEmptyList.one( + Fs2RabbitNodeConfig( + host = "127.0.0.1", + port = 5672 + ) + ), username = Some("guest"), password = Some("guest"), - port = 5672, ssl = false, connectionTimeout = 3, requeueOnNack = false, - internalQueueSize = Some(500) + internalQueueSize = Some(500), + automaticRecovery = true ) override def run(args: List[String]): Task[ExitCode] = diff --git a/site/src/main/tut/config.md b/site/src/main/tut/config.md index 96de5f13..801e1b1d 100644 --- a/site/src/main/tut/config.md +++ b/site/src/main/tut/config.md @@ -9,19 +9,26 @@ number: 1 The main `RabbitMQ` configuration should be defined as `Fs2RabbitConfig`. You choose how to get the information, either from an `application.conf` file, from the environment or provided by an external system. A popular option that fits well the tech stack is [Pure Config](https://pureconfig.github.io/). ```tut:book:silent -import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig +import cats.data.NonEmptyList +import com.github.gvolpe.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} val config = Fs2RabbitConfig( virtualHost = "/", - host = "127.0.0.1", + nodes = NonEmptyList.one( + Fs2RabbitNodeConfig( + host = "127.0.0.1", + port = 5672 + ) + ), username = Some("guest"), password = Some("guest"), - port = 5672, ssl = false, connectionTimeout = 3, requeueOnNack = false, - internalQueueSize = Some(500) + internalQueueSize = Some(500), + automaticRecovery = true ) ``` The `internalQueueSize` indicates the size of the fs2's bounded queue used internally to communicate with the AMQP Java driver. +The `automaticRecovery` indicates whether the AMQP Java driver should try to [recover broken connections](https://www.rabbitmq.com/api-guide.html#recovery). diff --git a/version.sbt b/version.sbt index 934906da..42ada6f6 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "1.0.0" +version in ThisBuild := "1.1.0-SNAPSHOT"