Skip to content

Commit

Permalink
Merge pull request #157 from JanGe/feature/config-multiple-nodes
Browse files Browse the repository at this point in the history
Provide an option to configure multiple nodes of a cluster
  • Loading branch information
gvolpe authored Jan 17, 2019
2 parents cbb8dc6 + 0e0e758 commit b09fa27
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,46 @@

package com.github.gvolpe.fs2rabbit.config

case class Fs2RabbitConfig(
import cats.data.NonEmptyList

case class Fs2RabbitNodeConfig(
host: String,
port: Int,
port: Int
)

case class Fs2RabbitConfig(
nodes: NonEmptyList[Fs2RabbitNodeConfig],
virtualHost: String,
connectionTimeout: Int,
ssl: Boolean,
username: Option[String],
password: Option[String],
requeueOnNack: Boolean,
internalQueueSize: Option[Int]
internalQueueSize: Option[Int],
automaticRecovery: Boolean
)

object Fs2RabbitConfig {
def apply(
host: String,
port: Int,
virtualHost: String,
connectionTimeout: Int,
ssl: Boolean,
username: Option[String],
password: Option[String],
requeueOnNack: Boolean,
internalQueueSize: Option[Int],
automaticRecovery: Boolean = true
): Fs2RabbitConfig = Fs2RabbitConfig(
nodes = NonEmptyList.one(Fs2RabbitNodeConfig(host, port)),
virtualHost = virtualHost,
connectionTimeout = connectionTimeout,
ssl = ssl,
username = username,
password = password,
requeueOnNack = requeueOnNack,
internalQueueSize = internalQueueSize,
automaticRecovery = automaticRecovery
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.github.gvolpe.fs2rabbit.interpreter

import javax.net.ssl.SSLContext

import cats.data.NonEmptyList
import cats.effect.Sync
import cats.syntax.apply._
import cats.syntax.flatMap._
Expand All @@ -26,15 +26,19 @@ import com.github.gvolpe.fs2rabbit.algebra.Connection
import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig
import com.github.gvolpe.fs2rabbit.model.{AMQPChannel, RabbitChannel}
import com.github.gvolpe.fs2rabbit.effects.Log
import com.rabbitmq.client.{ConnectionFactory, Connection => RabbitMQConnection}
import com.rabbitmq.client.{Address, ConnectionFactory, Connection => RabbitMQConnection}
import fs2.Stream
import scala.collection.JavaConverters._

class ConnectionStream[F[_]](factory: ConnectionFactory)(implicit F: Sync[F], L: Log[F])
class ConnectionStream[F[_]](
factory: ConnectionFactory,
addresses: NonEmptyList[Address]
)(implicit F: Sync[F], L: Log[F])
extends Connection[Stream[F, ?]] {

private[fs2rabbit] val acquireConnection: F[(RabbitMQConnection, AMQPChannel)] =
for {
conn <- F.delay(factory.newConnection)
conn <- F.delay(factory.newConnection(addresses.toList.asJava))
channel <- F.delay(conn.createChannel)
} yield (conn, RabbitChannel(channel))

Expand All @@ -59,19 +63,22 @@ object ConnectionStream {
private[fs2rabbit] def mkConnectionFactory[F[_]: Sync](
config: Fs2RabbitConfig,
sslContext: Option[SSLContext]
): F[ConnectionFactory] =
): F[(ConnectionFactory, NonEmptyList[Address])] =
Sync[F].delay {
val factory = new ConnectionFactory()
factory.setHost(config.host)
factory.setPort(config.port)
val factory = new ConnectionFactory()
val firstNode = config.nodes.head
factory.setHost(firstNode.host)
factory.setPort(firstNode.port)
factory.setVirtualHost(config.virtualHost)
factory.setConnectionTimeout(config.connectionTimeout)
factory.setAutomaticRecoveryEnabled(config.automaticRecovery)
if (config.ssl) {
sslContext.fold(factory.useSslProtocol())(factory.useSslProtocol)
}
config.username.foreach(factory.setUsername)
config.password.foreach(factory.setPassword)
factory
val addresses = config.nodes.map(node => new Address(node.host, node.port))
(factory, addresses)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ object Fs2Rabbit {
config: Fs2RabbitConfig,
sslContext: Option[SSLContext] = None
): F[Fs2Rabbit[F]] =
ConnectionStream.mkConnectionFactory[F](config, sslContext).map { factory =>
val amqpClient = new AMQPClientStream[F]
val connStream = new ConnectionStream[F](factory)
val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500))
val acker = new AckingProgram[F](config, amqpClient)
val consumer = new ConsumingProgram[F](amqpClient, internalQ)
new Fs2Rabbit[F](config, connStream, amqpClient, acker, consumer)
ConnectionStream.mkConnectionFactory[F](config, sslContext).map {
case (factory, addresses) =>
val amqpClient = new AMQPClientStream[F]
val connStream = new ConnectionStream[F](factory, addresses)
val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500))
val acker = new AckingProgram[F](config, amqpClient)
val consumer = new ConsumingProgram[F](amqpClient, internalQ)
new Fs2Rabbit[F](config, connStream, amqpClient, acker, consumer)
}
}
// $COVERAGE-ON$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,30 @@

package com.github.gvolpe.fs2rabbit.examples

import cats.data.NonEmptyList
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.functor._
import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig
import com.github.gvolpe.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit
import com.github.gvolpe.fs2rabbit.resiliency.ResilientStream

object IOAckerConsumer extends IOApp {

private val config: Fs2RabbitConfig = Fs2RabbitConfig(
virtualHost = "/",
host = "127.0.0.1",
nodes = NonEmptyList.one(
Fs2RabbitNodeConfig(
host = "127.0.0.1",
port = 5672
)
),
username = Some("guest"),
password = Some("guest"),
port = 5672,
ssl = false,
connectionTimeout = 3,
requeueOnNack = false,
internalQueueSize = Some(500)
internalQueueSize = Some(500),
automaticRecovery = true
)

override def run(args: List[String]): IO[ExitCode] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.github.gvolpe.fs2rabbit.examples

import cats.data.NonEmptyList
import cats.effect.ExitCode
import cats.syntax.functor._
import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig
import com.github.gvolpe.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit
import com.github.gvolpe.fs2rabbit.resiliency.ResilientStream
import monix.eval.{Task, TaskApp}
Expand All @@ -27,14 +28,19 @@ object MonixAutoAckConsumer extends TaskApp {

private val config: Fs2RabbitConfig = Fs2RabbitConfig(
virtualHost = "/",
host = "127.0.0.1",
nodes = NonEmptyList.one(
Fs2RabbitNodeConfig(
host = "127.0.0.1",
port = 5672
)
),
username = Some("guest"),
password = Some("guest"),
port = 5672,
ssl = false,
connectionTimeout = 3,
requeueOnNack = false,
internalQueueSize = Some(500)
internalQueueSize = Some(500),
automaticRecovery = true
)

override def run(args: List[String]): Task[ExitCode] =
Expand Down
15 changes: 11 additions & 4 deletions site/src/main/tut/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@ number: 1
The main `RabbitMQ` configuration should be defined as `Fs2RabbitConfig`. You choose how to get the information, either from an `application.conf` file, from the environment or provided by an external system. A popular option that fits well the tech stack is [Pure Config](https://pureconfig.github.io/).

```tut:book:silent
import com.github.gvolpe.fs2rabbit.config.Fs2RabbitConfig
import cats.data.NonEmptyList
import com.github.gvolpe.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
val config = Fs2RabbitConfig(
virtualHost = "/",
host = "127.0.0.1",
nodes = NonEmptyList.one(
Fs2RabbitNodeConfig(
host = "127.0.0.1",
port = 5672
)
),
username = Some("guest"),
password = Some("guest"),
port = 5672,
ssl = false,
connectionTimeout = 3,
requeueOnNack = false,
internalQueueSize = Some(500)
internalQueueSize = Some(500),
automaticRecovery = true
)
```

The `internalQueueSize` indicates the size of the fs2's bounded queue used internally to communicate with the AMQP Java driver.
The `automaticRecovery` indicates whether the AMQP Java driver should try to [recover broken connections](https://www.rabbitmq.com/api-guide.html#recovery).
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version in ThisBuild := "1.0.0"
version in ThisBuild := "1.1.0-SNAPSHOT"

0 comments on commit b09fa27

Please sign in to comment.