diff --git a/avro/src/main/scala/magnolify/avro/AvroType.scala b/avro/src/main/scala/magnolify/avro/AvroType.scala index a36b93299..e63ed1e65 100644 --- a/avro/src/main/scala/magnolify/avro/AvroType.scala +++ b/avro/src/main/scala/magnolify/avro/AvroType.scala @@ -32,10 +32,6 @@ import scala.reflect.ClassTag import scala.jdk.CollectionConverters._ import scala.collection.compat._ -class doc(doc: String) extends StaticAnnotation with Serializable { - override def toString: String = doc -} - sealed trait AvroType[T] extends Converter[T, GenericRecord, GenericRecord] { val schema: Schema def apply(r: GenericRecord): T = from(r) diff --git a/avro/src/main/scala/magnolify/avro/package.scala b/avro/src/main/scala/magnolify/avro/package.scala new file mode 100644 index 000000000..31f218fe7 --- /dev/null +++ b/avro/src/main/scala/magnolify/avro/package.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2022 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify + +package object avro { + type doc = shared.doc +} diff --git a/build.sbt b/build.sbt index ba2140f8a..caf854c7e 100644 --- a/build.sbt +++ b/build.sbt @@ -189,6 +189,12 @@ val commonSettings = Seq( name = "Shameera Rathnayaka Yodage", email = "shameerayodage@gmail.com", url = url("https://twitter.com/syodage") + ), + Developer( + id = "shnapz", + name = "Andrew Kabas", + email = "akabas@spotify.com", + url = url("https://github.com/shnapz") ) ) ) diff --git a/docs/parquet.md b/docs/parquet.md index 7948c2994..5a79b4c40 100644 --- a/docs/parquet.md +++ b/docs/parquet.md @@ -48,3 +48,14 @@ implicit val pfDecimalBinary = ParquetField.decimalBinary(20, 0) Among the date/time types, `DATE` maps to `java.time.LocalDate`. The other types, `TIME` and `TIMESTAMP`, map to `OffsetTime`/`LocalTime` and `Instant`/`LocalDateTime` with `isAdjustedToUTC` set to `true`/`false`. They can be in nano, micro, or milliseconds precision with `import magnolify.parquet.logical.{nanos,micros,millis}._`. Note that Parquet's official Avro support maps `REPEATED` fields to an `array` field inside a nested group. Use `import magnolify.parquet.ParquetArray.AvroCompat._` to ensure compatibility with Avro. + +The top-level class and all fields (including fields of nested classes) can be annotated with the `@doc` annotation. Note that annotations on the nested classes themselves are ignored.
+ +```scala +@doc("This is ignored") +case class NestedClass(@doc("nested field annotation") i: Int) + +@doc("Top level annotation") +case class TopLevelType(@doc("field annotation") pd: NestedClass, @doc("field annotation 2") i: +Integers) +``` \ No newline at end of file diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetField.scala b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala new file mode 100644 index 000000000..ca23b94aa --- /dev/null +++ b/parquet/src/main/scala/magnolify/parquet/ParquetField.scala @@ -0,0 +1,470 @@ +/* + * Copyright 2022 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.parquet + +import magnolia1._ +import magnolify.shared.{Converter => _, _} +import magnolify.shims.FactoryCompat + +import java.nio.{ByteBuffer, ByteOrder} +import java.time.LocalDate +import java.util.UUID +import org.apache.parquet.io.ParquetDecodingException +import org.apache.parquet.io.api._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema.{LogicalTypeAnnotation, Type, Types} + +import scala.annotation.{implicitNotFound, nowarn} +import scala.collection.concurrent +import scala.language.experimental.macros +import scala.collection.compat._ + +sealed trait ParquetField[T] extends Serializable { + + @transient private lazy val schemaCache: concurrent.Map[UUID, Type] = + concurrent.TrieMap.empty + + protected def buildSchema(cm: CaseMapper): Type + def schema(cm: CaseMapper): Type = + schemaCache.getOrElseUpdate(cm.uuid, buildSchema(cm)) + + val hasAvroArray: Boolean = false + val fieldDocs: Map[String, String] + val typeDoc: Option[String] + protected val isGroup: Boolean = false + protected def isEmpty(v: T): Boolean + def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit + def newConverter: TypeConverter[T] + + protected def writeGroup(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = { + if (isGroup) { + c.startGroup() + } + write(c, v)(cm) + if (isGroup) { + c.endGroup() + } + } +} + +object ParquetField { + sealed trait Record[T] extends ParquetField[T] { + override protected val isGroup: Boolean = true + + override protected def isEmpty(v: T): Boolean = false + } + + // //////////////////////////////////////////// + type Typeclass[T] = ParquetField[T] + + def join[T](caseClass: CaseClass[Typeclass, T]): ParquetField[T] = { + if (caseClass.isValueClass) { + val p = caseClass.parameters.head + val tc = p.typeclass + new ParquetField[T] { + override protected def buildSchema(cm: CaseMapper): Type = tc.buildSchema(cm) + override protected def isEmpty(v: T): Boolean = tc.isEmpty(p.dereference(v)) + override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = + tc.writeGroup(c, p.dereference(v))(cm) + override def newConverter: TypeConverter[T] = { + val buffered = tc.newConverter + .asInstanceOf[TypeConverter.Buffered[p.PType]] + new TypeConverter.Delegate[p.PType, T](buffered) { + override def get: T = inner.get(b => 
caseClass.construct(_ => b.head)) + } + } + override val fieldDocs: Map[String, String] = Map.empty + override val typeDoc: Option[String] = None + } + } else { + new Record[T] { + override def buildSchema(cm: CaseMapper): Type = + caseClass.parameters + .foldLeft(Types.requiredGroup()) { (g, p) => + g.addField(Schema.rename(p.typeclass.schema(cm), cm.map(p.label))) + } + .named(caseClass.typeName.full) + + override val hasAvroArray: Boolean = caseClass.parameters.exists(_.typeclass.hasAvroArray) + + override val fieldDocs: Map[String, String] = + caseClass.parameters.flatMap { param => + val label = param.label + val nestedDocs = param.typeclass.fieldDocs.map { case (k, v) => + s"$label.$k" -> v + } + + val collectedAnnValue = getDoc( + param.annotations, + s"Field ${caseClass.typeName}.$label" + ) + val joinedAnnotations = collectedAnnValue match { + case Some(value) => nestedDocs + (label -> value) + case None => nestedDocs + } + joinedAnnotations + }.toMap + + override val typeDoc: Option[String] = getDoc( + caseClass.annotations, + s"Type ${caseClass.typeName}" + ) + + override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = { + caseClass.parameters.foreach { p => + val x = p.dereference(v) + if (!p.typeclass.isEmpty(x)) { + val name = cm.map(p.label) + c.startField(name, p.index) + p.typeclass.writeGroup(c, x)(cm) + c.endField(name, p.index) + } + } + } + + override def newConverter: TypeConverter[T] = + new GroupConverter with TypeConverter.Buffered[T] { + private val fieldConverters = caseClass.parameters.map(_.typeclass.newConverter) + + override def isPrimitive: Boolean = false + + override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) + + override def start(): Unit = () + + override def end(): Unit = { + val value = caseClass.construct { p => + try { + fieldConverters(p.index).get + } catch { + case e: IllegalArgumentException => + val field = s"${caseClass.typeName.full}#${p.label}" + throw new ParquetDecodingException( + s"Failed to decode $field: ${e.getMessage}", + e + ) + } + } + addValue(value) + } + } + } + } + + } + + @implicitNotFound("Cannot derive ParquetType for sealed trait") + private sealed trait Dispatchable[T] + def split[T: Dispatchable](sealedTrait: SealedTrait[Typeclass, T]): ParquetField[T] = ??? 
+ + implicit def apply[T]: ParquetField[T] = macro Magnolia.gen[T] + + private def getDoc(annotations: Seq[Any], name: String): Option[String] = { + val docs = annotations.collect { case d: magnolify.shared.doc => d.toString } + require(docs.size <= 1, s"More than one @doc annotation: $name") + docs.headOption + } + + // //////////////////////////////////////////////// + + def from[T]: FromWord[T] = new FromWord[T] + + class FromWord[T] { + def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = + new Primitive[U] { + override def buildSchema(cm: CaseMapper): Type = pf.schema(cm) + override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) + override def newConverter: TypeConverter[U] = + pf.newConverter.asInstanceOf[TypeConverter.Primitive[T]].map(f) + override type ParquetT = pf.ParquetT + } + } + + // //////////////////////////////////////////////// + + sealed trait Primitive[T] extends ParquetField[T] { + override protected def isEmpty(v: T): Boolean = false + override val fieldDocs: Map[String, String] = Map.empty + override val typeDoc: Option[String] = None + type ParquetT <: Comparable[ParquetT] + } + + def primitive[T, UnderlyingT <: Comparable[UnderlyingT]]( + f: RecordConsumer => T => Unit, + g: => TypeConverter[T], + ptn: PrimitiveTypeName, + lta: => LogicalTypeAnnotation = null + ): Primitive[T] = + new Primitive[T] { + override def buildSchema(cm: CaseMapper): Type = Schema.primitive(ptn, lta) + override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = f(c)(v) + override def newConverter: TypeConverter[T] = g + override type ParquetT = UnderlyingT + } + + implicit val pfBoolean = + primitive[Boolean, java.lang.Boolean]( + _.addBoolean, + TypeConverter.newBoolean, + PrimitiveTypeName.BOOLEAN + ) + + implicit val pfByte = + primitive[Byte, Integer]( + c => v => c.addInteger(v), + TypeConverter.newInt.map(_.toByte), + PrimitiveTypeName.INT32, + LogicalTypeAnnotation.intType(8, true) + ) + implicit val pfShort = + primitive[Short, Integer]( + c => v => c.addInteger(v), + TypeConverter.newInt.map(_.toShort), + PrimitiveTypeName.INT32, + LogicalTypeAnnotation.intType(16, true) + ) + implicit val pfInt = + primitive[Int, Integer]( + _.addInteger, + TypeConverter.newInt, + PrimitiveTypeName.INT32, + LogicalTypeAnnotation.intType(32, true) + ) + implicit val pfLong = + primitive[Long, java.lang.Long]( + _.addLong, + TypeConverter.newLong, + PrimitiveTypeName.INT64, + LogicalTypeAnnotation.intType(64, true) + ) + implicit val pfFloat = + primitive[Float, java.lang.Float](_.addFloat, TypeConverter.newFloat, PrimitiveTypeName.FLOAT) + + implicit val pfDouble = + primitive[Double, java.lang.Double]( + _.addDouble, + TypeConverter.newDouble, + PrimitiveTypeName.DOUBLE + ) + + implicit val pfByteArray = + primitive[Array[Byte], Binary]( + c => v => c.addBinary(Binary.fromConstantByteArray(v)), + TypeConverter.newByteArray, + PrimitiveTypeName.BINARY + ) + implicit val pfString = + primitive[String, Binary]( + c => v => c.addBinary(Binary.fromString(v)), + TypeConverter.newString, + PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.stringType() + ) + + implicit def pfOption[T](implicit t: ParquetField[T]): ParquetField[Option[T]] = + new ParquetField[Option[T]] { + override def buildSchema(cm: CaseMapper): Type = + Schema.setRepetition(t.schema(cm), Repetition.OPTIONAL) + override protected def isEmpty(v: Option[T]): Boolean = v.isEmpty + + override val fieldDocs: Map[String, String] = t.fieldDocs + + override val typeDoc: 
Option[String] = None + + override def write(c: RecordConsumer, v: Option[T])(cm: CaseMapper): Unit = + v.foreach(t.writeGroup(c, _)(cm)) + + override def newConverter: TypeConverter[Option[T]] = { + val buffered = t.newConverter + .asInstanceOf[TypeConverter.Buffered[T]] + .withRepetition(Repetition.OPTIONAL) + new TypeConverter.Delegate[T, Option[T]](buffered) { + override def get: Option[T] = inner.get(_.headOption) + } + } + } + + private val AvroArrayField = "array" + implicit def ptIterable[T, C[T]](implicit + t: ParquetField[T], + ti: C[T] => Iterable[T], + fc: FactoryCompat[T, C[T]], + pa: ParquetArray + ): ParquetField[C[T]] = { + new ParquetField[C[T]] { + override val hasAvroArray: Boolean = pa match { + case ParquetArray.default => false + case ParquetArray.AvroCompat.avroCompat => true + } + + override def buildSchema(cm: CaseMapper): Type = { + val repeatedSchema = Schema.setRepetition(t.schema(cm), Repetition.REPEATED) + if (hasAvroArray) { + Types + .requiredGroup() + .addField(Schema.rename(repeatedSchema, AvroArrayField)) + .as(LogicalTypeAnnotation.listType()) + .named(t.schema(cm).getName) + } else { + repeatedSchema + } + } + + override protected val isGroup: Boolean = hasAvroArray + override protected def isEmpty(v: C[T]): Boolean = v.isEmpty + + override def write(c: RecordConsumer, v: C[T])(cm: CaseMapper): Unit = + if (hasAvroArray) { + c.startField(AvroArrayField, 0) + v.foreach(t.writeGroup(c, _)(cm)) + c.endField(AvroArrayField, 0) + } else { + v.foreach(t.writeGroup(c, _)(cm)) + } + + override def newConverter: TypeConverter[C[T]] = { + val buffered = t.newConverter + .asInstanceOf[TypeConverter.Buffered[T]] + .withRepetition(Repetition.REPEATED) + val arrayConverter = new TypeConverter.Delegate[T, C[T]](buffered) { + override def get: C[T] = inner.get(fc.fromSpecific) + } + + if (hasAvroArray) { + new GroupConverter with TypeConverter.Buffered[C[T]] { + override def getConverter(fieldIndex: Int): Converter = { + require(fieldIndex == 0, "Avro array field index != 0") + arrayConverter + } + override def start(): Unit = () + override def end(): Unit = addValue(arrayConverter.get) + override def get: C[T] = get(_.headOption.getOrElse(fc.newBuilder.result())) + } + } else { + arrayConverter + } + } + + override val fieldDocs: Map[String, String] = t.fieldDocs + + override val typeDoc: Option[String] = None + } + } + + // //////////////////////////////////////////////// + + def logicalType[T](lta: => LogicalTypeAnnotation): LogicalTypeWord[T] = + new LogicalTypeWord[T](lta) + + class LogicalTypeWord[T](lta: => LogicalTypeAnnotation) extends Serializable { + def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] { + override def buildSchema(cm: CaseMapper): Type = Schema.setLogicalType(pf.schema(cm), lta) + override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) + override def newConverter: TypeConverter[U] = + pf.newConverter.asInstanceOf[TypeConverter.Primitive[T]].map(f) + + override type ParquetT = pf.ParquetT + } + } + + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + // Precision and scale are not encoded in the `BigDecimal` type and must be specified + def decimal32(precision: Int, scale: Int = 0): Primitive[BigDecimal] = { + require(1 <= precision && precision <= 9, s"Precision for INT32 $precision not within [1, 9]") + require(0 <= scale && scale < precision, s"Scale $scale not within [0, $precision)") + 
logicalType[Int](LogicalTypeAnnotation.decimalType(scale, precision))(x => + BigDecimal(BigInt(x), scale) + )(_.underlying().unscaledValue().intValue()) + } + + def decimal64(precision: Int, scale: Int = 0): Primitive[BigDecimal] = { + require(1 <= precision && precision <= 18, s"Precision for INT64 $precision not within [1, 18]") + require(0 <= scale && scale < precision, s"Scale $scale not within [0, $precision)") + logicalType[Long](LogicalTypeAnnotation.decimalType(scale, precision))(x => + BigDecimal(BigInt(x), scale) + )(_.underlying().unscaledValue().longValue()) + } + + def decimalFixed(length: Int, precision: Int, scale: Int = 0): Primitive[BigDecimal] = { + val capacity = math.floor(math.log10(math.pow(2, 8 * length - 1) - 1)).toInt + require( + 1 <= precision && precision <= capacity, + s"Precision for FIXED($length) not within [1, $capacity]" + ) + + new Primitive[BigDecimal] { + override def buildSchema(cm: CaseMapper): Type = + Schema.primitive( + PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.decimalType(scale, precision), + length + ) + + override def write(c: RecordConsumer, v: BigDecimal)(cm: CaseMapper): Unit = + c.addBinary(Binary.fromConstantByteArray(Decimal.toFixed(v, precision, scale, length))) + + override def newConverter: TypeConverter[BigDecimal] = TypeConverter.newByteArray.map { ba => + Decimal.fromBytes(ba, precision, scale) + } + + override type ParquetT = Binary + } + } + + def decimalBinary(precision: Int, scale: Int = 0): Primitive[BigDecimal] = { + require(1 <= precision, s"Precision $precision <= 0") + require(0 <= scale && scale < precision, s"Scale $scale not within [0, $precision)") + logicalType[Array[Byte]](LogicalTypeAnnotation.decimalType(scale, precision))( + Decimal.fromBytes(_, precision, scale) + )(Decimal.toBytes(_, precision, scale)) + } + + @nowarn("msg=parameter value lp in method pfEnum is never used") + implicit def pfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): Primitive[T] = + logicalType[String](LogicalTypeAnnotation.enumType())(et.from)(et.to) + + implicit val ptUuid: Primitive[UUID] = new Primitive[UUID] { + override def buildSchema(cm: CaseMapper): Type = + Schema.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, length = 16) + + override def write(c: RecordConsumer, v: UUID)(cm: CaseMapper): Unit = + c.addBinary( + Binary.fromConstantByteArray( + ByteBuffer + .allocate(16) + .order(ByteOrder.BIG_ENDIAN) + .putLong(v.getMostSignificantBits) + .putLong(v.getLeastSignificantBits) + .array() + ) + ) + + override def newConverter: TypeConverter[UUID] = TypeConverter.newByteArray.map { ba => + val bb = ByteBuffer.wrap(ba) + val h = bb.getLong + val l = bb.getLong + new UUID(h, l) + } + + override type ParquetT = Binary + } + + implicit val ptDate: Primitive[LocalDate] = + logicalType[Int](LogicalTypeAnnotation.dateType())(LocalDate.ofEpochDay(_))(_.toEpochDay.toInt) + +} diff --git a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala index e65ef4d27..02053f5d4 100644 --- a/parquet/src/main/scala/magnolify/parquet/ParquetType.scala +++ b/parquet/src/main/scala/magnolify/parquet/ParquetType.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Spotify AB + * Copyright 2022 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,12 +16,8 @@ package magnolify.parquet -import java.nio.{ByteBuffer, ByteOrder} -import java.time.LocalDate -import java.util.UUID -import magnolia1._ import magnolify.shared.{Converter => _, _} -import magnolify.shims.FactoryCompat +import org.apache.avro.{Schema => AvroSchema} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.parquet.avro.AvroSchemaConverter @@ -33,19 +29,15 @@ import org.apache.parquet.hadoop.{ ParquetWriter } import org.apache.parquet.io.api._ -import org.apache.parquet.io.{InputFile, OutputFile, ParquetDecodingException} -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName -import org.apache.parquet.schema.Type.Repetition -import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, Type, Types} +import org.apache.parquet.io.{InputFile, OutputFile} +import org.apache.parquet.schema.MessageType import org.slf4j.LoggerFactory -import scala.annotation.implicitNotFound -import scala.collection.concurrent -import scala.collection.compat._ -import scala.annotation.nowarn - sealed trait ParquetArray +/** + * Add `import magnolify.parquet.ParquetArray.AvroCompat._` to generate AVRO schema on write + */ object ParquetArray { implicit case object default extends ParquetArray @@ -58,6 +50,7 @@ sealed trait ParquetType[T] extends Serializable { import ParquetType._ def schema: MessageType + def avroSchema: AvroSchema val avroCompat: Boolean def setupInput(job: Job): Unit = { @@ -93,7 +86,13 @@ object ParquetType { )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = f match { case r: ParquetField.Record[_] => new ParquetType[T] { - override def schema: MessageType = Schema.message(r.schema(cm)) + override lazy val schema: MessageType = Schema.message(r.schema(cm)) + override lazy val avroSchema: AvroSchema = { + val s = new AvroSchemaConverter().convert(schema) + // add doc to avro schema + SchemaUtil.deepCopy(s, f.typeDoc, f.fieldDocs.get) + } + override val avroCompat: Boolean = pa == ParquetArray.AvroCompat.avroCompat || f.hasAvroArray override def write(c: RecordConsumer, v: T): Unit = r.write(c, v)(cm) @@ -176,14 +175,21 @@ object ParquetType { if (parquetType == null) { parquetType = SerializationUtils.fromBase64(configuration.get(WriteTypeKey)) } + val schema = Schema.message(parquetType.schema) val metadata = new java.util.HashMap[String, String]() if (parquetType.avroCompat) { - val avroSchema = new AvroSchemaConverter().convert(schema) // This overrides `WriteSupport#getName` metadata.put(ParquetWriter.OBJECT_MODEL_NAME_PROP, "avro") - metadata.put(AVRO_SCHEMA_METADATA_KEY, avroSchema.toString) + metadata.put(AVRO_SCHEMA_METADATA_KEY, parquetType.avroSchema.toString()) + } else { + logger.warn( + "Parquet file is being written with no avro compatibility, this mode is not " + + "producing schema. 
Add `import magnolify.parquet.ParquetArray.AvroCompat._` to " + + "generate schema" + ) } + new hadoop.WriteSupport.WriteContext(schema, metadata) } @@ -197,395 +203,3 @@ object ParquetType { } } } - -////////////////////////////////////////////////// - -sealed trait ParquetField[T] extends Serializable { - - @transient private lazy val schemaCache: concurrent.Map[UUID, Type] = - concurrent.TrieMap.empty - - protected def buildSchema(cm: CaseMapper): Type - def schema(cm: CaseMapper): Type = - schemaCache.getOrElseUpdate(cm.uuid, buildSchema(cm)) - - val hasAvroArray: Boolean = false - protected val isGroup: Boolean = false - protected def isEmpty(v: T): Boolean - def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit - def newConverter: TypeConverter[T] - - protected def writeGroup(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = { - if (isGroup) { - c.startGroup() - } - write(c, v)(cm) - if (isGroup) { - c.endGroup() - } - } -} - -object ParquetField { - sealed trait Record[T] extends ParquetField[T] { - override protected val isGroup: Boolean = true - override protected def isEmpty(v: T): Boolean = false - } - - // //////////////////////////////////////////// - type Typeclass[T] = ParquetField[T] - - def join[T](caseClass: CaseClass[Typeclass, T]): ParquetField[T] = { - if (caseClass.isValueClass) { - val p = caseClass.parameters.head - val tc = p.typeclass - new ParquetField[T] { - override protected def buildSchema(cm: CaseMapper): Type = tc.buildSchema(cm) - override protected def isEmpty(v: T): Boolean = tc.isEmpty(p.dereference(v)) - override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = - tc.writeGroup(c, p.dereference(v))(cm) - override def newConverter: TypeConverter[T] = { - val buffered = tc.newConverter - .asInstanceOf[TypeConverter.Buffered[p.PType]] - new TypeConverter.Delegate[p.PType, T](buffered) { - override def get: T = inner.get(b => caseClass.construct(_ => b.head)) - } - } - } - } else { - new Record[T] { - override def buildSchema(cm: CaseMapper): Type = - caseClass.parameters - .foldLeft(Types.requiredGroup()) { (g, p) => - g.addField(Schema.rename(p.typeclass.schema(cm), cm.map(p.label))) - } - .named(caseClass.typeName.full) - - override val hasAvroArray: Boolean = caseClass.parameters.exists(_.typeclass.hasAvroArray) - - override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = { - caseClass.parameters.foreach { p => - val x = p.dereference(v) - if (!p.typeclass.isEmpty(x)) { - val name = cm.map(p.label) - c.startField(name, p.index) - p.typeclass.writeGroup(c, x)(cm) - c.endField(name, p.index) - } - } - } - - override def newConverter: TypeConverter[T] = - new GroupConverter with TypeConverter.Buffered[T] { - private val fieldConverters = caseClass.parameters.map(_.typeclass.newConverter) - - override def isPrimitive: Boolean = false - - override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) - - override def start(): Unit = () - - override def end(): Unit = { - val value = caseClass.construct { p => - try { - fieldConverters(p.index).get - } catch { - case e: IllegalArgumentException => - val field = s"${caseClass.typeName.full}#${p.label}" - throw new ParquetDecodingException( - s"Failed to decode $field: ${e.getMessage}", - e - ) - } - } - addValue(value) - } - } - } - } - } - - @implicitNotFound("Cannot derive ParquetType for sealed trait") - private sealed trait Dispatchable[T] - def split[T: Dispatchable](sealedTrait: SealedTrait[Typeclass, T]): ParquetField[T] = ??? 
- implicit def apply[T]: ParquetField[T] = macro Magnolia.gen[T] - - // //////////////////////////////////////////////// - - def from[T]: FromWord[T] = new FromWord[T] - - class FromWord[T] { - def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = - new Primitive[U] { - override def buildSchema(cm: CaseMapper): Type = pf.schema(cm) - override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) - override def newConverter: TypeConverter[U] = - pf.newConverter.asInstanceOf[TypeConverter.Primitive[T]].map(f) - override type ParquetT = pf.ParquetT - } - } - - // //////////////////////////////////////////////// - - sealed trait Primitive[T] extends ParquetField[T] { - override protected def isEmpty(v: T): Boolean = false - type ParquetT <: Comparable[ParquetT] - } - - def primitive[T, UnderlyingT <: Comparable[UnderlyingT]]( - f: RecordConsumer => T => Unit, - g: => TypeConverter[T], - ptn: PrimitiveTypeName, - lta: => LogicalTypeAnnotation = null - ): Primitive[T] = - new Primitive[T] { - override def buildSchema(cm: CaseMapper): Type = Schema.primitive(ptn, lta) - override def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit = f(c)(v) - override def newConverter: TypeConverter[T] = g - override type ParquetT = UnderlyingT - } - - implicit val pfBoolean = - primitive[Boolean, java.lang.Boolean]( - _.addBoolean, - TypeConverter.newBoolean, - PrimitiveTypeName.BOOLEAN - ) - - implicit val pfByte = - primitive[Byte, Integer]( - c => v => c.addInteger(v.toInt), - TypeConverter.newInt.map(_.toByte), - PrimitiveTypeName.INT32, - LogicalTypeAnnotation.intType(8, true) - ) - implicit val pfShort = - primitive[Short, Integer]( - c => v => c.addInteger(v.toInt), - TypeConverter.newInt.map(_.toShort), - PrimitiveTypeName.INT32, - LogicalTypeAnnotation.intType(16, true) - ) - implicit val pfInt = - primitive[Int, Integer]( - _.addInteger, - TypeConverter.newInt, - PrimitiveTypeName.INT32, - LogicalTypeAnnotation.intType(32, true) - ) - implicit val pfLong = - primitive[Long, java.lang.Long]( - _.addLong, - TypeConverter.newLong, - PrimitiveTypeName.INT64, - LogicalTypeAnnotation.intType(64, true) - ) - implicit val pfFloat = - primitive[Float, java.lang.Float](_.addFloat, TypeConverter.newFloat, PrimitiveTypeName.FLOAT) - - implicit val pfDouble = - primitive[Double, java.lang.Double]( - _.addDouble, - TypeConverter.newDouble, - PrimitiveTypeName.DOUBLE - ) - - implicit val pfByteArray = - primitive[Array[Byte], Binary]( - c => v => c.addBinary(Binary.fromConstantByteArray(v)), - TypeConverter.newByteArray, - PrimitiveTypeName.BINARY - ) - implicit val pfString = - primitive[String, Binary]( - c => v => c.addBinary(Binary.fromString(v)), - TypeConverter.newString, - PrimitiveTypeName.BINARY, - LogicalTypeAnnotation.stringType() - ) - - implicit def pfOption[T](implicit t: ParquetField[T]): ParquetField[Option[T]] = - new ParquetField[Option[T]] { - override def buildSchema(cm: CaseMapper): Type = - Schema.setRepetition(t.schema(cm), Repetition.OPTIONAL) - override protected def isEmpty(v: Option[T]): Boolean = v.isEmpty - - override def write(c: RecordConsumer, v: Option[T])(cm: CaseMapper): Unit = - v.foreach(t.writeGroup(c, _)(cm)) - - override def newConverter: TypeConverter[Option[T]] = { - val buffered = t.newConverter - .asInstanceOf[TypeConverter.Buffered[T]] - .withRepetition(Repetition.OPTIONAL) - new TypeConverter.Delegate[T, Option[T]](buffered) { - override def get: Option[T] = inner.get(_.headOption) - } - } - } - - private val 
AvroArrayField = "array" - implicit def ptIterable[T, C[T]](implicit - t: ParquetField[T], - ti: C[T] => Iterable[T], - fc: FactoryCompat[T, C[T]], - pa: ParquetArray - ): ParquetField[C[T]] = { - new ParquetField[C[T]] { - override val hasAvroArray: Boolean = pa match { - case ParquetArray.default => false - case ParquetArray.AvroCompat.avroCompat => true - } - - override def buildSchema(cm: CaseMapper): Type = { - val repeatedSchema = Schema.setRepetition(t.schema(cm), Repetition.REPEATED) - if (hasAvroArray) { - Types - .requiredGroup() - .addField(Schema.rename(repeatedSchema, AvroArrayField)) - .as(LogicalTypeAnnotation.listType()) - .named(t.schema(cm).getName) - } else { - repeatedSchema - } - } - - override protected val isGroup: Boolean = hasAvroArray - override protected def isEmpty(v: C[T]): Boolean = v.isEmpty - - override def write(c: RecordConsumer, v: C[T])(cm: CaseMapper): Unit = - if (hasAvroArray) { - c.startField(AvroArrayField, 0) - v.foreach(t.writeGroup(c, _)(cm)) - c.endField(AvroArrayField, 0) - } else { - v.foreach(t.writeGroup(c, _)(cm)) - } - - override def newConverter: TypeConverter[C[T]] = { - val buffered = t.newConverter - .asInstanceOf[TypeConverter.Buffered[T]] - .withRepetition(Repetition.REPEATED) - val arrayConverter = new TypeConverter.Delegate[T, C[T]](buffered) { - override def get: C[T] = inner.get(fc.fromSpecific) - } - - if (hasAvroArray) { - new GroupConverter with TypeConverter.Buffered[C[T]] { - override def getConverter(fieldIndex: Int): Converter = { - require(fieldIndex == 0, "Avro array field index != 0") - arrayConverter - } - override def start(): Unit = () - override def end(): Unit = addValue(arrayConverter.get) - override def get: C[T] = get(_.headOption.getOrElse(fc.newBuilder.result())) - } - } else { - arrayConverter - } - } - } - } - - // //////////////////////////////////////////////// - - def logicalType[T](lta: => LogicalTypeAnnotation): LogicalTypeWord[T] = - new LogicalTypeWord[T](lta) - - class LogicalTypeWord[T](lta: => LogicalTypeAnnotation) extends Serializable { - def apply[U](f: T => U)(g: U => T)(implicit pf: Primitive[T]): Primitive[U] = new Primitive[U] { - override def buildSchema(cm: CaseMapper): Type = Schema.setLogicalType(pf.schema(cm), lta) - override def write(c: RecordConsumer, v: U)(cm: CaseMapper): Unit = pf.write(c, g(v))(cm) - override def newConverter: TypeConverter[U] = - pf.newConverter.asInstanceOf[TypeConverter.Primitive[T]].map(f) - - override type ParquetT = pf.ParquetT - } - } - - // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md - // Precision and scale are not encoded in the `BigDecimal` type and must be specified - def decimal32(precision: Int, scale: Int = 0): Primitive[BigDecimal] = { - require(1 <= precision && precision <= 9, s"Precision for INT32 $precision not within [1, 9]") - require(0 <= scale && scale < precision, s"Scale $scale not within [0, $precision)") - logicalType[Int](LogicalTypeAnnotation.decimalType(scale, precision))(x => - BigDecimal(BigInt(x), scale) - )(_.underlying().unscaledValue().intValue()) - } - - def decimal64(precision: Int, scale: Int = 0): Primitive[BigDecimal] = { - require(1 <= precision && precision <= 18, s"Precision for INT64 $precision not within [1, 18]") - require(0 <= scale && scale < precision, s"Scale $scale not within [0, $precision)") - logicalType[Long](LogicalTypeAnnotation.decimalType(scale, precision))(x => - BigDecimal(BigInt(x), scale) - )(_.underlying().unscaledValue().longValue()) - } - - def decimalFixed(length: Int, 
precision: Int, scale: Int = 0): Primitive[BigDecimal] = { - val capacity = math.floor(math.log10(math.pow(2, (8 * length - 1).toDouble) - 1)).toInt - require( - 1 <= precision && precision <= capacity, - s"Precision for FIXED($length) not within [1, $capacity]" - ) - - new Primitive[BigDecimal] { - override def buildSchema(cm: CaseMapper): Type = - Schema.primitive( - PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, - LogicalTypeAnnotation.decimalType(scale, precision), - length - ) - - override def write(c: RecordConsumer, v: BigDecimal)(cm: CaseMapper): Unit = - c.addBinary(Binary.fromConstantByteArray(Decimal.toFixed(v, precision, scale, length))) - - override def newConverter: TypeConverter[BigDecimal] = TypeConverter.newByteArray.map { ba => - Decimal.fromBytes(ba, precision, scale) - } - - override type ParquetT = Binary - } - } - - def decimalBinary(precision: Int, scale: Int = 0): Primitive[BigDecimal] = { - require(1 <= precision, s"Precision $precision <= 0") - require(0 <= scale && scale < precision, s"Scale $scale not within [0, $precision)") - logicalType[Array[Byte]](LogicalTypeAnnotation.decimalType(scale, precision))( - Decimal.fromBytes(_, precision, scale) - )(Decimal.toBytes(_, precision, scale)) - } - - @nowarn("msg=parameter value lp in method pfEnum is never used") - implicit def pfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): Primitive[T] = - logicalType[String](LogicalTypeAnnotation.enumType())(et.from)(et.to) - - implicit val ptUuid: Primitive[UUID] = new Primitive[UUID] { - override def buildSchema(cm: CaseMapper): Type = - Schema.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, length = 16) - - override def write(c: RecordConsumer, v: UUID)(cm: CaseMapper): Unit = - c.addBinary( - Binary.fromConstantByteArray( - ByteBuffer - .allocate(16) - .order(ByteOrder.BIG_ENDIAN) - .putLong(v.getMostSignificantBits) - .putLong(v.getLeastSignificantBits) - .array() - ) - ) - - override def newConverter: TypeConverter[UUID] = TypeConverter.newByteArray.map { ba => - val bb = ByteBuffer.wrap(ba) - val h = bb.getLong - val l = bb.getLong - new UUID(h, l) - } - - override type ParquetT = Binary - } - - implicit val ptDate: Primitive[LocalDate] = - logicalType[Int](LogicalTypeAnnotation.dateType())(i => LocalDate.ofEpochDay(i.toLong))( - _.toEpochDay.toInt - ) -} diff --git a/parquet/src/main/scala/magnolify/parquet/SchemaUtil.scala b/parquet/src/main/scala/magnolify/parquet/SchemaUtil.scala new file mode 100644 index 000000000..609c3f8bc --- /dev/null +++ b/parquet/src/main/scala/magnolify/parquet/SchemaUtil.scala @@ -0,0 +1,82 @@ +/* + * Copyright 2022 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package magnolify.parquet + +import org.apache.avro.{Schema => AvroSchema} + +import scala.jdk.CollectionConverters._ + +object SchemaUtil { + + type Path = String + + object Union { + def unapply(schema: AvroSchema): Option[Seq[AvroSchema]] = + if (schema.isUnion) Some(schema.getTypes.asScala.toSeq) else None + } + + object Array { + def unapply(schema: AvroSchema): Option[AvroSchema] = + if (schema.getType == AvroSchema.Type.ARRAY) Some(schema.getElementType) else None + } + + object Record { + def unapply(schema: AvroSchema): Option[Seq[AvroSchema.Field]] = + if (schema.getType == AvroSchema.Type.RECORD) Some(schema.getFields.asScala.toSeq) else None + } + + def deepCopy( + schema: AvroSchema, + rootDoc: Option[String], + getFieldDoc: Path => Option[String] + ): AvroSchema = deepCopyInternal(schema, rootDoc, getFieldDoc, "") + + private def deepCopyInternal( + schema: AvroSchema, + rootDoc: Option[String], + getFieldDoc: Path => Option[String], + path: Path + ): AvroSchema = schema match { + case Union(ts) => + val updatedTypes = ts.foldLeft(List.newBuilder[AvroSchema]) { (b, t) => + b += deepCopyInternal(t, None, getFieldDoc, path) + } + AvroSchema.createUnion(updatedTypes.result().asJava) + case Array(t) => + val updatedElementType = deepCopyInternal(t, None, getFieldDoc, path) + AvroSchema.createArray(updatedElementType) + case Record(fs) => + val updatedFields = fs.foldLeft(List.newBuilder[AvroSchema.Field]) { (b, f) => + val fieldPath = if (path.isEmpty) f.name() else s"$path.${f.name()}" + b += new AvroSchema.Field( + f.name(), + deepCopyInternal(f.schema(), None, getFieldDoc, fieldPath), + getFieldDoc(fieldPath).orNull, + f.defaultVal() + ) + } + AvroSchema.createRecord( + schema.getName, + rootDoc.orNull, + schema.getNamespace, + schema.isError, + updatedFields.result().asJava + ) + case _ => + schema + } +} diff --git a/parquet/src/test/scala/magnolify/parquet/test/AvroParquetSuite.scala b/parquet/src/test/scala/magnolify/parquet/test/AvroParquetSuite.scala index a2d4b6814..38585b540 100644 --- a/parquet/src/test/scala/magnolify/parquet/test/AvroParquetSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/test/AvroParquetSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2021 Spotify AB + * Copyright 2022 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,7 +17,6 @@ package magnolify.parquet.test import java.time._ - import cats._ import magnolify.cats.auto._ import magnolify.avro._ @@ -25,10 +24,10 @@ import magnolify.avro.unsafe._ import magnolify.parquet._ import magnolify.parquet.unsafe._ import magnolify.parquet.ParquetArray.AvroCompat._ +import magnolify.parquet.test.util.AvroSchemaComparer import magnolify.scalacheck.auto._ import magnolify.test._ import magnolify.test.Simple._ - import magnolify.test.Time._ import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration @@ -44,7 +43,9 @@ import org.scalacheck._ import scala.reflect.ClassTag class AvroParquetSuite extends MagnolifySuite { - private def test[T: Arbitrary: ClassTag](implicit + private def test[T: Arbitrary: ClassTag]( + schemaErrors: List[String] = List.empty + )(implicit at: AvroType[T], pt: ParquetType[T], eq: Eq[T] @@ -95,36 +96,71 @@ class AvroParquetSuite extends MagnolifySuite { parquet.checkContains(avro) avro.checkContains(parquet) } + + test(s"$name.avroSchema") { + val results = AvroSchemaComparer.compareSchemas(at.schema, pt.avroSchema) + assertEquals(results, schemaErrors) + } } - test[Integers] - test[Floats] - test[Required] - test[Nullable] + test[Integers]() + test[Floats]() + test[Required]() + test[Nullable]() { - test[Repeated] - test[Nested] + test[Repeated]() + test[Nested]( + List( + "root.r 'name' are different 'Required' != 'r'", + "root.r 'namespace' are different 'magnolify.test.Simple' != 'null'", + "root.o 'name' are different 'Required' != 'o'", + "root.o 'namespace' are different 'magnolify.test.Simple' != 'null'", + "root.l 'name' are different 'Required' != 'array'", + "root.l 'namespace' are different 'magnolify.test.Simple' != 'null'" + ) + ) } - test[Unsafe] + test[Unsafe]() { import Collections._ - test[Collections] - test[MoreCollections] + test[Collections]() + test[MoreCollections]() } { import Enums._ import UnsafeEnums._ - test[Enums] - test[UnsafeEnums] + test[Enums]( + List( + "root.j 'name' are different 'Color' != 'string'", + "root.j 'type' are different 'ENUM' != 'STRING'", + "root.s 'name' are different 'Color' != 'string'", + "root.s 'type' are different 'ENUM' != 'STRING'", + "root.a 'name' are different 'Color' != 'string'", + "root.a 'type' are different 'ENUM' != 'STRING'", + "root.jo 'name' are different 'Color' != 'string'", + "root.jo 'type' are different 'ENUM' != 'STRING'", + "root.so 'name' are different 'Color' != 'string'", + "root.so 'type' are different 'ENUM' != 'STRING'", + "root.ao 'name' are different 'Color' != 'string'", + "root.ao 'type' are different 'ENUM' != 'STRING'", + "root.jr 'name' are different 'Color' != 'string'", + "root.jr 'type' are different 'ENUM' != 'STRING'", + "root.sr 'name' are different 'Color' != 'string'", + "root.sr 'type' are different 'ENUM' != 'STRING'", + "root.ar 'name' are different 'Color' != 'string'", + "root.ar 'type' are different 'ENUM' != 'STRING'" + ) + ) + test[UnsafeEnums]() } { implicit val eqByteArray: Eq[Array[Byte]] = Eq.by(_.toList) - test[ParquetTypes] + test[ParquetTypes]() } { @@ -134,24 +170,71 @@ class AvroParquetSuite extends MagnolifySuite { implicit val arbBigDecimal: Arbitrary[BigDecimal] = Arbitrary(Gen.choose(-nines, nines).map(BigDecimal(_))) implicit val pfBigDecimal: ParquetField[BigDecimal] = ParquetField.decimalBinary(38, 9) - test[DecimalBinary] + test[DecimalBinary]() } - test[AvroParquetLogical] + test[AvroParquetLogical]() { import magnolify.avro.logical.millis._ import magnolify.parquet.logical.millis._ 
- test[AvroParquetTimeMillis] + test[AvroParquetTimeMillis]() } { import magnolify.avro.logical.micros._ import magnolify.parquet.logical.micros._ - test[AvroParquetTimeMicros] + test[AvroParquetTimeMicros]() } + + // nested record doc is lost + test[AvroParquetWithNestedAnnotations]( + List( + "root.nested 'name' are different 'AvroParquetWithAnnotations' != 'nested'", + "root.nested 'doc' are different 'Should be ignored' != 'null'", + "root.nested 'namespace' are different 'magnolify.parquet.test' != 'null'" + ) + ) + test[AvroParquetWithAnnotationsAndOptions]( + List( + "root.nested 'name' are different 'AvroParquetWithAnnotations' != 'nested'", + "root.nested 'doc' are different 'Should be ignored' != 'null'", + "root.nested 'namespace' are different 'magnolify.parquet.test' != 'null'" + ) + ) + test[AvroParquetWithAnnotationsAndLists]( + List( + "root.nested 'name' are different 'AvroParquetWithAnnotations' != 'array'", + "root.nested 'doc' are different 'Should be ignored' != 'null'", + "root.nested 'namespace' are different 'magnolify.parquet.test' != 'null'" + ) + ) } case class AvroParquetLogical(d: LocalDate) case class AvroParquetTimeMillis(i: Instant, dt: LocalDateTime, t: LocalTime) case class AvroParquetTimeMicros(i: Instant, dt: LocalDateTime, t: LocalTime) +@doc("Should be ignored") +case class AvroParquetWithAnnotations( + @doc("nested field policy") s: String, + s2: String +) +@doc("root level policy") +case class AvroParquetWithNestedAnnotations( + @doc("string policy") s: String, + @doc("record policy") nested: AvroParquetWithAnnotations +) + +@doc("root level policy") +case class AvroParquetWithAnnotationsAndOptions( + @doc("string policy") s: String, + @doc("record policy") + nested: Option[AvroParquetWithAnnotations] +) + +@doc("root level policy") +case class AvroParquetWithAnnotationsAndLists( + @doc("string policy") s: String, + @doc("record policy") + nested: List[AvroParquetWithAnnotations] +) diff --git a/parquet/src/test/scala/magnolify/parquet/test/ParquetTypeSuite.scala b/parquet/src/test/scala/magnolify/parquet/test/ParquetTypeSuite.scala index a0a2cc7a5..1a3b81aa7 100644 --- a/parquet/src/test/scala/magnolify/parquet/test/ParquetTypeSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/test/ParquetTypeSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020 Spotify AB + * Copyright 2022 Spotify AB * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -26,6 +26,7 @@ import magnolify.parquet._ import magnolify.parquet.unsafe._ import magnolify.scalacheck.auto._ import magnolify.shared.CaseMapper +import magnolify.shared.doc import magnolify.test.Simple._ import magnolify.test.Time._ import magnolify.test._ @@ -80,6 +81,28 @@ class ParquetTypeSuite extends MagnolifySuite { test[UnsafeEnums] } + test("ParquetDoc") { + ensureSerializable(ParquetType[ParquetNestedDoc]) + val pf = ParquetField[ParquetNestedDoc] + + assert(pf.fieldDocs("pd") == "nested") + assert(pf.fieldDocs("pd.i") == "integers") + assert(pf.fieldDocs("pd.s") == "string") + assert(pf.fieldDocs("i") == "integers") + assert(pf.typeDoc.contains("Parquet with doc")) + } + + test("ParquetDocWithNestedList") { + ensureSerializable(ParquetType[ParquetNestedListDoc]) + val pf = ParquetField[ParquetNestedListDoc] + + assert(pf.fieldDocs("pd") == "nested") + assert(pf.fieldDocs("pd.i") == "integers") + assert(pf.fieldDocs("pd.s") == "string") + assert(pf.fieldDocs("i") == "integers") + assert(pf.typeDoc.contains("Parquet with doc with nested list")) + } + { import Custom._ implicit val pfUri: ParquetField[URI] = ParquetField.from[String](URI.create)(_.toString) @@ -176,6 +199,18 @@ case class Logical(u: UUID, d: LocalDate) case class TimeMillis(i: Instant, dt: LocalDateTime, ot: OffsetTime, t: LocalTime) case class TimeMicros(i: Instant, dt: LocalDateTime, ot: OffsetTime, t: LocalTime) case class TimeNanos(i: Instant, dt: LocalDateTime, ot: OffsetTime, t: LocalTime) +@doc("Parquet with doc") +case class ParquetDoc(@doc("string") s: String, @doc("integers") i: Integers) + +@doc("Parquet with doc") +case class ParquetNestedDoc(@doc("nested") pd: ParquetDoc, @doc("integers") i: Integers) + +@doc("Parquet with doc with nested list") +case class ParquetNestedListDoc( + @doc("nested") pd: List[ParquetDoc], + @doc("integers") + i: List[Integers] +) class TestInputFile(ba: Array[Byte]) extends InputFile { private val bais = new ByteArrayInputStream(ba) diff --git a/parquet/src/test/scala/magnolify/parquet/test/SchemaUtilSuite.scala b/parquet/src/test/scala/magnolify/parquet/test/SchemaUtilSuite.scala new file mode 100644 index 000000000..d94d40299 --- /dev/null +++ b/parquet/src/test/scala/magnolify/parquet/test/SchemaUtilSuite.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2022 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package magnolify.parquet.test + +import magnolify.parquet.SchemaUtil +import magnolify.parquet.test.util.AvroSchemaComparer +import magnolify.test.MagnolifySuite +import org.apache.avro.Schema + +class SchemaUtilSuite extends MagnolifySuite { + + test(s"schema") { + val nestedSchema = + """ + |{ + | "type" : "record", + | "name" : "Integers", + | "namespace" : "magnolify.test.Simple", + | "fields" : [ { + | "name" : "i", + | "type" : "int" + | }, { + | "name" : "l", + | "type" : "long" + | } ] + |} + |""".stripMargin + val inputSchema = new Schema.Parser().parse(nestedSchema) + + val outputSchema = SchemaUtil.deepCopy( + inputSchema, + Some("root level"), + _ => Some("field level") + ) + + val results = AvroSchemaComparer.compareSchemas(inputSchema, outputSchema) + assertEquals( + results, + List( + "root 'doc' are different 'null' != 'root level'", + "root.i field 'doc' are different 'null' != 'field level'", + "root.l field 'doc' are different 'null' != 'field level'" + ) + ) + } +} diff --git a/parquet/src/test/scala/magnolify/parquet/test/util/AvroSchemaComparer.scala b/parquet/src/test/scala/magnolify/parquet/test/util/AvroSchemaComparer.scala new file mode 100644 index 000000000..77a8cd253 --- /dev/null +++ b/parquet/src/test/scala/magnolify/parquet/test/util/AvroSchemaComparer.scala @@ -0,0 +1,102 @@ +/* + * Copyright 2022 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package magnolify.parquet.test.util + +import magnolify.parquet.SchemaUtil._ +import org.apache.avro.Schema +import scala.util.Try + +object AvroSchemaComparer { + + def compareSchemas( + schema1: Schema, + schema2: Schema, + path: String = "root" + ): List[String] = { + compareBasicTypeInfo(schema1, schema2, path) ++ { + (schema1, schema2) match { + case (null, null) => List() + case (null, _) => List("schema2 is null") + case (_, null) => List("schema1 is null") + case (Union(nestedTypes1), Union(nestedTypes2)) => + if (nestedTypes1.size != nestedTypes2.size) { + List(s"Size of union is different ${nestedTypes1.size} != ${nestedTypes2.size}") + } else { + nestedTypes1 + .zip(nestedTypes2) + .flatMap { case (is1, is2) => compareSchemas(is1, is2, path) } + } + case (Array(arrayEl1), Array(arrayEl2)) => + compareSchemas(arrayEl1, arrayEl2, path) + case (Record(schemaFields1), Record(schemaFields2)) => + val fields1 = schemaFields1.map(_.name()) + val fields2 = schemaFields2.map(_.name()) + + val fieldsEqualResults = require( + fields1 == fields2, + s"$path fields are not equal '$fields1' != '$fields2'" + ) + + fieldsEqualResults ++ + fields1 + .intersect(fields2) + .flatMap { f => + val field1 = schema1.getField(f) + val field2 = schema2.getField(f) + + require( + field1.doc() == field2.doc(), + s"$path.$f field 'doc' are different '${field1.doc}' != '${field2.doc}'" + ) ++ require( + field1.pos() == field2.pos(), + s"$path.$f field 'pos' are different '${field1.pos}' != '${field2.pos}'" + ) ++ compareSchemas(field1.schema(), field2.schema(), s"$path.$f") + } + case _ => + List.empty + } + } + } + + private def require(condition: Boolean, error: => String): Option[String] = { + if (!condition) + Some(error) + else None + } + + private def compareBasicTypeInfo(s1: Schema, s2: Schema, path: String): List[String] = { + if (s1 != null && s2 != null) { + require( + s1.getName == s2.getName, + s"$path 'name' are different '${s1.getName}' != '${s2.getName}'" + ) ++ require( + s1.getType == s2.getType, + s"$path 'type' are different '${s1.getType}' != '${s2.getType}'" + ) ++ require( + s1.isNullable == s2.isNullable, + s"$path 'isNullable' are different '${s1.isNullable}' != '${s2.isNullable}'" + ) ++ require( + s1.getDoc == s2.getDoc, + s"$path 'doc' are different '${s1.getDoc}' != '${s2.getDoc}'" + ) ++ require( + Try(s1.getNamespace == s2.getNamespace).getOrElse(true), + s"$path 'namespace' are different '${s1.getNamespace}' != '${s2.getNamespace}'" + ) + }.toList + else List.empty + } +} diff --git a/shared/src/main/scala/magnolify/shared/doc.scala b/shared/src/main/scala/magnolify/shared/doc.scala new file mode 100644 index 000000000..95e539ce2 --- /dev/null +++ b/shared/src/main/scala/magnolify/shared/doc.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2022 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package magnolify.shared + +import scala.annotation.StaticAnnotation + +class doc(doc: String) extends StaticAnnotation with Serializable { + override def toString: String = doc +}
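
For reference, a minimal usage sketch of the annotation support introduced above (the `User` case class and its doc strings are hypothetical examples, not taken from the change): `@doc` annotations collected by the derived `ParquetField` surface through `typeDoc`/`fieldDocs` and are copied into the Avro schema exposed by `ParquetType#avroSchema`.

```scala
import magnolify.parquet._
import magnolify.parquet.ParquetArray.AvroCompat._ // also embeds the Avro schema in written files
import magnolify.shared.doc

@doc("A user record")
case class User(@doc("user name") name: String, @doc("user age") age: Int)

object DocUsageSketch {
  // The derived ParquetField exposes the collected documentation directly.
  val pf: ParquetField[User] = ParquetField[User]
  // pf.typeDoc == Some("A user record"); pf.fieldDocs("name") == "user name"

  // ParquetType converts the Parquet message schema to an Avro schema
  // and copies the type/field docs into it via SchemaUtil.deepCopy.
  val pt: ParquetType[User] = ParquetType[User]
  val avroSchema: org.apache.avro.Schema = pt.avroSchema
  // avroSchema.getDoc == "A user record"; avroSchema.getField("age").doc() == "user age"
}
```

As the warning added to the write support notes, without the `AvroCompat` import files are still written, but no Avro schema is attached to the file metadata.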