From 22ba2d19c8bdebb1fd2ad570a7c000c7694d1240 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Thu, 22 Aug 2024 16:02:40 -0400 Subject: [PATCH] wip --- .../scala/magnolify/beam/BeamSchemaType.scala | 5 +- .../magnolify/beam/logical/package.scala | 107 ++++++++++++------ docs/beam.md | 41 ++++++- docs/mapping.md | 71 ++++++------ .../main/scala/magnolify/shared/Time.scala | 8 ++ 5 files changed, 157 insertions(+), 75 deletions(-) diff --git a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala index 67b489a4..096591b7 100644 --- a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala +++ b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala @@ -27,6 +27,7 @@ import magnolify.shims.FactoryCompat import org.apache.beam.sdk.schemas.logicaltypes import java.nio.ByteBuffer +import java.time.temporal.ChronoField import java.{time as jt, util as ju} import scala.annotation.implicitNotFound import scala.collection.concurrent @@ -215,8 +216,8 @@ object BeamSchemaField { id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date)) private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1) implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] = - from[Int](daysFromEpoch => EpochJodaDate.plusDays(daysFromEpoch))(d => - joda.Days.daysBetween(EpochJodaDate, d).getDays + from[jt.LocalDate](jtld => EpochJodaDate.plusDays(jtld.get(ChronoField.EPOCH_DAY)))(d => + jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong) ) implicit def bsfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): BeamSchemaField[T] = diff --git a/beam/src/main/scala/magnolify/beam/logical/package.scala b/beam/src/main/scala/magnolify/beam/logical/package.scala index 6475a2fd..70c75449 100644 --- a/beam/src/main/scala/magnolify/beam/logical/package.scala +++ b/beam/src/main/scala/magnolify/beam/logical/package.scala @@ -16,77 +16,110 @@ package magnolify.beam +import org.apache.beam.sdk.schemas.logicaltypes +import org.apache.beam.sdk.schemas.Schema.FieldType import org.joda.time as joda + import java.time as jt package object logical { import magnolify.shared.Time._ object millis { - // joda - // DATETIME A timestamp represented as milliseconds since the epoch + implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] = + BeamSchemaField.id[jt.Instant](_ => FieldType.DATETIME) + implicit val bsfJodaInstantMillis: BeamSchemaField[joda.Instant] = + BeamSchemaField.from[jt.Instant](i => millisToJodaInstant(millisFromInstant(i)))(i => + millisToInstant(millisFromJodaInstant(i)) + ) // joda.DateTime only has millisecond resolution implicit val bsfJodaDateTimeMillis: BeamSchemaField[joda.DateTime] = - BeamSchemaField.from[Long](millisToJodaDateTime)(millisFromJodaDateTime) - // DATETIME A timestamp represented as milliseconds since the epoch - implicit val bsfJodaInstantMillis: BeamSchemaField[joda.Instant] = - BeamSchemaField.from[Long](millisToJodaInstant)(millisFromJodaInstant) - implicit val bsfJodaDurationMillis: BeamSchemaField[joda.Duration] = - BeamSchemaField.from[Long](millisToJodaDuration)(millisFromJodaDuration) - implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] = - BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime) - // java - implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] = - BeamSchemaField.from[Long](millisToInstant)(millisFromInstant) + BeamSchemaField.from[jt.Instant](i => millisToJodaDateTime(millisFromInstant(i)))(dt => + millisToInstant(millisFromJodaDateTime(dt)) + ) + implicit val bsLocalTimeMillis: BeamSchemaField[jt.LocalTime] = BeamSchemaField.from[Int](millisToLocalTime)(millisFromLocalTime) + implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] = + BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime) + implicit val bsfLocalDateTimeMillis: BeamSchemaField[jt.LocalDateTime] = - BeamSchemaField.from[Long](millisToLocalDateTime)(millisFromLocalDateTime) + BeamSchemaField.id[jt.LocalDateTime](_ => FieldType.logicalType(new logicaltypes.DateTime())) + implicit val bsfJodaLocalDateTimeMillis: BeamSchemaField[joda.LocalDateTime] = + BeamSchemaField.from[jt.LocalDateTime](ldt => + millisToJodaLocalDateTime(millisFromLocalDateTime(ldt)) + )(ldt => millisToLocalDateTime(millisFromJodaLocalDateTime(ldt))) + implicit val bsfDurationMillis: BeamSchemaField[jt.Duration] = BeamSchemaField.from[Long](millisToDuration)(millisFromDuration) + implicit val bsfJodaDurationMillis: BeamSchemaField[joda.Duration] = + BeamSchemaField.from[Long](millisToJodaDuration)(millisFromJodaDuration) } object micros { - // joda.DateTime only has millisecond resolution, so excess precision is discarded - implicit val bsfJodaDateTimeMicros: BeamSchemaField[joda.DateTime] = - BeamSchemaField.from[Long](microsToJodaDateTime)(microsFromJodaDateTime) + // NOTE: logicaltypes.MicrosInstant() cannot be used as it throws assertion + // errors when greater-than-microsecond precision data is used + implicit val bsfInstantMicros: BeamSchemaField[jt.Instant] = + BeamSchemaField.from[Long](microsToInstant)(microsFromInstant) // joda.Instant has millisecond precision, excess precision discarded implicit val bsfJodaInstantMicros: BeamSchemaField[joda.Instant] = BeamSchemaField.from[Long](microsToJodaInstant)(microsFromJodaInstant) - // joda.Duration has millisecond precision, excess precision discarded - implicit val bsfJodaDurationMicros: BeamSchemaField[joda.Duration] = - BeamSchemaField.from[Long](microsToJodaDuration)(microsFromJodaDuration) + // joda.DateTime only has millisecond resolution, so excess precision is discarded + implicit val bsfJodaDateTimeMicros: BeamSchemaField[joda.DateTime] = + BeamSchemaField.from[Long](microsToJodaDateTime)(microsFromJodaDateTime) + + implicit val bsfLocalTimeMicros: BeamSchemaField[jt.LocalTime] = + BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime) implicit val bsfJodaLocalTimeMicros: BeamSchemaField[joda.LocalTime] = BeamSchemaField.from[Long](microsToJodaLocalTime)(microsFromJodaLocalTime) - // java - implicit val bsfInstantMicros: BeamSchemaField[jt.Instant] = - BeamSchemaField.from[Long](microsToInstant)(microsFromInstant) - implicit val bsLocalTimeMicros: BeamSchemaField[jt.LocalTime] = - BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime) + implicit val bsfLocalDateTimeMicros: BeamSchemaField[jt.LocalDateTime] = BeamSchemaField.from[Long](microsToLocalDateTime)(microsFromLocalDateTime) + // joda.LocalDateTime has millisecond precision, excess precision discarded + implicit val bsfJodaLocalDateTimeMicros: BeamSchemaField[joda.LocalDateTime] = + BeamSchemaField.from[Long](microsToJodaLocalDateTime)(microsFromJodaLocalDateTime) + implicit val bsfDurationMicros: BeamSchemaField[jt.Duration] = BeamSchemaField.from[Long](microsToDuration)(microsFromDuration) + // joda.Duration has millisecond precision, excess precision discarded + implicit val bsfJodaDurationMicros: BeamSchemaField[joda.Duration] = + BeamSchemaField.from[Long](microsToJodaDuration)(microsFromJodaDuration) } object nanos { + implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] = + BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(new logicaltypes.NanosInstant())) + implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] = + BeamSchemaField.from[jt.Instant](i => nanosToJodaInstant(nanosFromInstant(i)))(i => + nanosToInstant(nanosFromJodaInstant(i)) + ) // joda.DateTime only has millisecond resolution implicit val bsfJodaDateTimeNanos: BeamSchemaField[joda.DateTime] = - BeamSchemaField.from[Long](nanosToJodaDateTime)(nanosFromJodaDateTime) - implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] = - BeamSchemaField.from[Long](nanosToJodaInstant)(nanosFromJodaInstant) - implicit val bsfJodaDurationNanos: BeamSchemaField[joda.Duration] = - BeamSchemaField.from[Long](nanosToJodaDuration)(nanosFromJodaDuration) - implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] = - BeamSchemaField.from[Long](nanosToJodaLocalTime)(nanosFromJodaLocalTime) - // java - implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] = - BeamSchemaField.from[Long](nanosToInstant)(nanosFromInstant) + BeamSchemaField.from[jt.Instant](i => nanosToJodaDateTime(nanosFromInstant(i)))(i => + nanosToInstant(nanosFromJodaDateTime(i)) + ) + implicit val bsLocalTimeNanos: BeamSchemaField[jt.LocalTime] = - BeamSchemaField.from[Long](nanosToLocalTime)(nanosFromLocalTime) + BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(new logicaltypes.Time())) + implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] = + BeamSchemaField.from[jt.LocalTime](lt => nanosToJodaLocalTime(nanosFromLocalTime(lt)))(lt => + nanosToLocalTime(nanosFromJodaLocalTime(lt)) + ) + implicit val bsfLocalDateTimeNanos: BeamSchemaField[jt.LocalDateTime] = BeamSchemaField.from[Long](nanosToLocalDateTime)(nanosFromLocalDateTime) + // joda.LocalDateTime has millisecond precision, excess precision discarded + implicit val bsfJodaLocalDateTimeMicros: BeamSchemaField[joda.LocalDateTime] = + BeamSchemaField.from[jt.LocalDateTime](ldt => + nanosToJodaLocalDateTime(nanosFromLocalDateTime(ldt)) + )(ldt => nanosToLocalDateTime(nanosFromJodaLocalDateTime(ldt))) + implicit val bsfDurationNanos: BeamSchemaField[jt.Duration] = - BeamSchemaField.from[Long](nanosToDuration)(nanosFromDuration) + BeamSchemaField.id[jt.Duration](_ => FieldType.logicalType(new logicaltypes.NanosDuration())) + // joda.Duration has millisecond precision, excess precision discarded + implicit val bsfJodaDurationNanos: BeamSchemaField[joda.Duration] = + BeamSchemaField.from[jt.Duration](d => nanosToJodaDuration(nanosFromDuration(d)))(d => + nanosToDuration(nanosFromJodaDuration(d)) + ) } } diff --git a/docs/beam.md b/docs/beam.md index 0009c2f3..00942772 100644 --- a/docs/beam.md +++ b/docs/beam.md @@ -1,3 +1,42 @@ # Beam -https://beam.apache.org/documentation/programming-guide/#schema-definition \ No newline at end of file +`BeamSchemaType[T]` provides conversion between Scala type `T` and a [Beam Schema](https://beam.apache.org/documentation/programming-guide/#schema-definition). Custom support for type `T` can be added with an implicit intsance of `BeamSchemaField[T]`. + +```scala mdoc:compile-only +import java.net.URI + +case class Inner(long: Long, str: String, uri: URI) +case class Outer(inner: Inner) +val record = Outer(Inner(1L, "hello", URI.create("https://www.spotify.com"))) + +import magnolify.beam.* +// Encode custom type URI as String +implicit val uriField: BeamSchemaField[URI] = BeamSchemaField.from[String](URI.create)(_.toString) + +val beamSchemaType = BeamSchemaType[Outer] +val row = beamSchemaType.to(record) +val copy: Outer = beamSchemaType.from(row) + +// Beam Schema +val schema = beamSchemaType.schema +``` + +Enum-like types map to the Beam logical [Enum type]((https://beam.apache.org/documentation/programming-guide/#enumerationtype)). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe._`. + +To use a different field case format in target records, add an optional `CaseMapper` argument to `BeamSchemaType`: + +```scala mdoc:compile-only +import magnolify.beam.* +import magnolify.shared.CaseMapper +import com.google.common.base.CaseFormat + +case class LowerCamel(firstName: String, lastName: String) + +val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _ +val beamSchemaType = BeamSchemaType[LowerCamel](CaseMapper(toSnakeCase)) +beamSchemaType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe) +``` + +Use `import magnolify.beam.logical.millis._`, `import magnolify.beam.logical.micros._` or `import magnolify.beam.logical.nanos._` as appropriate for your use-case. +Beam's `DATETIME` type maps to the millisecond-precision `java.time.Instant`. +Beam's `DateTime` logical type is used for millisecond-precision `java.time.LocalDateTime`, the `NanosInstant` logical type for nanosecond-precision `java.time.Instant`, the `Time` logical type for nanosecond-precision `java.time.LocalTime`, and the `NanosDuration` logical type for `java.time.Duration`. \ No newline at end of file diff --git a/docs/mapping.md b/docs/mapping.md index 2b3b9222..5dfc2df8 100644 --- a/docs/mapping.md +++ b/docs/mapping.md @@ -1,38 +1,38 @@ # Type Mapping -| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow | -|-----------------------------------|------------------------------|----------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------| -| `Unit` | `null` | x | x | x | `Null` | x | x | x | -| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 | -| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 | -| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | -| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | -| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 | -| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` | -| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` | -| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 | -| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x | -| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 | -| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` | -| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` | -| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x | -| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 | -| `BigInt` | x | x | x | `BigInt` | x | x | x | x | -| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x | -| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 | -| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 | -| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 | -| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x | -| `java.time.Instant` | `long`11 | `INT64` | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x | -| `java.time.LocalDateTime` | `long`11 | `INT64` | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x | -| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x | -| `java.time.LocalTime` | `long`11 | `INT32` | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x | -| `java.time.LocalDate` | `int`11 | `INT64`17 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x | -| `org.joda.time.LocalDate` | `int`11 | `INT32` | x | x | x | x | x | x | -| `org.joda.time.DateTime` | `int`11 | `INT64` | x | x | x | x | x | x | -| `org.joda.time.LocalTime` | `int`11 | `INT32` | x | x | x | x | x | x | -| `java.util.UUID` | `string`4 | `ROW(INT64, INT64)`18 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x | -| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x | +| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow | +|-----------------------------------|------------------------------|------------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------| +| `Unit` | `null` | x | x | x | `Null` | x | x | x | +| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 | +| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 | +| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | +| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 | +| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 | +| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` | +| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` | +| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 | +| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x | +| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 | +| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` | +| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` | +| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x | +| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 | +| `BigInt` | x | x | x | `BigInt` | x | x | x | x | +| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x | +| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 | +| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 | +| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 | +| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x | +| `java.time.Instant` | `long`11 | `DATETIME` or `INT64`17 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x | +| `java.time.LocalDateTime` | `long`11 | `INT64` | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x | +| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x | +| `java.time.LocalTime` | `long`11 | `INT32` | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x | +| `java.time.LocalDate` | `int`11 | `INT64`18 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x | +| `org.joda.time.LocalDate` | `int`11 | `INT32` | x | x | x | x | x | x | +| `org.joda.time.DateTime` | `int`11 | `INT64` | x | x | x | x | x | x | +| `org.joda.time.LocalTime` | `int`11 | `INT32` | x | x | x | x | x | x | +| `java.util.UUID` | `string`4 | `ROW(INT64, INT64)`19 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x | +| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x | 1. Those wrapped in`UnsafeEnum` are encoded as strings, see [enums.md](https://github.com/spotify/magnolify/blob/master/docs/enums.md) for more @@ -60,5 +60,6 @@ one of: `magnolify.parquet.ParquetField.{decimal32, decimal64, decimalFixed, decimalBinary}`. 15. Map key type in avro is fixed to string. Scala Map key type must be either `String` or `CharSequence`. 16. Beam logical [Enumeration type](https://beam.apache.org/documentation/programming-guide/#enumerationtype) -17. Beam logical [Date type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/Date.html) -18. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html) +17. Beam logical [DateTime](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/DateTime.html), at millisecond precision. Or `INT64` when micro or nano precision is used. +18. Beam logical [Date type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/Date.html) +19. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html) diff --git a/shared/src/main/scala/magnolify/shared/Time.scala b/shared/src/main/scala/magnolify/shared/Time.scala index 83ce8b8f..4eaa69c5 100644 --- a/shared/src/main/scala/magnolify/shared/Time.scala +++ b/shared/src/main/scala/magnolify/shared/Time.scala @@ -97,6 +97,10 @@ object Time { LocalDateTime.ofInstant(microsToInstant(microsFromEpoch), ZoneOffset.UTC) @inline def microsFromLocalDateTime(ldt: LocalDateTime): Long = microsFromInstant(ldt.toInstant(ZoneOffset.UTC)) + @inline def microsToJodaLocalDateTime(microsFromEpoch: Long): joda.LocalDateTime = + new joda.LocalDateTime(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch), joda.DateTimeZone.UTC) + @inline def microsFromJodaLocalDateTime(ldt: joda.LocalDateTime): Long = + TimeUnit.MILLISECONDS.toMicros(ldt.toDateTime(joda.DateTimeZone.UTC).getMillis) @inline def microsToDuration(micros: Long): Duration = Duration.ofMillis(TimeUnit.MICROSECONDS.toMillis(micros)) @@ -135,6 +139,10 @@ object Time { LocalDateTime.ofInstant(nanosToInstant(nanosFromEpoch), ZoneOffset.UTC) @inline def nanosFromLocalDateTime(ldt: LocalDateTime): Long = nanosFromInstant(ldt.toInstant(ZoneOffset.UTC)) + @inline def nanosToJodaLocalDateTime(nanosFromEpoch: Long): joda.LocalDateTime = + new joda.LocalDateTime(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch), joda.DateTimeZone.UTC) + @inline def nanosFromJodaLocalDateTime(ldt: joda.LocalDateTime): Long = + TimeUnit.MILLISECONDS.toNanos(ldt.toDateTime(joda.DateTimeZone.UTC).getMillis) @inline def nanosToDuration(nanos: Long): Duration = Duration.ofNanos(nanos)