Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Aug 22, 2024
1 parent 1a5ecbb commit 803e64e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 117 deletions.
100 changes: 13 additions & 87 deletions beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package magnolify.beam
import magnolia1.*
import magnolify.shared.*
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.schemas.Schema.{FieldType, LogicalType}
import org.apache.beam.sdk.schemas.Schema.FieldType
import org.apache.beam.sdk.values.Row
import org.joda.time as joda
import com.google.protobuf.ByteString
import magnolify.shims.FactoryCompat
import org.apache.beam.sdk.schemas.logicaltypes as logicaltypes

import java.nio.ByteBuffer
import java.time.LocalDate
import java.{time as jt, util as ju}
import scala.annotation.implicitNotFound
import scala.collection.concurrent
Expand Down Expand Up @@ -87,25 +86,9 @@ object BeamSchemaField {
override def to(v: T)(cm: CaseMapper): ToT = toFn(v)
}

// private[magnolify] def aux2[T, Repr](fieldTypeFn: CaseMapper => FieldType)(fromFn: Repr => T)(
// toFn: T => Repr
// ): BeamSchemaField[T] =
// aux[T, Repr, Repr](fieldTypeFn)(fromFn)(toFn)

private[magnolify] def id[T](ft: CaseMapper => FieldType): BeamSchemaField[T] =
aux[T, T, T](ft)(identity)(identity)

private[magnolify] def logicalId[T](ft: CaseMapper => FieldType): BeamSchemaField[T] = id(ft)

// private[magnolify] def logicalId[T](ft: CaseMapper => FieldType): BeamSchemaLogicalField[T] =
// new BeamSchemaLogicalField[T] {
// type FromT = T
// type ToT = T
// override def fieldType(cm: CaseMapper): FieldType = ft(cm)
// override def from(v: FromT)(cm: CaseMapper): T = v
// override def to(v: T)(cm: CaseMapper): ToT = v
// }

def from[T]: FromWord[T] = new FromWord[T]

class FromWord[T] {
Expand All @@ -117,24 +100,6 @@ object BeamSchemaField {
}
}

// def logicalType[T]: LogicalWord[T] = new LogicalWord[T]
//
// class LogicalWord[T] {
// def apply[From, To](lt: LogicalType[From, ?], nullable: Boolean = false): BeamSchemaLogicalField[T] = {
// new BeamSchemaLogicalField[T] {
// type FromT = From
// type ToT = To
// override def logicalType: LogicalType[From, To] = lt
// override def fieldType(cm: CaseMapper): FieldType =
// FieldType.logicalType(logicalType).withNullable(nullable)
// override def from(v: From)(cm: CaseMapper): T =
// throw new UnsupportedOperationException("Do not call from() on logical types")
// override def to(v: T)(cm: CaseMapper): To =
// throw new UnsupportedOperationException("Do not call to() on logical types")
// }
// }
// }

