Skip to content

Commit

Permalink
rn
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Sep 5, 2024
1 parent 9d7f1f1 commit dd11adb
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 124 deletions.
96 changes: 46 additions & 50 deletions beam/src/main/scala/magnolify/beam/RowType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ sealed trait RowType[T] extends Converter[T, Row, Row] {
}

object RowType {
implicit def apply[T: BeamSchemaField]: RowType[T] =
RowType[T](CaseMapper.identity)
implicit def apply[T: RowField]: RowType[T] = RowType[T](CaseMapper.identity)

def apply[T](cm: CaseMapper)(implicit f: BeamSchemaField[T]): RowType[T] = {
def apply[T](cm: CaseMapper)(implicit f: RowField[T]): RowType[T] = {
f match {
case r: BeamSchemaField.Record[_] =>
case r: RowField.Record[_] =>
val mappedSchema = r.schema(cm) // fail fast on bad annotations
new RowType[T] {
private val caseMapper: CaseMapper = cm
Expand All @@ -61,7 +60,7 @@ object RowType {
}
}

sealed trait BeamSchemaField[T] extends Serializable {
sealed trait RowField[T] extends Serializable {
type FromT
type ToT
def fieldType(cm: CaseMapper): FieldType
Expand All @@ -70,32 +69,32 @@ sealed trait BeamSchemaField[T] extends Serializable {
def fromAny(v: Any)(cm: CaseMapper): T = from(v.asInstanceOf[FromT])(cm)
}

object BeamSchemaField {
sealed trait Aux[T, From, To] extends BeamSchemaField[T] {
object RowField {
sealed trait Aux[T, From, To] extends RowField[T] {
override type FromT = From
override type ToT = To
}

private[magnolify] def aux[T, From, To](
ft: CaseMapper => FieldType
)(fromFn: From => T)(toFn: T => To): BeamSchemaField[T] =
)(fromFn: From => T)(toFn: T => To): RowField[T] =
new Aux[T, From, To] {
override def fieldType(cm: CaseMapper): FieldType = ft(cm)
override def from(v: FromT)(cm: CaseMapper): T = fromFn(v)
override def to(v: T)(cm: CaseMapper): ToT = toFn(v)
}

private[magnolify] def id[T](ft: CaseMapper => FieldType): BeamSchemaField[T] =
private[magnolify] def id[T](ft: CaseMapper => FieldType): RowField[T] =
aux[T, T, T](ft)(identity)(identity)

def from[T]: FromWord[T] = new FromWord[T]

class FromWord[T] {
def apply[U](f: T => U)(g: U => T)(implicit bsf: BeamSchemaField[T]): BeamSchemaField[U] =
new Aux[U, bsf.FromT, bsf.ToT] {
override def fieldType(cm: CaseMapper): FieldType = bsf.fieldType(cm)
override def from(v: FromT)(cm: CaseMapper): U = f(bsf.from(v)(cm))
override def to(v: U)(cm: CaseMapper): ToT = bsf.to(g(v))(cm)
def apply[U](f: T => U)(g: U => T)(implicit rf: RowField[T]): RowField[U] =
new Aux[U, rf.FromT, rf.ToT] {
override def fieldType(cm: CaseMapper): FieldType = rf.fieldType(cm)
override def from(v: FromT)(cm: CaseMapper): U = f(rf.from(v)(cm))
override def to(v: U)(cm: CaseMapper): ToT = rf.to(g(v))(cm)
}
}

Expand All @@ -108,13 +107,13 @@ object BeamSchemaField {

// ////////////////////////////////////////////////

type Typeclass[T] = BeamSchemaField[T]
implicit def gen[T]: BeamSchemaField[T] = macro Magnolia.gen[T]
type Typeclass[T] = RowField[T]
implicit def gen[T]: RowField[T] = macro Magnolia.gen[T]

def split[T](
sealedTrait: SealedTrait[Typeclass, T]
)(implicit r: shapeless.Refute[EnumType[T]]): BeamSchemaField[T] =
new BeamSchemaField[T] {
)(implicit r: shapeless.Refute[EnumType[T]]): RowField[T] =
new RowField[T] {
override type FromT = logicaltypes.OneOfType.Value
override type ToT = logicaltypes.OneOfType.Value

Expand Down Expand Up @@ -149,11 +148,11 @@ object BeamSchemaField {
)
}

def join[T](caseClass: CaseClass[Typeclass, T]): BeamSchemaField[T] = {
def join[T](caseClass: CaseClass[Typeclass, T]): RowField[T] = {
if (caseClass.isValueClass) {
val p = caseClass.parameters.head
val tc = p.typeclass
new BeamSchemaField[T] {
new RowField[T] {
override type FromT = tc.FromT
override type ToT = tc.ToT
override def fieldType(cm: CaseMapper): FieldType = tc.fieldType(cm)
Expand Down Expand Up @@ -185,39 +184,39 @@ object BeamSchemaField {
}

// BYTE An 8-bit signed value
implicit val bsfByte: BeamSchemaField[Byte] = id[Byte](_ => FieldType.BYTE)
implicit val rfByte: RowField[Byte] = id[Byte](_ => FieldType.BYTE)
// INT16 A 16-bit signed value
implicit val bsfShort: BeamSchemaField[Short] = id[Short](_ => FieldType.INT16)
implicit val bsfChar: BeamSchemaField[Char] = from[Short](_.toChar)(_.toShort)
implicit val rfShort: RowField[Short] = id[Short](_ => FieldType.INT16)
implicit val rfChar: RowField[Char] = from[Short](_.toChar)(_.toShort)
// INT32 A 32-bit signed value
implicit val bsfInt: BeamSchemaField[Int] = id[Int](_ => FieldType.INT32)
implicit val rfInt: RowField[Int] = id[Int](_ => FieldType.INT32)
// INT64 A 64-bit signed value
implicit val bsfLong: BeamSchemaField[Long] = id[Long](_ => FieldType.INT64)
implicit val rfLong: RowField[Long] = id[Long](_ => FieldType.INT64)
// FLOAT A 32-bit IEEE 754 floating point number
implicit val bsfFloat: BeamSchemaField[Float] = id[Float](_ => FieldType.FLOAT)
implicit val rfFloat: RowField[Float] = id[Float](_ => FieldType.FLOAT)
// DOUBLE A 64-bit IEEE 754 floating point number
implicit val bsfDouble: BeamSchemaField[Double] = id[Double](_ => FieldType.DOUBLE)
implicit val rfDouble: RowField[Double] = id[Double](_ => FieldType.DOUBLE)
// STRING A string
implicit val bsfString: BeamSchemaField[String] = id[String](_ => FieldType.STRING)
implicit val rfString: RowField[String] = id[String](_ => FieldType.STRING)
// BOOLEAN A boolean value
implicit val bsfBoolean: BeamSchemaField[Boolean] = id[Boolean](_ => FieldType.BOOLEAN)
implicit val rfBoolean: RowField[Boolean] = id[Boolean](_ => FieldType.BOOLEAN)
// BYTES A raw byte array
implicit val bsfByteArray: BeamSchemaField[Array[Byte]] = id[Array[Byte]](_ => FieldType.BYTES)
implicit val bsfByteBuffer: BeamSchemaField[ByteBuffer] =
implicit val rfByteArray: RowField[Array[Byte]] = id[Array[Byte]](_ => FieldType.BYTES)
implicit val rfByteBuffer: RowField[ByteBuffer] =
from[Array[Byte]](x => ByteBuffer.wrap(x))(_.array())
implicit val bsfByteString: BeamSchemaField[ByteString] =
implicit val rfByteString: RowField[ByteString] =
from[Array[Byte]](x => ByteString.copyFrom(x))(_.toByteArray)
// DECIMAL An arbitrary-precision decimal type
implicit val bsfDecimal: BeamSchemaField[BigDecimal] =
implicit val rfDecimal: RowField[BigDecimal] =
aux[BigDecimal, java.math.BigDecimal, java.math.BigDecimal](_ => FieldType.DECIMAL)(
BigDecimal.apply
)(_.bigDecimal)

implicit val bsfUUID: BeamSchemaField[ju.UUID] =
implicit val rfUUID: RowField[ju.UUID] =
id[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType))

implicit def bsfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): BeamSchemaField[T] =
new BeamSchemaField[T] {
implicit def rfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): RowField[T] =
new RowField[T] {
type FromT = logicaltypes.EnumerationType.Value
type ToT = logicaltypes.EnumerationType.Value

Expand All @@ -240,24 +239,21 @@ object BeamSchemaField {
enumType(cm).from(beamEnumType(cm).toString(v))
}

implicit def bsfMap[K, V](implicit
bsfK: BeamSchemaField[K],
bsfV: BeamSchemaField[V]
): BeamSchemaField[Map[K, V]] =
new Aux[Map[K, V], ju.Map[bsfK.FromT, bsfV.FromT], ju.Map[bsfK.ToT, bsfV.ToT]] {
implicit def rfMap[K, V](implicit rfK: RowField[K], rfV: RowField[V]): RowField[Map[K, V]] =
new Aux[Map[K, V], ju.Map[rfK.FromT, rfV.FromT], ju.Map[rfK.ToT, rfV.ToT]] {
override def fieldType(cm: CaseMapper): FieldType =
FieldType.map(bsfK.fieldType(cm), bsfV.fieldType(cm))
override def from(v: ju.Map[bsfK.FromT, bsfV.FromT])(cm: CaseMapper): Map[K, V] =
v.asScala.map { case (k, v) => bsfK.from(k)(cm) -> bsfV.from(v)(cm) }.toMap
override def to(v: Map[K, V])(cm: CaseMapper): ju.Map[bsfK.ToT, bsfV.ToT] =
v.map { case (k, v) => bsfK.to(k)(cm) -> bsfV.to(v)(cm) }.asJava
FieldType.map(rfK.fieldType(cm), rfV.fieldType(cm))
override def from(v: ju.Map[rfK.FromT, rfV.FromT])(cm: CaseMapper): Map[K, V] =
v.asScala.map { case (k, v) => rfK.from(k)(cm) -> rfV.from(v)(cm) }.toMap
override def to(v: Map[K, V])(cm: CaseMapper): ju.Map[rfK.ToT, rfV.ToT] =
v.map { case (k, v) => rfK.to(k)(cm) -> rfV.to(v)(cm) }.asJava
}

implicit def bsfIterable[T, C[_]](implicit
f: BeamSchemaField[T],
implicit def rfIterable[T, C[_]](implicit
f: RowField[T],
ti: C[T] => Iterable[T],
fc: FactoryCompat[T, C[T]]
): BeamSchemaField[C[T]] = {
): RowField[C[T]] = {
new Aux[C[T], ju.List[f.FromT], ju.List[f.ToT]] {
override def from(v: ju.List[f.FromT])(cm: CaseMapper): C[T] =
fc.fromSpecific(v.asScala.iterator.map(p => f.from(p)(cm)))
Expand All @@ -267,7 +263,7 @@ object BeamSchemaField {
}
}

implicit def bsfOption[T](implicit f: BeamSchemaField[T]): BeamSchemaField[Option[T]] = {
implicit def rfOption[T](implicit f: RowField[T]): RowField[Option[T]] = {
new Aux[Option[T], f.FromT, f.ToT] {
override def from(v: f.FromT)(cm: CaseMapper): Option[T] =
if (v == null) None else Some(f.from(v)(cm))
Expand Down
Loading

0 comments on commit dd11adb

Please sign in to comment.