From caa1f2b68243aaff2745bf1d1d6233c3d418f304 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Thu, 22 Aug 2024 16:38:54 -0400 Subject: [PATCH] wip --- .../scala/magnolify/beam/BeamSchemaType.scala | 12 +--- .../magnolify/beam/logical/package.scala | 31 +++++++- .../magnolify/beam/BeamSchemaTypeSuite.scala | 21 +++++- docs/beam.md | 32 +++++++-- docs/mapping.md | 71 +++++++++---------- 5 files changed, 110 insertions(+), 57 deletions(-) diff --git a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala index 096591b7..c7c2b6ed 100644 --- a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala +++ b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala @@ -21,14 +21,12 @@ import magnolify.shared.* import org.apache.beam.sdk.schemas.Schema import org.apache.beam.sdk.schemas.Schema.FieldType import org.apache.beam.sdk.values.Row -import org.joda.time as joda import com.google.protobuf.ByteString import magnolify.shims.FactoryCompat import org.apache.beam.sdk.schemas.logicaltypes import java.nio.ByteBuffer -import java.time.temporal.ChronoField -import java.{time as jt, util as ju} +import java.util as ju import scala.annotation.implicitNotFound import scala.collection.concurrent import scala.jdk.CollectionConverters.* @@ -212,14 +210,6 @@ object BeamSchemaField { implicit val bsfUUID: BeamSchemaField[ju.UUID] = id[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType)) - implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] = - id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date)) - private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1) - implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] = - from[jt.LocalDate](jtld => EpochJodaDate.plusDays(jtld.get(ChronoField.EPOCH_DAY)))(d => - jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong) - ) - implicit def bsfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): BeamSchemaField[T] = new BeamSchemaField[T] { type FromT = logicaltypes.EnumerationType.Value diff --git a/beam/src/main/scala/magnolify/beam/logical/package.scala b/beam/src/main/scala/magnolify/beam/logical/package.scala index 70c75449..f7d4c472 100644 --- a/beam/src/main/scala/magnolify/beam/logical/package.scala +++ b/beam/src/main/scala/magnolify/beam/logical/package.scala @@ -18,13 +18,25 @@ package magnolify.beam import org.apache.beam.sdk.schemas.logicaltypes import org.apache.beam.sdk.schemas.Schema.FieldType +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes import org.joda.time as joda import java.time as jt +import java.time.temporal.ChronoField package object logical { import magnolify.shared.Time._ + object date { + implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] = + BeamSchemaField.id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date)) + private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1) + implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] = + BeamSchemaField.from[jt.LocalDate](jtld => + EpochJodaDate.plusDays(jtld.get(ChronoField.EPOCH_DAY)) + )(d => jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong)) + } + object millis { implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] = BeamSchemaField.id[jt.Instant](_ => FieldType.DATETIME) @@ -32,13 +44,12 @@ package object logical { BeamSchemaField.from[jt.Instant](i => millisToJodaInstant(millisFromInstant(i)))(i => millisToInstant(millisFromJodaInstant(i)) ) - // joda.DateTime only has millisecond resolution implicit val bsfJodaDateTimeMillis: BeamSchemaField[joda.DateTime] = BeamSchemaField.from[jt.Instant](i => millisToJodaDateTime(millisFromInstant(i)))(dt => millisToInstant(millisFromJodaDateTime(dt)) ) - implicit val bsLocalTimeMillis: BeamSchemaField[jt.LocalTime] = + implicit val bsfLocalTimeMillis: BeamSchemaField[jt.LocalTime] = BeamSchemaField.from[Int](millisToLocalTime)(millisFromLocalTime) implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] = BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime) @@ -70,6 +81,7 @@ package object logical { implicit val bsfLocalTimeMicros: BeamSchemaField[jt.LocalTime] = BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime) + // joda.LocalTime only has millisecond resolution, so excess precision is discarded implicit val bsfJodaLocalTimeMicros: BeamSchemaField[joda.LocalTime] = BeamSchemaField.from[Long](microsToJodaLocalTime)(microsFromJodaLocalTime) @@ -89,6 +101,7 @@ package object logical { object nanos { implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] = BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(new logicaltypes.NanosInstant())) + // joda.Instant has millisecond precision, excess precision discarded implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] = BeamSchemaField.from[jt.Instant](i => nanosToJodaInstant(nanosFromInstant(i)))(i => nanosToInstant(nanosFromJodaInstant(i)) @@ -99,8 +112,9 @@ package object logical { nanosToInstant(nanosFromJodaDateTime(i)) ) - implicit val bsLocalTimeNanos: BeamSchemaField[jt.LocalTime] = + implicit val bsfLocalTimeNanos: BeamSchemaField[jt.LocalTime] = BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(new logicaltypes.Time())) + // joda.LocalTime only has millisecond resolution, so excess precision is discarded implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] = BeamSchemaField.from[jt.LocalTime](lt => nanosToJodaLocalTime(nanosFromLocalTime(lt)))(lt => nanosToLocalTime(nanosFromJodaLocalTime(lt)) @@ -122,4 +136,15 @@ package object logical { nanosToDuration(nanosFromJodaDuration(d)) ) } + + object sql { + implicit val bsfSqlLocalTime: BeamSchemaField[jt.LocalTime] = + BeamSchemaField.id(_ => FieldType.logicalType(SqlTypes.TIME)) + implicit val bsfSqlInstant: BeamSchemaField[jt.Instant] = + BeamSchemaField.id(_ => FieldType.logicalType(SqlTypes.TIMESTAMP)) + implicit val bsfSqlLocalDateTime: BeamSchemaField[jt.LocalDateTime] = + BeamSchemaField.id(_ => FieldType.logicalType(SqlTypes.DATETIME)) + implicit val bsfSqlLocalDate: BeamSchemaField[jt.LocalDate] = + BeamSchemaField.id(_ => FieldType.logicalType(SqlTypes.DATE)) + } } diff --git a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala index 9a5cf127..698e0385 100644 --- a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala +++ b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala @@ -79,6 +79,12 @@ class BeamSchemaTypeSuite extends MagnolifySuite { test[UnsafeEnums] } + property("Date") { + import magnolify.beam.logical.date.* + test[JavaDate] + test[JodaDate] + } + property("Millis") { import magnolify.beam.logical.millis.* test[JavaTime] @@ -127,6 +133,11 @@ class BeamSchemaTypeSuite extends MagnolifySuite { val record = bst(HasValueClass(ValueClass("String"))) assert(record.getValue[String]("vc").equals("String")) } + + property("Sql") { + import magnolify.beam.logical.sql.* + test[Sql] + } } case class Bs(bs: ByteString) @@ -137,9 +148,17 @@ case class Logical( ul: List[UUID], ulo: List[Option[UUID]] ) + +case class Sql( + i: Instant, + dt: LocalDateTime, + t: LocalTime, + d: LocalDate +) +case class JavaDate(d: LocalDate) +case class JodaDate(jd: joda.LocalDate) case class JavaTime( i: Instant, - d: LocalDate, dt: LocalDateTime, t: LocalTime ) diff --git a/docs/beam.md b/docs/beam.md index 00942772..01f78757 100644 --- a/docs/beam.md +++ b/docs/beam.md @@ -21,7 +21,31 @@ val copy: Outer = beamSchemaType.from(row) val schema = beamSchemaType.schema ``` -Enum-like types map to the Beam logical [Enum type]((https://beam.apache.org/documentation/programming-guide/#enumerationtype)). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe._`. +## Enums +Enum-like types map to the Beam logical [Enum type]((https://beam.apache.org/documentation/programming-guide/#enumerationtype)). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe.*`. + +## Time and dates + +Java and joda `LocalDate` types are available via `import magnolify.beam.logical.date.*` + +For date-time, instants, and durations, use `import magnolify.beam.logical.millis.*`, `import magnolify.beam.logical.micros.*` or `import magnolify.beam.logical.nanos.*` as appropriate for your use-case. +Note that joda types have only millisecond resolution, so excess precision will be discarded when used with `micros` or `nanos`. + +Where possible, Beam logical types are used and joda types defer to these implementations: + +* Beam's `DATETIME` primitive type maps to the millisecond-precision java and joda `Instant`s and the joda `DateTime`. +* The `DateTime` logical type is used for millisecond-precision java and joda `LocalDateTime` +* The `NanosInstant` logical type is used for nanosecond-precision java and joda `Instant` +* The `Time` logical type is used for nanosecond-precision java and joda `LocalTime` +* The `NanosDuration` logical type is used for java and joda `Duration` + +Beam's `MicrosInstant` should not be used as it throws exceptions when presented with greater-than-microsecond precision data. + +## SQL types + +SQL-compatible logical types are supported via `import magnolify.beam.logical.sql.*` + +## Case mapping To use a different field case format in target records, add an optional `CaseMapper` argument to `BeamSchemaType`: @@ -35,8 +59,4 @@ case class LowerCamel(firstName: String, lastName: String) val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _ val beamSchemaType = BeamSchemaType[LowerCamel](CaseMapper(toSnakeCase)) beamSchemaType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe) -``` - -Use `import magnolify.beam.logical.millis._`, `import magnolify.beam.logical.micros._` or `import magnolify.beam.logical.nanos._` as appropriate for your use-case. -Beam's `DATETIME` type maps to the millisecond-precision `java.time.Instant`. -Beam's `DateTime` logical type is used for millisecond-precision `java.time.LocalDateTime`, the `NanosInstant` logical type for nanosecond-precision `java.time.Instant`, the `Time` logical type for nanosecond-precision `java.time.LocalTime`, and the `NanosDuration` logical type for `java.time.Duration`. \ No newline at end of file +``` \ No newline at end of file diff --git a/docs/mapping.md b/docs/mapping.md index 5dfc2df8..c86db47e 100644 --- a/docs/mapping.md +++ b/docs/mapping.md @@ -1,38 +1,38 @@ # Type Mapping -| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow | -|-----------------------------------|------------------------------|------------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------| -| `Unit` | `null` | x | x | x | `Null` | x | x | x | -| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 | -| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 | -| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | -| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | -| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 | -| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` | -| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` | -| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 | -| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x | -| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 | -| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` | -| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` | -| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x | -| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 | -| `BigInt` | x | x | x | `BigInt` | x | x | x | x | -| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x | -| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 | -| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 | -| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 | -| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x | -| `java.time.Instant` | `long`11 | `DATETIME` or `INT64`17 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x | -| `java.time.LocalDateTime` | `long`11 | `INT64` | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x | -| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x | -| `java.time.LocalTime` | `long`11 | `INT32` | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x | -| `java.time.LocalDate` | `int`11 | `INT64`18 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x | -| `org.joda.time.LocalDate` | `int`11 | `INT32` | x | x | x | x | x | x | -| `org.joda.time.DateTime` | `int`11 | `INT64` | x | x | x | x | x | x | -| `org.joda.time.LocalTime` | `int`11 | `INT32` | x | x | x | x | x | x | -| `java.util.UUID` | `string`4 | `ROW(INT64, INT64)`19 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x | -| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x | +| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow | +|-----------------------------------|------------------------------|-----------------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------| +| `Unit` | `null` | x | x | x | `Null` | x | x | x | +| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 | +| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 | +| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | +| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | +| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 | +| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` | +| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` | +| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 | +| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x | +| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 | +| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` | +| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` | +| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x | +| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 | +| `BigInt` | x | x | x | `BigInt` | x | x | x | x | +| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x | +| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 | +| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 | +| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 | +| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x | +| `java.time.Instant` | `long`11 | `DATETIME`, `INT64`, `ROW`17 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x | +| `java.time.LocalDateTime` | `long`11 | `ROW`, `INT64`17 | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x | +| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x | +| `java.time.LocalTime` | `long`11 | `INT32`, `INT64`17 | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x | +| `java.time.LocalDate` | `int`11 | `INT64`17 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x | +| `org.joda.time.LocalDate` | `int`11 | `INT64`17 | x | x | x | x | x | x | +| `org.joda.time.DateTime` | `int`11 | `DATETIME`, `INT64`, `ROW`17 | x | x | x | x | x | x | +| `org.joda.time.LocalTime` | `int`11 | `INT32`, `INT64`17 | x | x | x | x | x | x | +| `java.util.UUID` | `string`4 | `ROW`18 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x | +| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x | 1. Those wrapped in`UnsafeEnum` are encoded as strings, see [enums.md](https://github.com/spotify/magnolify/blob/master/docs/enums.md) for more @@ -60,6 +60,5 @@ one of: `magnolify.parquet.ParquetField.{decimal32, decimal64, decimalFixed, decimalBinary}`. 15. Map key type in avro is fixed to string. Scala Map key type must be either `String` or `CharSequence`. 16. Beam logical [Enumeration type](https://beam.apache.org/documentation/programming-guide/#enumerationtype) -17. Beam logical [DateTime](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/DateTime.html), at millisecond precision. Or `INT64` when micro or nano precision is used. -18. Beam logical [Date type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/Date.html) -19. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html) +17. See [beam.md][protobuf.md](https://github.com/spotify/magnolify/blob/master/docs/beam.md) for details +18. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html)