From 803e64ee1f906d575e16a8f744073c9b44482be9 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Wed, 21 Aug 2024 23:39:41 -0400 Subject: [PATCH] wip --- .../scala/magnolify/beam/BeamSchemaType.scala | 100 +++--------------- .../magnolify/beam/BeamSchemaTypeSuite.scala | 81 ++++++++------ 2 files changed, 64 insertions(+), 117 deletions(-) diff --git a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala index 1c9c4033..205380b0 100644 --- a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala +++ b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala @@ -19,7 +19,7 @@ package magnolify.beam import magnolia1.* import magnolify.shared.* import org.apache.beam.sdk.schemas.Schema -import org.apache.beam.sdk.schemas.Schema.{FieldType, LogicalType} +import org.apache.beam.sdk.schemas.Schema.FieldType import org.apache.beam.sdk.values.Row import org.joda.time as joda import com.google.protobuf.ByteString @@ -27,7 +27,6 @@ import magnolify.shims.FactoryCompat import org.apache.beam.sdk.schemas.logicaltypes as logicaltypes import java.nio.ByteBuffer -import java.time.LocalDate import java.{time as jt, util as ju} import scala.annotation.implicitNotFound import scala.collection.concurrent @@ -87,25 +86,9 @@ object BeamSchemaField { override def to(v: T)(cm: CaseMapper): ToT = toFn(v) } -// private[magnolify] def aux2[T, Repr](fieldTypeFn: CaseMapper => FieldType)(fromFn: Repr => T)( -// toFn: T => Repr -// ): BeamSchemaField[T] = -// aux[T, Repr, Repr](fieldTypeFn)(fromFn)(toFn) - private[magnolify] def id[T](ft: CaseMapper => FieldType): BeamSchemaField[T] = aux[T, T, T](ft)(identity)(identity) - private[magnolify] def logicalId[T](ft: CaseMapper => FieldType): BeamSchemaField[T] = id(ft) - -// private[magnolify] def logicalId[T](ft: CaseMapper => FieldType): BeamSchemaLogicalField[T] = -// new BeamSchemaLogicalField[T] { -// type FromT = T -// type ToT = T -// override def fieldType(cm: CaseMapper): FieldType = ft(cm) -// override def from(v: FromT)(cm: CaseMapper): T = v -// override def to(v: T)(cm: CaseMapper): ToT = v -// } - def from[T]: FromWord[T] = new FromWord[T] class FromWord[T] { @@ -117,24 +100,6 @@ object BeamSchemaField { } } -// def logicalType[T]: LogicalWord[T] = new LogicalWord[T] -// -// class LogicalWord[T] { -// def apply[From, To](lt: LogicalType[From, ?], nullable: Boolean = false): BeamSchemaLogicalField[T] = { -// new BeamSchemaLogicalField[T] { -// type FromT = From -// type ToT = To -// override def logicalType: LogicalType[From, To] = lt -// override def fieldType(cm: CaseMapper): FieldType = -// FieldType.logicalType(logicalType).withNullable(nullable) -// override def from(v: From)(cm: CaseMapper): T = -// throw new UnsupportedOperationException("Do not call from() on logical types") -// override def to(v: T)(cm: CaseMapper): To = -// throw new UnsupportedOperationException("Do not call to() on logical types") -// } -// } -// } - sealed trait Record[T] extends Aux[T, Row, Row] { @transient private lazy val schemaCache: concurrent.Map[ju.UUID, Schema] = concurrent.TrieMap.empty @@ -153,17 +118,15 @@ object BeamSchemaField { def join[T](caseClass: CaseClass[Typeclass, T]): BeamSchemaField[T] = { if (caseClass.isValueClass) { - // FIXME -// val p = caseClass.parameters.head -// val tc = p.typeclass -// new BeamSchemaField[T] { -// override type FromT = tc.FromT -// override type ToT = tc.ToT -// // override protected def buildSchema(cm: CaseMapper): Schema = tc.buildSchema(cm) -// override def from(v: FromT)(cm: CaseMapper): T = caseClass.construct(_ => tc.fromAny(v)(cm)) -// override def to(v: T)(cm: CaseMapper): ToT = tc.to(p.dereference(v))(cm) -// } - ??? + val p = caseClass.parameters.head + val tc = p.typeclass + new BeamSchemaField[T] { + override type FromT = tc.FromT + override type ToT = tc.ToT + override def fieldType(cm: CaseMapper): FieldType = tc.fieldType(cm) + override def from(v: FromT)(cm: CaseMapper): T = caseClass.construct(_ => tc.fromAny(v)(cm)) + override def to(v: T)(cm: CaseMapper): ToT = tc.to(p.dereference(v))(cm) + } } else { new Record[T] { override def fieldType(cm: CaseMapper): FieldType = FieldType.row(schema(cm)) @@ -171,7 +134,7 @@ object BeamSchemaField { override protected def buildSchema(cm: CaseMapper): Schema = caseClass.parameters .foldLeft(Schema.builder()) { case (s, p) => - s.addField(p.label, p.typeclass.fieldType(cm)) + s.addField(cm.map(p.label), p.typeclass.fieldType(cm)) } .build() @@ -229,20 +192,10 @@ object BeamSchemaField { )(_.bigDecimal) implicit val bsfUUID: BeamSchemaField[ju.UUID] = - logicalId[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType)) -// new BeamSchemaLogicalField[ju.UUID] { -// type FromT = ju.UUID -// type ToT = ju.UUID -// val logicalType = new logicaltypes.UuidLogicalType -// override def fieldType(cm: CaseMapper): FieldType = FieldType.logicalType(logicalType) -// override def from(v: ju.UUID)(cm: CaseMapper): ju.UUID = v -// override def to(v: ju.UUID)(cm: CaseMapper): ju.UUID = v -// } + id[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType)) -// implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] = -// from[Long](LocalDate.ofEpochDay)(_.toEpochDay) implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] = - logicalId[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date)) + id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date)) private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1) implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] = from[Int](daysFromEpoch => EpochJodaDate.plusDays(daysFromEpoch))(d => @@ -292,18 +245,6 @@ object BeamSchemaField { ti: C[T] => Iterable[T], fc: FactoryCompat[T, C[T]] ): BeamSchemaField[C[T]] = { -// f match { -// case l: BeamSchemaLogicalField[T] => -// new BeamSchemaLogicalField[C[T]] { -// type FromT = ju.List[l.FromT] -// type ToT = ju.List[l.ToT] -// override def fieldType(cm: CaseMapper): FieldType = FieldType.iterable(l.fieldType(cm)) -// override def to(v: C[T])(cm: CaseMapper): ToT = -// v.iterator.map(l.to(_)(cm)).toList.asJava -// override def from(v: FromT)(cm: CaseMapper): C[T] = -// fc.fromSpecific(v.asScala.iterator.map(p => l.from(p)(cm))) -// } -// case _: BeamSchemaField[_] => new Aux[C[T], ju.List[f.FromT], ju.List[f.ToT]] { override def from(v: ju.List[f.FromT])(cm: CaseMapper): C[T] = fc.fromSpecific(v.asScala.iterator.map(p => f.from(p)(cm))) @@ -311,23 +252,9 @@ object BeamSchemaField { v.iterator.map(f.to(_)(cm)).toList.asJava override def fieldType(cm: CaseMapper): FieldType = FieldType.iterable(f.fieldType(cm)) } -// } } implicit def bsfOption[T](implicit f: BeamSchemaField[T]): BeamSchemaField[Option[T]] = { -// f match { -// case l: BeamSchemaLogicalField[T] => -// new BeamSchemaLogicalField[Option[T]] { -// type FromT = l.FromT -// type ToT = l.ToT -// override def fieldType(cm: CaseMapper): FieldType = l.fieldType(cm).withNullable(true) -// override def to(v: Option[T])(cm: CaseMapper): ToT = v match { -// case None => null.asInstanceOf[ToT] -// case Some(value) => l.to(value)(cm) -// } -// override def from(v: FromT)(cm: CaseMapper): Option[T] = Option(v).map(l.from(_)(cm)) -// } -// case _: BeamSchemaField[T] => new Aux[Option[T], f.FromT, f.ToT] { override def from(v: f.FromT)(cm: CaseMapper): Option[T] = if (v == null) None else Some(f.from(v)(cm)) @@ -337,6 +264,5 @@ object BeamSchemaField { } override def fieldType(cm: CaseMapper): FieldType = f.fieldType(cm).withNullable(true) } -// } } } diff --git a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala index c15180c0..3b881be1 100644 --- a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala +++ b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala @@ -21,14 +21,17 @@ import magnolify.cats.auto.* import magnolify.cats.TestEq.* import magnolify.scalacheck.auto.* import magnolify.scalacheck.TestArbitrary.* +import magnolify.shared.CaseMapper import magnolify.test.MagnolifySuite import magnolify.test.Simple.* +import org.apache.beam.sdk.schemas.Schema import org.joda.time as joda import org.scalacheck.{Arbitrary, Gen, Prop} import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} import java.util.UUID import scala.reflect.ClassTag +import scala.jdk.CollectionConverters.* class BeamSchemaTypeSuite extends MagnolifySuite { private def test[T: Arbitrary: ClassTag](implicit @@ -60,6 +63,8 @@ class BeamSchemaTypeSuite extends MagnolifySuite { test[Maps] test[Logical] + // FIXME value classes + { import magnolify.beam.unsafe._ import magnolify.shared.TestEnumType._ @@ -71,37 +76,54 @@ class BeamSchemaTypeSuite extends MagnolifySuite { Arbitrary(Gen.chooseNum(0, Int.MaxValue).map(BigDecimal(_))) test[Decimal] + test("Millis") { + import magnolify.beam.logical.millis.* + test[JavaTime] + test[JodaTime] + } + + test("Micros") { + import magnolify.beam.logical.micros.* + test[JavaTime] + test[JodaTime] + } + + test("Nanos") { + import magnolify.beam.logical.nanos.* + test[JavaTime] + test[JodaTime] + } + { - import magnolify.beam.logical.millis._ - test[Time] - test[Joda] + implicit val bst: BeamSchemaType[LowerCamel] = + BeamSchemaType[LowerCamel](CaseMapper(_.toUpperCase)) + test[LowerCamel] + + test("LowerCamel mapping") { + val schema = bst.schema + + val fields = LowerCamel.fields.map(_.toUpperCase) + assertEquals(schema.getFields.asScala.map(_.getName()).toSeq, fields) + assertEquals( + schema.getField("INNERFIELD").getType.getRowSchema.getFields.asScala.map(_.getName()).toSeq, + Seq("INNERFIRST") + ) + } } -// { -// // FIXME need special Eq instances that are lossy -// import magnolify.beam.logical.micros._ -// test[Time] -// test[Joda] -// } -// -// { -//// FIXME need special Eq instances that are lossy -// import magnolify.beam.logical.nanos._ -// test[Time] -// test[Joda] -// } - -// { -// implicit val bst: BeamSchemaType[LowerCamel] = -// BeamSchemaType[LowerCamel](CaseMapper(_.toUpperCase)) -// test[LowerCamel] -// -// test("LowerCamel mapping") { -// val schema = bst.schema -// // FIXME -// } -// } + test("ValueClass") { + // value classes should act only as fields + intercept[IllegalArgumentException] { + BeamSchemaType[ValueClass] + } + implicit val bst: BeamSchemaType[HasValueClass] = BeamSchemaType[HasValueClass] + test[HasValueClass] + + assert(bst.schema.getField("vc").getType == Schema.FieldType.STRING) + val record = bst(HasValueClass(ValueClass("String"))) + assert(record.getValue("vc").equals("String")) + } } case class Decimal(bd: BigDecimal, bdo: Option[BigDecimal]) @@ -111,14 +133,13 @@ case class Logical( ul: List[UUID], ulo: List[Option[UUID]] ) - -case class Time( +case class JavaTime( i: Instant, d: LocalDate, dt: LocalDateTime, t: LocalTime ) -case class Joda( +case class JodaTime( i: joda.Instant, dt: joda.DateTime, lt: joda.LocalTime,