From e15691533ea8b6a06be21edfcaa9863e909127f0 Mon Sep 17 00:00:00 2001 From: Changlin Li Date: Wed, 10 Jul 2019 14:29:09 -0400 Subject: [PATCH] Add in missing AmqpHeaderVal classes (#213) * Add in missing AmqpHeaderVal classes --- build.sbt | 1 + .../fs2rabbit/effects/EnvelopeDecoder.scala | 12 +- .../interpreter/AMQPClientEffect.scala | 60 +++-- .../dev/profunktor/fs2rabbit/model.scala | 249 ++++++++++++++++-- .../fs2rabbit/AmqpFieldValueSpec.scala | 114 ++++++++ .../fs2rabbit/AmqpHeaderValSpec.scala | 38 --- .../fs2rabbit/AmqpPropertiesSpec.scala | 119 ++++++++- .../examples/AckerConsumerDemo.scala | 2 +- .../examples/AutoAckConsumerDemo.scala | 2 +- site/src/main/tut/examples/sample-acker.md | 2 +- site/src/main/tut/examples/sample-autoack.md | 2 +- 11 files changed, 498 insertions(+), 103 deletions(-) create mode 100644 core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala delete mode 100644 core/src/test/scala/dev/profunktor/fs2rabbit/AmqpHeaderValSpec.scala diff --git a/build.sbt b/build.sbt index b29ccbf9..3c90ab7b 100644 --- a/build.sbt +++ b/build.sbt @@ -39,6 +39,7 @@ val commonSettings = Seq( licenses += ("Apache-2.0", new URL("https://www.apache.org/licenses/LICENSE-2.0.txt")), homepage := Some(url("https://fs2-rabbit.profunktor.dev/")), headerLicense := Some(HeaderLicense.ALv2("2017-2019", "ProfunKtor")), + scalacOptions in (Compile, doc) ++= Seq("-no-link-warnings"), scalacOptions ++= determineVersionSpecificDeps(scalaVersion.value).scalacOptions, libraryDependencies ++= { val library = determineVersionSpecificDeps(scalaVersion.value) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala index 004cd20a..073faea0 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/effects/EnvelopeDecoder.scala @@ -17,8 +17,8 @@ package dev.profunktor.fs2rabbit.effects import cats.{Applicative, ApplicativeError} import cats.data.Kleisli -import dev.profunktor.fs2rabbit.model.{AmqpHeaderVal, AmqpProperties, ExchangeName, RoutingKey} -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal._ +import dev.profunktor.fs2rabbit.model.{AmqpFieldValue, AmqpProperties, ExchangeName, RoutingKey} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue._ import cats.implicits._ object EnvelopeDecoder { @@ -39,10 +39,10 @@ object EnvelopeDecoder { def redelivered[F[_]: Applicative]: EnvelopeDecoder[F, Boolean] = Kleisli(e => e.redelivered.pure[F]) - def header[F[_]](name: String)(implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, AmqpHeaderVal] = + def header[F[_]](name: String)(implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, AmqpFieldValue] = Kleisli(e => F.catchNonFatal(e.properties.headers(name))) - def optHeader[F[_]: Applicative](name: String): EnvelopeDecoder[F, Option[AmqpHeaderVal]] = + def optHeader[F[_]: Applicative](name: String): EnvelopeDecoder[F, Option[AmqpFieldValue]] = Kleisli(_.properties.headers.get(name).pure[F]) def stringHeader[F[_]: ApplicativeError[?[_], Throwable]](name: String): EnvelopeDecoder[F, String] = @@ -70,13 +70,13 @@ object EnvelopeDecoder { name: String): EnvelopeDecoder[F, Option[collection.Seq[Any]]] = optHeaderPF[F, collection.Seq[Any]](name) { case ArrayVal(a) => a } - private def headerPF[F[_], A](name: String)(pf: PartialFunction[AmqpHeaderVal, A])( + private def headerPF[F[_], A](name: String)(pf: PartialFunction[AmqpFieldValue, A])( implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, A] = Kleisli { env => F.catchNonFatal(pf(env.properties.headers(name))) } - private def optHeaderPF[F[_], A](name: String)(pf: PartialFunction[AmqpHeaderVal, A])( + private def optHeaderPF[F[_], A](name: String)(pf: PartialFunction[AmqpFieldValue, A])( implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, Option[A]] = Kleisli(_.properties.headers.get(name).traverse(h => F.catchNonFatal(pf(h)))) } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala index 27e65b2f..4bc49ac9 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/AMQPClientEffect.scala @@ -17,10 +17,11 @@ package dev.profunktor.fs2rabbit.interpreter import cats.Applicative -import cats.effect.{Effect, Sync} import cats.effect.syntax.effect._ +import cats.effect.{Effect, Sync} import cats.syntax.flatMap._ import cats.syntax.functor._ +import com.rabbitmq.client._ import dev.profunktor.fs2rabbit.algebra.{AMQPClient, AMQPInternals} import dev.profunktor.fs2rabbit.arguments._ import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig} @@ -28,14 +29,13 @@ import dev.profunktor.fs2rabbit.config.deletion import dev.profunktor.fs2rabbit.config.deletion.DeletionQueueConfig import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._ import dev.profunktor.fs2rabbit.model._ -import com.rabbitmq.client._ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { private[fs2rabbit] def defaultConsumer[A]( channel: Channel, internals: AMQPInternals[F] - ): F[Consumer] = Applicative[F].pure { + ): F[Consumer] = Sync[F].delay { new DefaultConsumer(channel) { override def handleCancel(consumerTag: String): Unit = @@ -52,18 +52,48 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { properties: AMQP.BasicProperties, body: Array[Byte] ): Unit = { - val tag = envelope.getDeliveryTag - val routingKey = RoutingKey(envelope.getRoutingKey) - val exchange = ExchangeName(envelope.getExchange) - val redelivered = envelope.isRedeliver - val props = AmqpProperties.from(properties) - internals.queue.fold(()) { internalQ => - val envelope = AmqpEnvelope(DeliveryTag(tag), body, props, exchange, routingKey, redelivered) - internalQ - .enqueue1(Right(envelope)) - .toIO - .unsafeRunAsync(_ => ()) + // This should not go wrong (if it does it is an indication of a bug in + // unsafeFrom)! + // However, I'm not entirely confident I've nailed down unsafeFrom ( + // since it requires a pretty intricate understanding of the underlying + // Java library) so just in case, we're wrapping it in a Try so that a + // bug here doesn't bring down our entire queue. + val amqpPropertiesOrErr = scala.util.Try(AmqpProperties.unsafeFrom(properties)) match { + // toEither is not supported by Scala 2.11 so we have a manual match + case scala.util.Success(amqpProperties) => Right(amqpProperties) + case scala.util.Failure(err) => + val rewrappedError = new Exception( + "You've stumbled across a bug in the interface between the underlying " + + "RabbitMQ Java library and fs2-rabbit! Please report this bug and " + + "include this stack trace and message.\nThe BasicProperties instance " + + s"that caused this error was:\n$properties\n", + err + ) + Left(rewrappedError) } + // Calling the Functor instance manually is because of three annoying things: + // 1. Scala 2.11 doesn't have right-biased Either so .map doesn't work, + // 2. Scala 2.13 deprecates .right so .right.map doesn't work either + // (since we have fatal warnings). + // 3. import cats.implicits._ doesn't work because it warns about an + // unused import for Scala 2.12 and Scala 2.13 + // So we invoke the Either Functor instance manually + import cats.instances.either.catsStdInstancesForEither + + val envelopeOrErr = catsStdInstancesForEither.map(amqpPropertiesOrErr) { props => + val tag = envelope.getDeliveryTag + val routingKey = RoutingKey(envelope.getRoutingKey) + val exchange = ExchangeName(envelope.getExchange) + val redelivered = envelope.isRedeliver + AmqpEnvelope(DeliveryTag(tag), body, props, exchange, routingKey, redelivered) + } + + internals.queue + .fold(Applicative[F].pure(())) { internalQ => + internalQ.enqueue1(envelopeOrErr) + } + .toIO + .unsafeRunAsync(_ => ()) } } } @@ -168,7 +198,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] { ReplyText(replyText), ExchangeName(exchange), RoutingKey(routingKey), - AmqpProperties.from(properties), + AmqpProperties.unsafeFrom(properties), AmqpBody(body) ) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala index deb7150f..50b6c486 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/model.scala @@ -18,17 +18,19 @@ package dev.profunktor.fs2rabbit import java.nio.charset.Charset import java.nio.charset.StandardCharsets.UTF_8 +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Date -import cats.{Applicative, ApplicativeError} import cats.data.Kleisli import cats.implicits._ +import cats.{Applicative, ApplicativeError} +import com.rabbitmq.client.{AMQP, Channel, Connection, LongString} import dev.profunktor.fs2rabbit.arguments.Arguments import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder} -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal._ import dev.profunktor.fs2rabbit.javaConversion._ -import com.rabbitmq.client.impl.LongStringHelper -import com.rabbitmq.client.{AMQP, Channel, Connection, LongString} import fs2.Stream +import scodec.bits.ByteVector object model { @@ -81,27 +83,212 @@ object model { final case class NAck(deliveryTag: DeliveryTag) extends AckResult } - sealed trait AmqpHeaderVal extends Product with Serializable { - def impure: AnyRef = this match { - case StringVal(v) => LongStringHelper.asLongString(v) - case IntVal(v) => Int.box(v) - case LongVal(v) => Long.box(v) - case ArrayVal(v) => v.asJava - } + /** + * A string whose UTF-8 encoded representation is 255 bytes or less. + * + * Parts of the AMQP spec call for the use of such strings. + */ + sealed abstract case class ShortString private (str: String) + object ShortString { + val MaxByteLength = 255 + + def from(str: String): Option[ShortString] = + if (str.getBytes("utf-8").length <= MaxByteLength) { + Some(new ShortString(str) {}) + } else { + None + } + + /** + * This bypasses the safety check that [[from]] has. This is meant only for + * situations where you are certain the string cannot be larger than + * [[MaxByteLength]] (e.g. string literals). + */ + def unsafeFrom(str: String): ShortString = new ShortString(str) {} } - object AmqpHeaderVal { - final case class IntVal(value: Int) extends AmqpHeaderVal - final case class LongVal(value: Long) extends AmqpHeaderVal - final case class StringVal(value: String) extends AmqpHeaderVal - final case class ArrayVal(v: collection.Seq[Any]) extends AmqpHeaderVal + /** + * This hierarchy is meant to reflect the output of + * [[com.rabbitmq.client.impl.ValueReader.readFieldValue]] in a type-safe + * way. + * + * Note that we don't include LongString here because of some ambiguity in + * how RabbitMQ's Java client deals with it. While it will happily write out + * LongStrings and Strings separately, when reading it will always interpret + * a String as a LongString and so will never return a normal String. + * This means that if we included separate LongStringVal and StringVals we + * could have encode-decode round-trip differences (taking a String sending + * it off to RabbitMQ and reading it back will result in a LongString). + * We therefore collapse both LongStrings and Strings into a single StringVal + * backed by an ordinary String. + * + * Note that this type hierarchy is NOT exactly identical to the AMQP 0-9-1 + * spec. This is partially because RabbitMQ does not exactly follow the spec + * itself (see https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3) + * and also because the underlying Java client chooses to try to map the + * RabbitMQ types into Java primitive types when possible, collapsing a lot + * of the signed and unsigned types because Java does not have the signed + * and unsigned equivalents. + */ + sealed trait AmqpFieldValue extends Product with Serializable { + + /** + * The opposite of [[AmqpFieldValue.unsafeFrom]]. Turns an [[AmqpFieldValue]] + * into something that can be processed by + * [[com.rabbitmq.client.impl.ValueWriter]]. + */ + def toValueWriterCompatibleJava: AnyRef + } + + object AmqpFieldValue { + + /** + * A type for AMQP timestamps. + * + * Note that this is only accurate to the second (as supported by the AMQP + * spec and the underlying RabbitMQ implementation). + */ + sealed abstract case class TimestampVal private (instantWithOneSecondAccuracy: Instant) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: Date = Date.from(instantWithOneSecondAccuracy) + } + object TimestampVal { + def from(instant: Instant): TimestampVal = + new TimestampVal(instant.truncatedTo(ChronoUnit.SECONDS)) {} + + def from(date: Date): TimestampVal = from(date.toInstant) + } + + /** + * A type for precise decimal values. Note that while it is backed by a + * [[BigDecimal]] (just like the underlying Java library), there is a limit + * on the size and precision of the decimal: its unscaled representation cannot + * exceed 4 bytes due to the AMQP spec and its scale component must be an octet. + */ + sealed abstract case class DecimalVal private (sizeLimitedBigDecimal: BigDecimal) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.math.BigDecimal = sizeLimitedBigDecimal.bigDecimal + } + object DecimalVal { + val MaxUnscaledBits: Int = 32 + + val MaxScaleValue: Int = 255 + + /** + * The AMQP 0.9.1 standard specifies that the scale component of a + * decimal must be an octet (i.e. between 0 and 255) and that its + * unscaled component must be a 32-bit integer. If those criteria are + * not met, then we get back None. + */ + def from(bigDecimal: BigDecimal): Option[DecimalVal] = + if (getFullBitLengthOfUnscaled(bigDecimal) > MaxUnscaledBits || bigDecimal.scale > MaxScaleValue || bigDecimal.scale < 0) { + None + } else { + Some(new DecimalVal(bigDecimal) {}) + } + + /** + * Only use if you're certain that the [[BigDecimal]]'s representation + * meets the requirements of a [[ DecimalVal ]] (e.g. you are + * constructing one using literals). + * + * Almost always you should be using [[from]]. + */ + def unsafeFrom(bigDecimal: BigDecimal): DecimalVal = + new DecimalVal(bigDecimal) {} + + private def getFullBitLengthOfUnscaled(bigDecimal: BigDecimal): Int = + // Note that we add back 1 here because bitLength ignores the sign bit, + // reporting back an answer that's one bit too small. + bigDecimal.bigDecimal.unscaledValue.bitLength + 1 + + } + + final case class TableVal(value: Map[ShortString, AmqpFieldValue]) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.util.Map[String, AnyRef] = + value.map { case (key, v) => key.str -> v.toValueWriterCompatibleJava }.asJava + } + final case class ByteVal(value: Byte) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.lang.Byte = Byte.box(value) + } + final case class DoubleVal(value: Double) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.lang.Double = Double.box(value) + } + final case class FloatVal(value: Float) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.lang.Float = Float.box(value) + } + final case class ShortVal(value: Short) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.lang.Short = Short.box(value) + } + final case class ByteArrayVal(value: ByteVector) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: Array[Byte] = value.toArray + } + final case class BooleanVal(value: Boolean) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.lang.Boolean = Boolean.box(value) + } + final case class IntVal(value: Int) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.lang.Integer = Int.box(value) + } + final case class LongVal(value: Long) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.lang.Long = Long.box(value) + } + final case class StringVal(value: String) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: String = value + } + final case class ArrayVal(value: Vector[AmqpFieldValue]) extends AmqpFieldValue { + override def toValueWriterCompatibleJava: java.util.List[AnyRef] = value.map(_.toValueWriterCompatibleJava).asJava + } + case object NullVal extends AmqpFieldValue { + override def toValueWriterCompatibleJava: Null = null + } - def from(value: AnyRef): AmqpHeaderVal = value match { - case ls: LongString => StringVal(new String(ls.getBytes, "UTF-8")) - case s: String => StringVal(s) - case l: java.lang.Long => LongVal(l) - case i: java.lang.Integer => IntVal(i) - case a: java.util.List[_] => ArrayVal(a.asScala) + /** + * This method is meant purely to translate the output of + * [[com.rabbitmq.client.impl.ValueReader.readFieldValue]]. As such it is + * NOT total and will blow up if you pass it a class which + * [[com.rabbitmq.client.impl.ValueReader.readFieldValue]] does not output. + * + * As a user of this library, you almost certainly be constructing + * [[AmqpFieldValue]]s directly instead of using this method. + */ + private[fs2rabbit] def unsafeFrom(value: AnyRef): AmqpFieldValue = value match { + // It's safe to call unsafeFromBigDecimal here because if the value came + // from readFieldValue, we're assured that the check on BigDecimal + // representation size must have already occurred because ValueReader will + // only read a maximum of 4 bytes before bailing out (similarly it will + // read no more than the first 8 bits to determine scale). + case bd: java.math.BigDecimal => DecimalVal.unsafeFrom(bd) + case d: java.util.Date => TimestampVal.from(d) + // Looking at com.rabbitmq.client.impl.ValueReader.readFieldValue reveals + // that java.util.Maps must always be created by + // com.rabbitmq.client.impl.ValueReader.readTable, whose Maps must always + // be of this type, even if at runtime type erasure removes the inner types. + // This makes us safe from ClassCastExceptions down the road. + case t: java.util.Map[String @unchecked, AnyRef @unchecked] => + // ShortString.unsafeOf is safe to use here for a rather subtle reason. + // Even though ValueReader.readShortstr doesn't perform any explicit + // validation that a short string is 255 chars or less, it only reads + // one byte to determine how large of a byte array to allocate for the + // string which means the length cannot possibly exceed 255. + TableVal(t.asScala.toMap.map { case (key, v) => ShortString.unsafeFrom(key) -> unsafeFrom(v) }) + case byte: java.lang.Byte => ByteVal(byte) + case double: java.lang.Double => DoubleVal(double) + case float: java.lang.Float => FloatVal(float) + case short: java.lang.Short => ShortVal(short) + case byteArray: Array[Byte] => ByteArrayVal(ByteVector(byteArray)) + case b: java.lang.Boolean => BooleanVal(b) + case i: java.lang.Integer => IntVal(i) + case l: java.lang.Long => LongVal(l) + case s: java.lang.String => StringVal(s) + case ls: LongString => StringVal(ls.toString) + // Looking at com.rabbitmq.client.impl.ValueReader.readFieldValue reveals + // that java.util.Lists must always be created by + // com.rabbitmq.client.impl.ValueReader.readArray, whose values must are + // then recursively created by + // com.rabbitmq.client.impl.ValueReader.readFieldValue, which indicates + // that the inner type can never be anything other than the types + // represented by AmqpHeaderVal + // This makes us safe from ClassCastExceptions down the road. + case a: java.util.List[AnyRef @unchecked] => ArrayVal(a.asScala.toVector.map(unsafeFrom)) + case null => NullVal } } @@ -118,13 +305,23 @@ object model { expiration: Option[String] = None, replyTo: Option[String] = None, clusterId: Option[String] = None, - headers: Map[String, AmqpHeaderVal] = Map.empty + headers: Map[String, AmqpFieldValue] = Map.empty ) object AmqpProperties { def empty = AmqpProperties() - def from(basicProps: AMQP.BasicProperties): AmqpProperties = + /** + * It is possible to construct an [[AMQP.BasicProperties]] that will cause + * this method to crash, hence it is unsafe. It is meant to be passed + * values that are created by the underlying RabbitMQ Java client library + * or other values that you are certain are well-formed (that is they + * conform to the AMQP spec). + * + * A user of the library should most likely not be calling this directly, and instead should + * be constructing an [[AmqpProperties]] directly. + */ + private[fs2rabbit] def unsafeFrom(basicProps: AMQP.BasicProperties): AmqpProperties = AmqpProperties( contentType = Option(basicProps.getContentType), contentEncoding = Option(basicProps.getContentEncoding), @@ -141,7 +338,7 @@ object model { headers = Option(basicProps.getHeaders) .fold(Map.empty[String, Object])(_.asScala.toMap) .map { - case (k, v) => k -> AmqpHeaderVal.from(v) + case (k, v) => k -> AmqpFieldValue.unsafeFrom(v) } ) @@ -162,7 +359,7 @@ object model { .clusterId(props.clusterId.orNull) // Note we don't use mapValues here to maintain compatibility between // Scala 2.12 and 2.13 - .headers(props.headers.map { case (key, value) => (key, value.impure) }.asJava) + .headers(props.headers.map { case (key, value) => (key, value.toValueWriterCompatibleJava) }.asJava) .build() } } diff --git a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala new file mode 100644 index 00000000..6c0adb36 --- /dev/null +++ b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpFieldValueSpec.scala @@ -0,0 +1,114 @@ +/* + * Copyright 2017-2019 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.fs2rabbit + +import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream} +import java.time.Instant + +import com.rabbitmq.client.impl.{ValueReader, ValueWriter} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue._ +import dev.profunktor.fs2rabbit.model.{AmqpFieldValue, ShortString} +import org.scalatest.{Assertion, FlatSpecLike, Matchers} + +class AmqpFieldValueSpec extends FlatSpecLike with Matchers with AmqpPropertiesArbitraries { + + it should "convert from and to Java primitive header values" in { + val intVal = IntVal(1) + val longVal = LongVal(2L) + val stringVal = StringVal("hey") + val arrayVal = ArrayVal(Vector(IntVal(3), IntVal(2), IntVal(1))) + + AmqpFieldValue.unsafeFrom(intVal.toValueWriterCompatibleJava) should be(intVal) + AmqpFieldValue.unsafeFrom(longVal.toValueWriterCompatibleJava) should be(longVal) + AmqpFieldValue.unsafeFrom(stringVal.toValueWriterCompatibleJava) should be(stringVal) + AmqpFieldValue.unsafeFrom("fs2") should be(StringVal("fs2")) + AmqpFieldValue.unsafeFrom(arrayVal.toValueWriterCompatibleJava) should be(arrayVal) + } + it should "preserve the same value after a round-trip through impure and from" in { + forAll { amqpHeaderVal: AmqpFieldValue => + AmqpFieldValue.unsafeFrom(amqpHeaderVal.toValueWriterCompatibleJava) == amqpHeaderVal + } + } + + it should "preserve the same values after a round-trip through the Java ValueReader and ValueWriter" in { + forAll(assertThatValueIsPreservedThroughJavaWriteAndRead _) + } + + it should "preserve a specific StringVal that previously failed after a round-trip through the Java ValueReader and ValueWriter" in { + assertThatValueIsPreservedThroughJavaWriteAndRead(StringVal("kyvmqzlbjivLqQFukljghxdowkcmjklgSeybdy")) + } + + it should "preserve a specific DateVal created from an Instant that has millisecond accuracy after a round-trip through the Java ValueReader and ValueWriter" in { + val instant = Instant.parse("4000-11-03T20:17:29.57Z") + val myDateVal = TimestampVal.from(instant) + assertThatValueIsPreservedThroughJavaWriteAndRead(myDateVal) + } + + "DecimalVal" should "reject a BigDecimal of an unscaled value with 33 bits..." in { + DecimalVal.from(BigDecimal(Int.MaxValue) + BigDecimal(1)) should be(None) + } + it should "reject a BigDecimal with a scale over octet size" in { + DecimalVal.from(new java.math.BigDecimal(java.math.BigInteger.valueOf(12345L), 1000)) should be(None) + } + + // We need to wrap things in a dummy table because the method that would be + // great to test with ValueReader, readFieldValue, is private, and so we + // have to call the next best thing, readTable. + private def wrapInDummyTable(value: AmqpFieldValue): TableVal = + TableVal(Map(ShortString.unsafeFrom("dummyKey") -> value)) + + private def createWriterFromQueue(outputResults: collection.mutable.Queue[Byte]): ValueWriter = + new ValueWriter({ + new DataOutputStream({ + new OutputStream { + override def write(b: Int): Unit = + outputResults.enqueue(b.toByte) + } + }) + }) + + private def createReaderFromQueue(input: collection.mutable.Queue[Byte]): ValueReader = { + val inputStream = new InputStream { + override def read(): Int = + try { + val result = input.dequeue() + // A signed -> unsigned conversion because bytes by default are + // converted into signed ints, which is bad when the API of read + // states that negative numbers indicate EOF... + 0Xff & result.toInt + } catch { + case _: NoSuchElementException => -1 + } + + override def available(): Int = { + val result = input.size + result + } + } + new ValueReader(new DataInputStream(inputStream)) + } + + private def assertThatValueIsPreservedThroughJavaWriteAndRead(amqpHeaderVal: AmqpFieldValue): Assertion = { + val outputResultsAsTable = collection.mutable.Queue.empty[Byte] + val tableWriter = createWriterFromQueue(outputResultsAsTable) + tableWriter.writeTable(wrapInDummyTable(amqpHeaderVal).toValueWriterCompatibleJava) + + val reader = createReaderFromQueue(outputResultsAsTable) + val readValue = reader.readTable() + AmqpFieldValue.unsafeFrom(readValue) should be(wrapInDummyTable(amqpHeaderVal)) + } +} diff --git a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpHeaderValSpec.scala b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpHeaderValSpec.scala deleted file mode 100644 index af1a099d..00000000 --- a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpHeaderValSpec.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2017-2019 ProfunKtor - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.profunktor.fs2rabbit - -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal._ -import org.scalatest.{FlatSpecLike, Matchers} - -class AmqpHeaderValSpec extends FlatSpecLike with Matchers { - - it should "convert from and to Java primitive header values" in { - val intVal = IntVal(1) - val longVal = LongVal(2L) - val stringVal = StringVal("hey") - val arrayVal = ArrayVal(Seq(3, 2, 1)) - - AmqpHeaderVal.from(intVal.impure) should be(intVal) - AmqpHeaderVal.from(longVal.impure) should be(longVal) - AmqpHeaderVal.from(stringVal.impure) should be(stringVal) - AmqpHeaderVal.from("fs2") should be(StringVal("fs2")) - AmqpHeaderVal.from(arrayVal.impure) should be(ArrayVal(Seq(3, 2, 1))) - } - -} diff --git a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala index a090a628..2bf6cbd7 100644 --- a/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala +++ b/core/src/test/scala/dev/profunktor/fs2rabbit/AmqpPropertiesSpec.scala @@ -16,20 +16,23 @@ package dev.profunktor.fs2rabbit -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal._ -import dev.profunktor.fs2rabbit.model.{AmqpHeaderVal, AmqpProperties, DeliveryMode} -import org.scalacheck._ +import java.util.Date + +import com.rabbitmq.client.AMQP +import dev.profunktor.fs2rabbit.model.AmqpFieldValue._ +import dev.profunktor.fs2rabbit.model.{AmqpFieldValue, AmqpProperties, DeliveryMode, ShortString} import org.scalacheck.Arbitrary.arbitrary +import org.scalacheck._ import org.scalatest.{FlatSpecLike, Matchers} -import com.rabbitmq.client.AMQP import org.scalatestplus.scalacheck.{ScalaCheckPropertyChecks => PropertyChecks} +import scodec.bits.ByteVector class AmqpPropertiesSpec extends FlatSpecLike with Matchers with AmqpPropertiesArbitraries { - forAll { amqpProperties: AmqpProperties => - it should s"convert from and to Java AMQP.BasicProperties for $amqpProperties" in { + it should s"convert from and to Java AMQP.BasicProperties" in { + forAll { amqpProperties: AmqpProperties => val basicProps = amqpProperties.asBasicProps - AmqpProperties.from(basicProps) should be(amqpProperties) + AmqpProperties.unsafeFrom(basicProps) should be(amqpProperties) } } @@ -47,18 +50,69 @@ class AmqpPropertiesSpec extends FlatSpecLike with Matchers with AmqpPropertiesA None, None, None, - Map.empty[String, AmqpHeaderVal])) + Map.empty[String, AmqpFieldValue])) } it should "handle null values in Java AMQP.BasicProperties" in { val basic = new AMQP.BasicProperties() - AmqpProperties.from(basic) should be(AmqpProperties.empty) + AmqpProperties.unsafeFrom(basic) should be(AmqpProperties.empty) } } trait AmqpPropertiesArbitraries extends PropertyChecks { + implicit val bigDecimalVal: Arbitrary[DecimalVal] = Arbitrary[DecimalVal] { + for { + unscaledValue <- arbitrary[Int] + scale <- Gen.choose(0, 255) + } yield { + val javaBigDecimal = new java.math.BigDecimal(BigInt(unscaledValue).bigInteger, scale) + DecimalVal.unsafeFrom(BigDecimal(javaBigDecimal)) + } + } + + implicit val dateVal: Arbitrary[TimestampVal] = Arbitrary[TimestampVal] { + arbitrary[Date].map(TimestampVal.from) + } + + private def modTruncateString(str: String): ShortString = { + val newLength = str.length % (ShortString.MaxByteLength + 1) + ShortString.unsafeFrom(str.substring(newLength)) + } + + def tableVal(maxDepth: Int): Arbitrary[TableVal] = Arbitrary { + for { + keys <- arbitrary[List[String]] + keysWithValueGen = keys.map(key => amqpHeaderVal(maxDepth).arbitrary.map(modTruncateString(key) -> _)) + keyValues <- Gen.sequence[List[(ShortString, AmqpFieldValue)], (ShortString, AmqpFieldValue)](keysWithValueGen) + } yield TableVal(keyValues.toMap) + } + + implicit val byteVal: Arbitrary[ByteVal] = Arbitrary { + arbitrary[Byte].map(ByteVal.apply) + } + + implicit val doubleVal: Arbitrary[DoubleVal] = Arbitrary { + arbitrary[Double].map(DoubleVal.apply) + } + + implicit val floatVal: Arbitrary[FloatVal] = Arbitrary { + arbitrary[Float].map(FloatVal.apply) + } + + implicit val shortVal: Arbitrary[ShortVal] = Arbitrary { + arbitrary[Short].map(ShortVal.apply) + } + + implicit val byteArrayVal: Arbitrary[ByteArrayVal] = Arbitrary { + arbitrary[Array[Byte]].map(xs => ByteArrayVal(ByteVector(xs))) + } + + implicit val booleanVal: Arbitrary[BooleanVal] = Arbitrary { + arbitrary[Boolean].map(BooleanVal.apply) + } + implicit val intVal: Arbitrary[IntVal] = Arbitrary[IntVal] { Gen.posNum[Int].flatMap(x => IntVal(x)) } @@ -71,13 +125,50 @@ trait AmqpPropertiesArbitraries extends PropertyChecks { Gen.alphaStr.flatMap(x => StringVal(x)) } - implicit val amqpHeaderVal: Arbitrary[AmqpHeaderVal] = Arbitrary[AmqpHeaderVal] { - Gen.oneOf(arbitrary[IntVal], arbitrary[LongVal], arbitrary[StringVal]) + def arrayVal(maxDepth: Int): Arbitrary[ArrayVal] = Arbitrary { + implicit val implicitAmqpHeaderVal: Arbitrary[AmqpFieldValue] = amqpHeaderVal(maxDepth) + arbitrary[Vector[AmqpFieldValue]].map(ArrayVal.apply) + } + + implicit val nullVal: Arbitrary[NullVal.type] = Arbitrary { + Gen.const(NullVal) + } + + implicit val implicitAmqpHeaderVal: Arbitrary[AmqpFieldValue] = Arbitrary { + // Cap it at 2 so that we don't have Stack Overflows/long test times + amqpHeaderVal(2).arbitrary + } + + def amqpHeaderVal(maxDepth: Int): Arbitrary[AmqpFieldValue] = Arbitrary[AmqpFieldValue] { + val nonRecursiveGenerators = List( + bigDecimalVal.arbitrary, + dateVal.arbitrary, + byteVal.arbitrary, + doubleVal.arbitrary, + floatVal.arbitrary, + shortVal.arbitrary, + byteArrayVal.arbitrary, + booleanVal.arbitrary, + intVal.arbitrary, + longVal.arbitrary, + stringVal.arbitrary, + nullVal.arbitrary + ) + + if (maxDepth <= 0) { + // This is because Gen.oneOf is overloaded and we need to access its three-argument version + Gen.oneOf(nonRecursiveGenerators(0), nonRecursiveGenerators(1), nonRecursiveGenerators.drop(2): _*) + } else { + val allGenerators = tableVal(maxDepth - 1).arbitrary :: arrayVal(maxDepth - 1).arbitrary :: nonRecursiveGenerators + Gen.lzy( + Gen.oneOf(allGenerators(0), allGenerators(1), allGenerators.drop(2): _*) + ) + } } - private val headersGen: Gen[(String, AmqpHeaderVal)] = for { + private val headersGen: Gen[(String, AmqpFieldValue)] = for { key <- Gen.alphaStr - value <- arbitrary[AmqpHeaderVal] + value <- arbitrary[AmqpFieldValue] } yield (key, value) implicit val amqpProperties: Arbitrary[AmqpProperties] = Arbitrary[AmqpProperties] { @@ -94,7 +185,7 @@ trait AmqpPropertiesArbitraries extends PropertyChecks { expiration <- Gen.option(Gen.alphaNumStr) replyTo <- Gen.option(Gen.alphaNumStr) clusterId <- Gen.option(Gen.alphaNumStr) - headers <- Gen.mapOf[String, AmqpHeaderVal](headersGen) + headers <- Gen.mapOf[String, AmqpFieldValue](headersGen) } yield AmqpProperties( contentType, diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala index 38b2dc0f..bdae42ca 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AckerConsumerDemo.scala @@ -25,7 +25,7 @@ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal.{LongVal, StringVal} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala index d74d8e9f..5266fa7a 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/AutoAckConsumerDemo.scala @@ -25,7 +25,7 @@ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal.{LongVal, StringVal} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} import io.circe.Encoder diff --git a/site/src/main/tut/examples/sample-acker.md b/site/src/main/tut/examples/sample-acker.md index 82f64750..4fddb338 100644 --- a/site/src/main/tut/examples/sample-acker.md +++ b/site/src/main/tut/examples/sample-acker.md @@ -18,7 +18,7 @@ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal.{LongVal, StringVal} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream} diff --git a/site/src/main/tut/examples/sample-autoack.md b/site/src/main/tut/examples/sample-autoack.md index eefc3778..9f26b3bc 100644 --- a/site/src/main/tut/examples/sample-autoack.md +++ b/site/src/main/tut/examples/sample-autoack.md @@ -18,7 +18,7 @@ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.interpreter.Fs2Rabbit import dev.profunktor.fs2rabbit.json.Fs2JsonEncoder import dev.profunktor.fs2rabbit.model.AckResult.Ack -import dev.profunktor.fs2rabbit.model.AmqpHeaderVal.{LongVal, StringVal} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{LongVal, StringVal} import dev.profunktor.fs2rabbit.model._ import fs2.{Pipe, Pure, Stream}