Skip to content

Commit

Permalink
[avro] Use SchemaBuilder API
Browse files Browse the repository at this point in the history
Move implementation to the SchemaBuilder API.
This removes the clunky JSON conversion type required for default
values.
  • Loading branch information
RustedBones committed Jul 22, 2024
1 parent aa00c1f commit bb89868
Showing 1 changed file with 56 additions and 43 deletions.
99 changes: 56 additions & 43 deletions avro/src/main/scala/magnolify/avro/AvroType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import magnolify.shared.{doc => _, _}
import magnolify.shims.FactoryCompat
import org.apache.avro.generic.GenericData.EnumSymbol
import org.apache.avro.generic._
import org.apache.avro.{JsonProperties, LogicalType, LogicalTypes, Schema}
import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder}
import org.joda.{time => joda}

import java.nio.{ByteBuffer, ByteOrder}
Expand Down Expand Up @@ -105,23 +105,29 @@ object AvroField {
}
} else {
new Record[T] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema
.createRecord(
caseClass.typeName.short,
getDoc(caseClass.annotations, caseClass.typeName.full),
caseClass.typeName.owner,
false,
caseClass.parameters.map { p =>
new Schema.Field(
cm.map(p.label),
p.typeclass.schema(cm),
getDoc(p.annotations, s"${caseClass.typeName.full}#${p.label}"),
p.default
.map(d => p.typeclass.makeDefault(d)(cm))
.getOrElse(p.typeclass.fallbackDefault)
)
}.asJava
)
// Builds the Avro RECORD schema of the case class with the `SchemaBuilder` API.
//
// The record is named after the short type name, uses the type owner as namespace and takes
// its doc from `getDoc`. One field is emitted per case class parameter, with its name mapped
// through `cm`.
//
// Field default resolution, in order:
//   1. the Scala parameter default, converted by the field typeclass `makeDefault`
//   2. the field typeclass `fallbackDefault`, when it is non-null
//   3. no default at all
override protected def buildSchema(cm: CaseMapper): Schema = {
  val fields = SchemaBuilder
    .record(caseClass.typeName.short)
    .namespace(caseClass.typeName.owner)
    .doc(getDoc(caseClass.annotations, caseClass.typeName.full))
    .fields()

  caseClass.parameters
    .foldLeft(fields) { (assembler, p) =>
      val field = assembler
        .name(cm.map(p.label))
        .doc(getDoc(p.annotations, s"${caseClass.typeName.full}#${p.label}"))
        .`type`(p.typeclass.schema(cm))

      // Keep honouring `fallbackDefault` for parameters without a Scala default.
      // NOTE(review): assumes `fallbackDefault` is null for types without an implicit
      // default; `Option(...)` then turns it into `noDefault()` - confirm against the trait.
      val default = p.default
        .map(d => p.typeclass.makeDefault(d)(cm))
        .orElse(Option(p.typeclass.fallbackDefault))

      default.fold(field.noDefault())(d => field.withDefault(d))
    }
    .endRecord()
}

// `JacksonUtils.toJson` expects `Map[String, Any]` for `RECORD` defaults
override def makeDefault(d: T)(cm: CaseMapper): ju.Map[String, Any] = {
Expand Down Expand Up @@ -198,9 +204,8 @@ object AvroField {
implicit val afFloat: AvroField[Float] = id[Float](Schema.Type.FLOAT)
implicit val afDouble: AvroField[Double] = id[Double](Schema.Type.DOUBLE)
implicit val afByteBuffer: AvroField[ByteBuffer] = new Aux[ByteBuffer, ByteBuffer, ByteBuffer] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema.create(Schema.Type.BYTES)
// `JacksonUtils.toJson` expects `Array[Byte]` for `BYTES` defaults
override def makeDefault(d: ByteBuffer)(cm: CaseMapper): Array[Byte] = d.array()
// Schema of a `ByteBuffer` field: the Avro `bytes` type obtained from `SchemaBuilder`.
override protected def buildSchema(cm: CaseMapper): Schema = SchemaBuilder.builder().bytesType()
// The default value is the given buffer itself; no conversion or copy is made here.
override def makeDefault(d: ByteBuffer)(cm: CaseMapper): ByteBuffer = d
// copy to avoid issue in case original buffer is reused
override def from(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = {
val ptr = v.asReadOnlyBuffer()
Expand All @@ -213,11 +218,12 @@ object AvroField {

implicit val afCharSequence: AvroField[CharSequence] = id[CharSequence](Schema.Type.STRING)
implicit val afString: AvroField[String] = new Aux[String, String, String] {
override protected def buildSchema(cm: CaseMapper): Schema = {
val schema = Schema.create(Schema.Type.STRING)
GenericData.setStringType(schema, GenericData.StringType.String)
schema
}
// Schema of a `String` field: an Avro string type carrying the `GenericData.STRING_PROP`
// property set to the `String` string type.
// The property value is passed as the enum constant's name so that it is stored as a plain
// string and does not depend on how `prop` converts arbitrary objects.
override protected def buildSchema(cm: CaseMapper): Schema =
  SchemaBuilder
    .builder()
    .stringBuilder()
    .prop(GenericData.STRING_PROP, GenericData.StringType.String.name())
    .endString()
override def from(v: String)(cm: CaseMapper): String = v
override def to(v: String)(cm: CaseMapper): String = v
}
Expand All @@ -226,24 +232,25 @@ object AvroField {
// Avro 1.9+ added a type parameter for `GenericEnumSymbol`, breaking 1.8 compatibility
// Some readers, e.g. `AvroParquetReader`, read enums as `Utf8`
new Aux[T, AnyRef, EnumSymbol] {
override protected def buildSchema(cm: CaseMapper): Schema = {
val doc = getDoc(et.annotations, s"Enum ${et.namespace}.${et.name}")
Schema.createEnum(et.name, doc, et.namespace, et.values.asJava)
}
// `JacksonUtils.toJson` expects `String` for `ENUM` defaults
override def makeDefault(d: T)(cm: CaseMapper): String = et.to(d)
// Schema of an enum field: an Avro enumeration named and namespaced after the enum type,
// documented through `getDoc`, listing every value of `et` as a symbol.
override protected def buildSchema(cm: CaseMapper): Schema = {
  val named = SchemaBuilder.enumeration(et.name)
  val documented = named.doc(getDoc(et.annotations, s"Enum ${et.namespace}.${et.name}"))
  documented
    .namespace(et.namespace)
    .symbols(et.values: _*)
}
// The default value is an `EnumSymbol` bound to this field's schema, holding the symbol
// produced by `et.to` for `d`.
override def makeDefault(d: T)(cm: CaseMapper): EnumSymbol = {
  val enumSchema = schema(cm)
  new GenericData.EnumSymbol(enumSchema, et.to(d))
}
override def from(v: FromT)(cm: CaseMapper): T = et.from(v.toString)
override def to(v: T)(cm: CaseMapper): ToT = new GenericData.EnumSymbol(schema(cm), v)
}

implicit def afOption[T](implicit f: AvroField[T]): AvroField[Option[T]] =
new Aux[Option[T], f.FromT, f.ToT] {
override protected def buildSchema(cm: CaseMapper): Schema =
Schema.createUnion(Schema.create(Schema.Type.NULL), f.schema(cm))
SchemaBuilder.unionOf().nullType.and().`type`(f.schema(cm)).endUnion()
// `Option[T]` is a `UNION` of `[NULL, T]` and must default to first type `NULL`
override def makeDefault(d: Option[T])(cm: CaseMapper): JsonProperties.Null = {
override def makeDefault(d: Option[T])(cm: CaseMapper): Null = {
require(d.isEmpty, "Option[T] can only default to None")
JsonProperties.NULL_VALUE
null
}
override def from(v: f.FromT)(cm: CaseMapper): Option[T] =
if (v == null) None else Some(f.from(v)(cm))
Expand All @@ -259,7 +266,8 @@ object AvroField {
fc: FactoryCompat[T, C[T]]
): AvroField[C[T]] =
new Aux[C[T], ju.List[f.FromT], GenericArray[f.ToT]] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema.createArray(f.schema(cm))
// Schema of a collection field: an Avro array whose items use the element schema of `f`.
override protected def buildSchema(cm: CaseMapper): Schema = {
  val arrayBuilder = SchemaBuilder.array()
  arrayBuilder.items(f.schema(cm))
}
override def fallbackDefault: ju.List[f.ToT] = ju.Collections.emptyList()
override def from(v: ju.List[f.FromT])(cm: CaseMapper): C[T] =
fc.fromSpecific(v.asScala.iterator.map(p => f.from(p)(cm)))
Expand All @@ -269,7 +277,8 @@ object AvroField {

implicit def afCharSequenceMap[T](implicit f: AvroField[T]): AvroField[Map[CharSequence, T]] =
new Aux[Map[CharSequence, T], ju.Map[CharSequence, f.FromT], ju.Map[CharSequence, f.ToT]] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema.createMap(f.schema(cm))
// Schema of a `Map[CharSequence, T]` field: an Avro map whose values use the schema of `f`.
override protected def buildSchema(cm: CaseMapper): Schema = {
  val mapBuilder = SchemaBuilder.map()
  mapBuilder.values(f.schema(cm))
}
override def fallbackDefault: ju.Map[CharSequence, f.ToT] = ju.Collections.emptyMap()
override def from(v: ju.Map[CharSequence, f.FromT])(cm: CaseMapper): Map[CharSequence, T] =
v.asScala.map { case (k, v) => k -> f.from(v)(cm) }.toMap
Expand All @@ -279,11 +288,11 @@ object AvroField {

implicit def afStringMap[T](implicit f: AvroField[T]): AvroField[Map[String, T]] =
new Aux[Map[String, T], ju.Map[String, f.FromT], ju.Map[String, f.ToT]] {
override protected def buildSchema(cm: CaseMapper): Schema = {
val schema = Schema.createMap(f.schema(cm))
GenericData.setStringType(schema, GenericData.StringType.String)
schema
}
// Schema of a `Map[String, T]` field: an Avro map whose values use the schema of `f`,
// carrying the `GenericData.STRING_PROP` property set to the `String` string type.
// The property value is passed as the enum constant's name so that it is stored as a plain
// string and does not depend on how `prop` converts arbitrary objects.
override protected def buildSchema(cm: CaseMapper): Schema =
  SchemaBuilder
    .map()
    .prop(GenericData.STRING_PROP, GenericData.StringType.String.name())
    .values(f.schema(cm))
override def fallbackDefault: ju.Map[String, f.ToT] = ju.Collections.emptyMap()
override def from(v: ju.Map[String, f.FromT])(cm: CaseMapper): Map[String, T] =
v.asScala.map { case (k, v) => k -> f.from(v)(cm) }.toMap
Expand Down Expand Up @@ -361,7 +370,11 @@ object AvroField {
override protected def buildSchema(cm: CaseMapper): Schema = {
val n = ReflectionUtils.name[T]
val ns = ReflectionUtils.namespace[T]
Schema.createFixed(n, getDoc(an.annotations, n), ns, size)
SchemaBuilder
.fixed(n)
.namespace(ns)
.doc(getDoc(an.annotations, n))
.size(size)
}

override def from(v: GenericFixed)(cm: CaseMapper): T = f(v.bytes())
Expand Down

0 comments on commit bb89868

Please sign in to comment.