sealed trait Record[T] extends Aux[T, Row, Row] {
@transient private lazy val schemaCache: concurrent.Map[ju.UUID, Schema] =
concurrent.TrieMap.empty
Expand All @@ -153,25 +118,23 @@ object BeamSchemaField {

def join[T](caseClass: CaseClass[Typeclass, T]): BeamSchemaField[T] = {
if (caseClass.isValueClass) {
// FIXME
// val p = caseClass.parameters.head
// val tc = p.typeclass
// new BeamSchemaField[T] {
// override type FromT = tc.FromT
// override type ToT = tc.ToT
// // override protected def buildSchema(cm: CaseMapper): Schema = tc.buildSchema(cm)
// override def from(v: FromT)(cm: CaseMapper): T = caseClass.construct(_ => tc.fromAny(v)(cm))
// override def to(v: T)(cm: CaseMapper): ToT = tc.to(p.dereference(v))(cm)
// }
???
val p = caseClass.parameters.head
val tc = p.typeclass
new BeamSchemaField[T] {
override type FromT = tc.FromT
override type ToT = tc.ToT
override def fieldType(cm: CaseMapper): FieldType = tc.fieldType(cm)
override def from(v: FromT)(cm: CaseMapper): T = caseClass.construct(_ => tc.fromAny(v)(cm))
override def to(v: T)(cm: CaseMapper): ToT = tc.to(p.dereference(v))(cm)
}
} else {
new Record[T] {
override def fieldType(cm: CaseMapper): FieldType = FieldType.row(schema(cm))

override protected def buildSchema(cm: CaseMapper): Schema =
caseClass.parameters
.foldLeft(Schema.builder()) { case (s, p) =>
s.addField(p.label, p.typeclass.fieldType(cm))
s.addField(cm.map(p.label), p.typeclass.fieldType(cm))
}
.build()

Expand Down Expand Up @@ -229,20 +192,10 @@ object BeamSchemaField {
)(_.bigDecimal)

implicit val bsfUUID: BeamSchemaField[ju.UUID] =
logicalId[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType))
// new BeamSchemaLogicalField[ju.UUID] {
// type FromT = ju.UUID
// type ToT = ju.UUID
// val logicalType = new logicaltypes.UuidLogicalType
// override def fieldType(cm: CaseMapper): FieldType = FieldType.logicalType(logicalType)
// override def from(v: ju.UUID)(cm: CaseMapper): ju.UUID = v
// override def to(v: ju.UUID)(cm: CaseMapper): ju.UUID = v
// }
id[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType))

// implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] =
// from[Long](LocalDate.ofEpochDay)(_.toEpochDay)
implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] =
logicalId[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))
id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))
private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] =
from[Int](daysFromEpoch => EpochJodaDate.plusDays(daysFromEpoch))(d =>
Expand Down Expand Up @@ -292,42 +245,16 @@ object BeamSchemaField {
ti: C[T] => Iterable[T],
fc: FactoryCompat[T, C[T]]
): BeamSchemaField[C[T]] = {
// f match {
// case l: BeamSchemaLogicalField[T] =>
// new BeamSchemaLogicalField[C[T]] {
// type FromT = ju.List[l.FromT]
// type ToT = ju.List[l.ToT]
// override def fieldType(cm: CaseMapper): FieldType = FieldType.iterable(l.fieldType(cm))
// override def to(v: C[T])(cm: CaseMapper): ToT =
// v.iterator.map(l.to(_)(cm)).toList.asJava
// override def from(v: FromT)(cm: CaseMapper): C[T] =
// fc.fromSpecific(v.asScala.iterator.map(p => l.from(p)(cm)))
// }
// case _: BeamSchemaField[_] =>
new Aux[C[T], ju.List[f.FromT], ju.List[f.ToT]] {
override def from(v: ju.List[f.FromT])(cm: CaseMapper): C[T] =
fc.fromSpecific(v.asScala.iterator.map(p => f.from(p)(cm)))
override def to(v: C[T])(cm: CaseMapper): ju.List[f.ToT] =
v.iterator.map(f.to(_)(cm)).toList.asJava
override def fieldType(cm: CaseMapper): FieldType = FieldType.iterable(f.fieldType(cm))
}
// }
}

implicit def bsfOption[T](implicit f: BeamSchemaField[T]): BeamSchemaField[Option[T]] = {
// f match {
// case l: BeamSchemaLogicalField[T] =>
// new BeamSchemaLogicalField[Option[T]] {
// type FromT = l.FromT
// type ToT = l.ToT
// override def fieldType(cm: CaseMapper): FieldType = l.fieldType(cm).withNullable(true)
// override def to(v: Option[T])(cm: CaseMapper): ToT = v match {
// case None => null.asInstanceOf[ToT]
// case Some(value) => l.to(value)(cm)
// }
// override def from(v: FromT)(cm: CaseMapper): Option[T] = Option(v).map(l.from(_)(cm))
// }
// case _: BeamSchemaField[T] =>
new Aux[Option[T], f.FromT, f.ToT] {
override def from(v: f.FromT)(cm: CaseMapper): Option[T] =
if (v == null) None else Some(f.from(v)(cm))
Expand All @@ -337,6 +264,5 @@ object BeamSchemaField {
}
override def fieldType(cm: CaseMapper): FieldType = f.fieldType(cm).withNullable(true)
}
// }
}
}
81 changes: 51 additions & 30 deletions beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ import magnolify.cats.auto.*
import magnolify.cats.TestEq.*
import magnolify.scalacheck.auto.*
import magnolify.scalacheck.TestArbitrary.*
import magnolify.shared.CaseMapper
import magnolify.test.MagnolifySuite
import magnolify.test.Simple.*
import org.apache.beam.sdk.schemas.Schema
import org.joda.time as joda
import org.scalacheck.{Arbitrary, Gen, Prop}

