From a3d90bfd5ceb47979b0e11c1536a5b5e68631c30 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Thu, 22 Aug 2024 08:34:38 -0400 Subject: [PATCH] wip --- .../main/scala/magnolify/avro/AvroType.scala | 2 +- .../magnolify/avro/logical/package.scala | 1 - .../scala/magnolify/beam/BeamSchemaType.scala | 22 +++++++++++-- .../magnolify/beam/BeamSchemaTypeSuite.scala | 33 ++++++++++++------- .../magnolify/parquet/logical/TimeTypes.scala | 28 ++++++++++++++++ .../magnolify/parquet/logical/package.scala | 9 ----- .../magnolify/parquet/ParquetTypeSuite.scala | 6 ++-- .../main/scala/magnolify/shared/Time.scala | 16 ++++----- 8 files changed, 81 insertions(+), 36 deletions(-) create mode 100644 parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala diff --git a/avro/src/main/scala/magnolify/avro/AvroType.scala b/avro/src/main/scala/magnolify/avro/AvroType.scala index d4f853cf7..7ede81ffe 100644 --- a/avro/src/main/scala/magnolify/avro/AvroType.scala +++ b/avro/src/main/scala/magnolify/avro/AvroType.scala @@ -221,7 +221,7 @@ object AvroField { override def from(v: String)(cm: CaseMapper): String = v override def to(v: String)(cm: CaseMapper): String = v } -f + implicit def afEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): AvroField[T] = // Avro 1.9+ added a type parameter for `GenericEnumSymbol`, breaking 1.8 compatibility // Some reader, i.e. `AvroParquetReader` reads enums as `Utf8` diff --git a/avro/src/main/scala/magnolify/avro/logical/package.scala b/avro/src/main/scala/magnolify/avro/logical/package.scala index 35620cc56..bd0d070c1 100644 --- a/avro/src/main/scala/magnolify/avro/logical/package.scala +++ b/avro/src/main/scala/magnolify/avro/logical/package.scala @@ -22,7 +22,6 @@ import org.joda.{time => joda} import java.time._ import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder} -import java.util.concurrent.TimeUnit package object logical { import magnolify.shared.Time._ diff --git a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala index 205380b00..67b489a4f 100644 --- a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala +++ b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala @@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.Row import org.joda.time as joda import com.google.protobuf.ByteString import magnolify.shims.FactoryCompat -import org.apache.beam.sdk.schemas.logicaltypes as logicaltypes +import org.apache.beam.sdk.schemas.logicaltypes import java.nio.ByteBuffer import java.{time as jt, util as ju} @@ -112,9 +112,26 @@ object BeamSchemaField { type Typeclass[T] = BeamSchemaField[T] implicit def gen[T]: BeamSchemaField[T] = macro Magnolia.gen[T] - @implicitNotFound("Cannot derive BeamSchemaField for sealed trait") // TODO does this make sense? + // TODO beam schemas support OneOf + @implicitNotFound("Cannot derive BeamSchemaField for sealed trait") private sealed trait Dispatchable[T] def split[T: Dispatchable](sealedTrait: SealedTrait[Typeclass, T]): BeamSchemaField[T] = ??? + // new BeamSchemaField[T] { + // override type FromT = ??? + // override type ToT = ??? + // override def fieldType(cm: CaseMapper): FieldType = { + // FieldType.logicalType( + // logicaltypes.OneOfType.create( + // sealedTrait.subtypes.map { sub => + // Field.of(s"${sub.typeName.owner}.${sub.typeName.short}", sub.typeclass.fieldType(cm)) + // } + // .asJava + // ) + // ) + // } + // override def from(v: this.type)(cm: CaseMapper): T = ??? + // override def to(v: T)(cm: CaseMapper): this.type = ??? + // } def join[T](caseClass: CaseClass[Typeclass, T]): BeamSchemaField[T] = { if (caseClass.isValueClass) { @@ -233,7 +250,6 @@ object BeamSchemaField { new Aux[Map[K, V], ju.Map[bsfK.FromT, bsfV.FromT], ju.Map[bsfK.ToT, bsfV.ToT]] { override def fieldType(cm: CaseMapper): FieldType = FieldType.map(bsfK.fieldType(cm), bsfV.fieldType(cm)) - // TODO need to convert/not convert logical types override def from(v: ju.Map[bsfK.FromT, bsfV.FromT])(cm: CaseMapper): Map[K, V] = v.asScala.map { case (k, v) => bsfK.from(k)(cm) -> bsfV.from(v)(cm) }.toMap override def to(v: Map[K, V])(cm: CaseMapper): ju.Map[bsfK.ToT, bsfV.ToT] = diff --git a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala index 3b881be13..9a5cf127a 100644 --- a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala +++ b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala @@ -17,6 +17,7 @@ package magnolify.beam import cats.* +import com.google.protobuf.ByteString import magnolify.cats.auto.* import magnolify.cats.TestEq.* import magnolify.scalacheck.auto.* @@ -51,6 +52,12 @@ class BeamSchemaTypeSuite extends MagnolifySuite { } } + implicit val arbByteString: Arbitrary[ByteString] = + Arbitrary(Gen.alphaNumStr.map(ByteString.copyFromUtf8)) + implicit val arbBigDecimal: Arbitrary[BigDecimal] = + Arbitrary(Gen.chooseNum(0, Int.MaxValue).map(BigDecimal(_))) + implicit val eqByteString: Eq[ByteString] = Eq.instance(_ == _) + test[Integers] test[Floats] test[Required] @@ -60,10 +67,10 @@ class BeamSchemaTypeSuite extends MagnolifySuite { test[Collections] test[MoreCollections] + test[Bs] test[Maps] test[Logical] - - // FIXME value classes + test[Decimal] { import magnolify.beam.unsafe._ @@ -72,23 +79,19 @@ class BeamSchemaTypeSuite extends MagnolifySuite { test[UnsafeEnums] } - implicit val arbBigDecimal: Arbitrary[BigDecimal] = - Arbitrary(Gen.chooseNum(0, Int.MaxValue).map(BigDecimal(_))) - test[Decimal] - - test("Millis") { + property("Millis") { import magnolify.beam.logical.millis.* test[JavaTime] test[JodaTime] } - test("Micros") { + property("Micros") { import magnolify.beam.logical.micros.* test[JavaTime] test[JodaTime] } - test("Nanos") { + property("Nanos") { import magnolify.beam.logical.nanos.* test[JavaTime] test[JodaTime] @@ -122,10 +125,11 @@ class BeamSchemaTypeSuite extends MagnolifySuite { assert(bst.schema.getField("vc").getType == Schema.FieldType.STRING) val record = bst(HasValueClass(ValueClass("String"))) - assert(record.getValue("vc").equals("String")) + assert(record.getValue[String]("vc").equals("String")) } } +case class Bs(bs: ByteString) case class Decimal(bd: BigDecimal, bdo: Option[BigDecimal]) case class Logical( u: UUID, @@ -148,7 +152,14 @@ case class JodaTime( case class Maps( ms: Map[String, String], mi: Map[Int, Int], + ml: Map[Long, Long], + md: Map[Double, Double], + mf: Map[Float, Float], + mb: Map[Byte, Byte], + msh: Map[Short, Short], + mba: Map[Byte, Array[Byte]], + mbs: Map[ByteString, Array[Byte]], mso: Map[Option[String], Option[String]], - ml: Map[UUID, UUID], + mu: Map[UUID, UUID], mlo: Map[Option[UUID], Option[UUID]] ) diff --git a/parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala b/parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala new file mode 100644 index 000000000..1e9b76a5c --- /dev/null +++ b/parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package magnolify.parquet.logical + +import org.apache.parquet.schema.LogicalTypeAnnotation +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit + +trait TimeTypes { + protected def unit: TimeUnit + protected def ts(adjusted: Boolean): LogicalTypeAnnotation = + LogicalTypeAnnotation.timestampType(adjusted, unit) + protected def time(adjusted: Boolean): LogicalTypeAnnotation = + LogicalTypeAnnotation.timeType(adjusted, unit) +} diff --git a/parquet/src/main/scala/magnolify/parquet/logical/package.scala b/parquet/src/main/scala/magnolify/parquet/logical/package.scala index 2883349e8..e0b64d84c 100644 --- a/parquet/src/main/scala/magnolify/parquet/logical/package.scala +++ b/parquet/src/main/scala/magnolify/parquet/logical/package.scala @@ -19,7 +19,6 @@ package magnolify.parquet import java.time._ import magnolify.parquet.ParquetField.Primitive -import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit package object logical { @@ -27,14 +26,6 @@ package object logical { // TIME (millis i32, micros i64, nanos, i64), UTC true/false // TIMESTAMP (millis, micros, nanos), UTC true/false - private trait TimeTypes { - protected def unit: TimeUnit - protected def ts(adjusted: Boolean): LogicalTypeAnnotation = - LogicalTypeAnnotation.timestampType(adjusted, unit) - protected def time(adjusted: Boolean): LogicalTypeAnnotation = - LogicalTypeAnnotation.timeType(adjusted, unit) - } - object millis extends TimeTypes { protected val unit = TimeUnit.MILLIS diff --git a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala index dbbb7bb8f..b7d6ef3ab 100644 --- a/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala +++ b/parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala @@ -165,17 +165,17 @@ class ParquetTypeSuite extends MagnolifySuite { test[Logical] - { + property("Millis") { import magnolify.parquet.logical.millis._ test[Time] } - { + property("Micros") { import magnolify.parquet.logical.micros._ test[Time] } - { + property("Nanos") { import magnolify.parquet.logical.nanos._ test[Time] } diff --git a/shared/src/main/scala/magnolify/shared/Time.scala b/shared/src/main/scala/magnolify/shared/Time.scala index 8eb6f177f..5c61e960d 100644 --- a/shared/src/main/scala/magnolify/shared/Time.scala +++ b/shared/src/main/scala/magnolify/shared/Time.scala @@ -13,7 +13,7 @@ object Time { Instant.ofEpochMilli(millisFromEpoch) @inline def millisFromInstant(instant: Instant): Long = instant.toEpochMilli @inline def millisToJodaInstant(millisFromEpoch: Long): joda.Instant = - new joda.Instant(millisFromEpoch) + joda.Instant.ofEpochMilli(millisFromEpoch) @inline def millisFromJodaInstant(instant: joda.Instant): Long = instant.getMillis @inline def millisToLocalTime(millisFromMidnight: Int): LocalTime = @@ -39,7 +39,7 @@ object Time { @inline def millisToDuration(millis: Long): Duration = Duration.ofMillis(millis) @inline def millisFromDuration(d: Duration): Long = - TimeUnit.SECONDS.toMillis(d.getSeconds) + TimeUnit.NANOSECONDS.toMillis(d.getNano) + TimeUnit.SECONDS.toMillis(d.getSeconds) + TimeUnit.NANOSECONDS.toMillis(d.getNano.toLong) @inline def millisToJodaDuration(millis: Long): joda.Duration = joda.Duration.millis(millis) @inline def millisFromJodaDuration(d: joda.Duration): Long = d.getMillis @@ -62,7 +62,7 @@ object Time { } } @inline def microsToJodaInstant(microsFromEpoch: Long): joda.Instant = - new joda.Instant(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch), joda.DateTimeZone.UTC) + joda.Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch)) @inline def microsFromJodaInstant(instant: joda.Instant): Long = TimeUnit.MILLISECONDS.toMicros(instant.getMillis) @@ -78,7 +78,7 @@ object Time { @inline def microsToJodaLocalTime(microsFromMidnight: Long): joda.LocalTime = joda.LocalTime.fromMillisOfDay(TimeUnit.MICROSECONDS.toMillis(microsFromMidnight)) @inline def microsFromJodaLocalTime(lt: joda.LocalTime): Long = - TimeUnit.MILLISECONDS.toMicros(lt.millisOfDay().get()) + TimeUnit.MILLISECONDS.toMicros(lt.millisOfDay().get().toLong) @inline def microsToLocalDateTime(microsFromEpoch: Long): LocalDateTime = LocalDateTime.ofInstant(microsToInstant(microsFromEpoch), ZoneOffset.UTC) @@ -88,7 +88,7 @@ object Time { @inline def microsToDuration(micros: Long): Duration = Duration.ofMillis(TimeUnit.MICROSECONDS.toMillis(micros)) @inline def microsFromDuration(d: Duration): Long = - TimeUnit.SECONDS.toMicros(d.getSeconds) + TimeUnit.NANOSECONDS.toMicros(d.getNano) + TimeUnit.SECONDS.toMicros(d.getSeconds) + TimeUnit.NANOSECONDS.toMicros(d.getNano.toLong) @inline def microsToJodaDuration(micros: Long): joda.Duration = joda.Duration.millis(TimeUnit.MICROSECONDS.toMillis(micros)) @inline def microsFromJodaDuration(d: joda.Duration): Long = @@ -99,9 +99,9 @@ object Time { @inline def nanosToInstant(epochNanos: Long): Instant = Instant.ofEpochSecond(TimeUnit.NANOSECONDS.toSeconds(epochNanos), epochNanos % 1_000_000_000L) @inline def nanosFromInstant(instant: Instant): Long = - TimeUnit.MILLISECONDS.toNanos(instant.toEpochMilli) + instant.getNano + TimeUnit.SECONDS.toNanos(instant.getEpochSecond) + instant.getNano @inline def nanosToJodaInstant(nanosFromEpoch: Long): joda.Instant = - new joda.Instant(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch), joda.DateTimeZone.UTC) + joda.Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch)) @inline def nanosFromJodaInstant(instant: joda.Instant): Long = TimeUnit.MILLISECONDS.toNanos(instant.getMillis) @@ -116,7 +116,7 @@ object Time { @inline def nanosToJodaLocalTime(nanosFromMidnight: Long): joda.LocalTime = joda.LocalTime.fromMillisOfDay(TimeUnit.NANOSECONDS.toMillis(nanosFromMidnight)) @inline def nanosFromJodaLocalTime(lt: joda.LocalTime): Long = - TimeUnit.MILLISECONDS.toNanos(lt.millisOfDay().get()) + TimeUnit.MILLISECONDS.toNanos(lt.millisOfDay().get().toLong) @inline def nanosToLocalDateTime(nanosFromEpoch: Long): LocalDateTime = LocalDateTime.ofInstant(nanosToInstant(nanosFromEpoch), ZoneOffset.UTC)