import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.util.UUID
import scala.reflect.ClassTag
import scala.jdk.CollectionConverters.*

class BeamSchemaTypeSuite extends MagnolifySuite {
private def test[T: Arbitrary: ClassTag](implicit
Expand Down Expand Up @@ -60,6 +63,8 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
test[Maps]
test[Logical]

// FIXME value classes

{
import magnolify.beam.unsafe._
import magnolify.shared.TestEnumType._
Expand All @@ -71,37 +76,54 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
Arbitrary(Gen.chooseNum(0, Int.MaxValue).map(BigDecimal(_)))
test[Decimal]

test("Millis") {
import magnolify.beam.logical.millis.*
test[JavaTime]
test[JodaTime]
}

test("Micros") {
import magnolify.beam.logical.micros.*
test[JavaTime]
test[JodaTime]
}

test("Nanos") {
import magnolify.beam.logical.nanos.*
test[JavaTime]
test[JodaTime]
}

{
import magnolify.beam.logical.millis._
test[Time]
test[Joda]
implicit val bst: BeamSchemaType[LowerCamel] =
BeamSchemaType[LowerCamel](CaseMapper(_.toUpperCase))
test[LowerCamel]

test("LowerCamel mapping") {
val schema = bst.schema

val fields = LowerCamel.fields.map(_.toUpperCase)
assertEquals(schema.getFields.asScala.map(_.getName()).toSeq, fields)
assertEquals(
schema.getField("INNERFIELD").getType.getRowSchema.getFields.asScala.map(_.getName()).toSeq,
Seq("INNERFIRST")
)
}
}

// {
// // FIXME need special Eq instances that are lossy
// import magnolify.beam.logical.micros._
// test[Time]
// test[Joda]
// }
//
// {
//// FIXME need special Eq instances that are lossy
// import magnolify.beam.logical.nanos._
// test[Time]
// test[Joda]
// }

// {
// implicit val bst: BeamSchemaType[LowerCamel] =
// BeamSchemaType[LowerCamel](CaseMapper(_.toUpperCase))
// test[LowerCamel]
//
// test("LowerCamel mapping") {
// val schema = bst.schema
// // FIXME
// }
// }
test("ValueClass") {
// value classes should act only as fields
intercept[IllegalArgumentException] {
BeamSchemaType[ValueClass]
}

implicit val bst: BeamSchemaType[HasValueClass] = BeamSchemaType[HasValueClass]
test[HasValueClass]

assert(bst.schema.getField("vc").getType == Schema.FieldType.STRING)
val record = bst(HasValueClass(ValueClass("String")))
assert(record.getValue("vc").equals("String"))
}
}

case class Decimal(bd: BigDecimal, bdo: Option[BigDecimal])
Expand All @@ -111,14 +133,13 @@ case class Logical(
ul: List[UUID],
ulo: List[Option[UUID]]
)

case class Time(
case class JavaTime(
i: Instant,
d: LocalDate,
dt: LocalDateTime,
t: LocalTime
)
case class Joda(
case class JodaTime(
i: joda.Instant,
dt: joda.DateTime,
lt: joda.LocalTime,
Expand Down

0 comments on commit 803e64e

Please sign in to comment.