diff --git a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
index 67b489a4..096591b7 100644
--- a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
+++ b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
@@ -27,6 +27,7 @@ import magnolify.shims.FactoryCompat
import org.apache.beam.sdk.schemas.logicaltypes
import java.nio.ByteBuffer
+import java.time.temporal.ChronoField
import java.{time as jt, util as ju}
import scala.annotation.implicitNotFound
import scala.collection.concurrent
@@ -215,8 +216,8 @@ object BeamSchemaField {
id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))
private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] =
- from[Int](daysFromEpoch => EpochJodaDate.plusDays(daysFromEpoch))(d =>
- joda.Days.daysBetween(EpochJodaDate, d).getDays
+ from[jt.LocalDate](jtld => EpochJodaDate.plusDays(jtld.get(ChronoField.EPOCH_DAY)))(d =>
+ jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong)
)
implicit def bsfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): BeamSchemaField[T] =
diff --git a/beam/src/main/scala/magnolify/beam/logical/package.scala b/beam/src/main/scala/magnolify/beam/logical/package.scala
index 6475a2fd..70c75449 100644
--- a/beam/src/main/scala/magnolify/beam/logical/package.scala
+++ b/beam/src/main/scala/magnolify/beam/logical/package.scala
@@ -16,77 +16,110 @@
package magnolify.beam
+import org.apache.beam.sdk.schemas.logicaltypes
+import org.apache.beam.sdk.schemas.Schema.FieldType
import org.joda.time as joda
+
import java.time as jt
package object logical {
import magnolify.shared.Time._
object millis {
- // joda
- // DATETIME A timestamp represented as milliseconds since the epoch
+ implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] =
+ BeamSchemaField.id[jt.Instant](_ => FieldType.DATETIME)
+ implicit val bsfJodaInstantMillis: BeamSchemaField[joda.Instant] =
+ BeamSchemaField.from[jt.Instant](i => millisToJodaInstant(millisFromInstant(i)))(i =>
+ millisToInstant(millisFromJodaInstant(i))
+ )
// joda.DateTime only has millisecond resolution
implicit val bsfJodaDateTimeMillis: BeamSchemaField[joda.DateTime] =
- BeamSchemaField.from[Long](millisToJodaDateTime)(millisFromJodaDateTime)
- // DATETIME A timestamp represented as milliseconds since the epoch
- implicit val bsfJodaInstantMillis: BeamSchemaField[joda.Instant] =
- BeamSchemaField.from[Long](millisToJodaInstant)(millisFromJodaInstant)
- implicit val bsfJodaDurationMillis: BeamSchemaField[joda.Duration] =
- BeamSchemaField.from[Long](millisToJodaDuration)(millisFromJodaDuration)
- implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] =
- BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime)
- // java
- implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] =
- BeamSchemaField.from[Long](millisToInstant)(millisFromInstant)
+ BeamSchemaField.from[jt.Instant](i => millisToJodaDateTime(millisFromInstant(i)))(dt =>
+ millisToInstant(millisFromJodaDateTime(dt))
+ )
+
implicit val bsLocalTimeMillis: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Int](millisToLocalTime)(millisFromLocalTime)
+ implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] =
+ BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime)
+
implicit val bsfLocalDateTimeMillis: BeamSchemaField[jt.LocalDateTime] =
- BeamSchemaField.from[Long](millisToLocalDateTime)(millisFromLocalDateTime)
+ BeamSchemaField.id[jt.LocalDateTime](_ => FieldType.logicalType(new logicaltypes.DateTime()))
+ implicit val bsfJodaLocalDateTimeMillis: BeamSchemaField[joda.LocalDateTime] =
+ BeamSchemaField.from[jt.LocalDateTime](ldt =>
+ millisToJodaLocalDateTime(millisFromLocalDateTime(ldt))
+ )(ldt => millisToLocalDateTime(millisFromJodaLocalDateTime(ldt)))
+
implicit val bsfDurationMillis: BeamSchemaField[jt.Duration] =
BeamSchemaField.from[Long](millisToDuration)(millisFromDuration)
+ implicit val bsfJodaDurationMillis: BeamSchemaField[joda.Duration] =
+ BeamSchemaField.from[Long](millisToJodaDuration)(millisFromJodaDuration)
}
object micros {
- // joda.DateTime only has millisecond resolution, so excess precision is discarded
- implicit val bsfJodaDateTimeMicros: BeamSchemaField[joda.DateTime] =
- BeamSchemaField.from[Long](microsToJodaDateTime)(microsFromJodaDateTime)
+ // NOTE: logicaltypes.MicrosInstant() cannot be used as it throws assertion
+ // errors when greater-than-microsecond precision data is used
+ implicit val bsfInstantMicros: BeamSchemaField[jt.Instant] =
+ BeamSchemaField.from[Long](microsToInstant)(microsFromInstant)
// joda.Instant has millisecond precision, excess precision discarded
implicit val bsfJodaInstantMicros: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[Long](microsToJodaInstant)(microsFromJodaInstant)
- // joda.Duration has millisecond precision, excess precision discarded
- implicit val bsfJodaDurationMicros: BeamSchemaField[joda.Duration] =
- BeamSchemaField.from[Long](microsToJodaDuration)(microsFromJodaDuration)
+ // joda.DateTime only has millisecond resolution, so excess precision is discarded
+ implicit val bsfJodaDateTimeMicros: BeamSchemaField[joda.DateTime] =
+ BeamSchemaField.from[Long](microsToJodaDateTime)(microsFromJodaDateTime)
+
+ implicit val bsfLocalTimeMicros: BeamSchemaField[jt.LocalTime] =
+ BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime)
implicit val bsfJodaLocalTimeMicros: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Long](microsToJodaLocalTime)(microsFromJodaLocalTime)
- // java
- implicit val bsfInstantMicros: BeamSchemaField[jt.Instant] =
- BeamSchemaField.from[Long](microsToInstant)(microsFromInstant)
- implicit val bsLocalTimeMicros: BeamSchemaField[jt.LocalTime] =
- BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime)
+
implicit val bsfLocalDateTimeMicros: BeamSchemaField[jt.LocalDateTime] =
BeamSchemaField.from[Long](microsToLocalDateTime)(microsFromLocalDateTime)
+ // joda.LocalDateTime has millisecond precision, excess precision discarded
+ implicit val bsfJodaLocalDateTimeMicros: BeamSchemaField[joda.LocalDateTime] =
+ BeamSchemaField.from[Long](microsToJodaLocalDateTime)(microsFromJodaLocalDateTime)
+
implicit val bsfDurationMicros: BeamSchemaField[jt.Duration] =
BeamSchemaField.from[Long](microsToDuration)(microsFromDuration)
+ // joda.Duration has millisecond precision, excess precision discarded
+ implicit val bsfJodaDurationMicros: BeamSchemaField[joda.Duration] =
+ BeamSchemaField.from[Long](microsToJodaDuration)(microsFromJodaDuration)
}
object nanos {
+ implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] =
+ BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(new logicaltypes.NanosInstant()))
+ implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] =
+ BeamSchemaField.from[jt.Instant](i => nanosToJodaInstant(nanosFromInstant(i)))(i =>
+ nanosToInstant(nanosFromJodaInstant(i))
+ )
// joda.DateTime only has millisecond resolution
implicit val bsfJodaDateTimeNanos: BeamSchemaField[joda.DateTime] =
- BeamSchemaField.from[Long](nanosToJodaDateTime)(nanosFromJodaDateTime)
- implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] =
- BeamSchemaField.from[Long](nanosToJodaInstant)(nanosFromJodaInstant)
- implicit val bsfJodaDurationNanos: BeamSchemaField[joda.Duration] =
- BeamSchemaField.from[Long](nanosToJodaDuration)(nanosFromJodaDuration)
- implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] =
- BeamSchemaField.from[Long](nanosToJodaLocalTime)(nanosFromJodaLocalTime)
- // java
- implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] =
- BeamSchemaField.from[Long](nanosToInstant)(nanosFromInstant)
+ BeamSchemaField.from[jt.Instant](i => nanosToJodaDateTime(nanosFromInstant(i)))(i =>
+ nanosToInstant(nanosFromJodaDateTime(i))
+ )
+
implicit val bsLocalTimeNanos: BeamSchemaField[jt.LocalTime] =
- BeamSchemaField.from[Long](nanosToLocalTime)(nanosFromLocalTime)
+ BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(new logicaltypes.Time()))
+ implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] =
+ BeamSchemaField.from[jt.LocalTime](lt => nanosToJodaLocalTime(nanosFromLocalTime(lt)))(lt =>
+ nanosToLocalTime(nanosFromJodaLocalTime(lt))
+ )
+
implicit val bsfLocalDateTimeNanos: BeamSchemaField[jt.LocalDateTime] =
BeamSchemaField.from[Long](nanosToLocalDateTime)(nanosFromLocalDateTime)
+ // joda.LocalDateTime has millisecond precision, excess precision discarded
+ implicit val bsfJodaLocalDateTimeMicros: BeamSchemaField[joda.LocalDateTime] =
+ BeamSchemaField.from[jt.LocalDateTime](ldt =>
+ nanosToJodaLocalDateTime(nanosFromLocalDateTime(ldt))
+ )(ldt => nanosToLocalDateTime(nanosFromJodaLocalDateTime(ldt)))
+
implicit val bsfDurationNanos: BeamSchemaField[jt.Duration] =
- BeamSchemaField.from[Long](nanosToDuration)(nanosFromDuration)
+ BeamSchemaField.id[jt.Duration](_ => FieldType.logicalType(new logicaltypes.NanosDuration()))
+ // joda.Duration has millisecond precision, excess precision discarded
+ implicit val bsfJodaDurationNanos: BeamSchemaField[joda.Duration] =
+ BeamSchemaField.from[jt.Duration](d => nanosToJodaDuration(nanosFromDuration(d)))(d =>
+ nanosToDuration(nanosFromJodaDuration(d))
+ )
}
}
diff --git a/docs/beam.md b/docs/beam.md
index 0009c2f3..00942772 100644
--- a/docs/beam.md
+++ b/docs/beam.md
@@ -1,3 +1,42 @@
# Beam
-https://beam.apache.org/documentation/programming-guide/#schema-definition
\ No newline at end of file
+`BeamSchemaType[T]` provides conversion between Scala type `T` and a [Beam Schema](https://beam.apache.org/documentation/programming-guide/#schema-definition). Custom support for type `T` can be added with an implicit intsance of `BeamSchemaField[T]`.
+
+```scala mdoc:compile-only
+import java.net.URI
+
+case class Inner(long: Long, str: String, uri: URI)
+case class Outer(inner: Inner)
+val record = Outer(Inner(1L, "hello", URI.create("https://www.spotify.com")))
+
+import magnolify.beam.*
+// Encode custom type URI as String
+implicit val uriField: BeamSchemaField[URI] = BeamSchemaField.from[String](URI.create)(_.toString)
+
+val beamSchemaType = BeamSchemaType[Outer]
+val row = beamSchemaType.to(record)
+val copy: Outer = beamSchemaType.from(row)
+
+// Beam Schema
+val schema = beamSchemaType.schema
+```
+
+Enum-like types map to the Beam logical [Enum type]((https://beam.apache.org/documentation/programming-guide/#enumerationtype)). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe._`.
+
+To use a different field case format in target records, add an optional `CaseMapper` argument to `BeamSchemaType`:
+
+```scala mdoc:compile-only
+import magnolify.beam.*
+import magnolify.shared.CaseMapper
+import com.google.common.base.CaseFormat
+
+case class LowerCamel(firstName: String, lastName: String)
+
+val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _
+val beamSchemaType = BeamSchemaType[LowerCamel](CaseMapper(toSnakeCase))
+beamSchemaType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe)
+```
+
+Use `import magnolify.beam.logical.millis._`, `import magnolify.beam.logical.micros._` or `import magnolify.beam.logical.nanos._` as appropriate for your use-case.
+Beam's `DATETIME` type maps to the millisecond-precision `java.time.Instant`.
+Beam's `DateTime` logical type is used for millisecond-precision `java.time.LocalDateTime`, the `NanosInstant` logical type for nanosecond-precision `java.time.Instant`, the `Time` logical type for nanosecond-precision `java.time.LocalTime`, and the `NanosDuration` logical type for `java.time.Duration`.
\ No newline at end of file
diff --git a/docs/mapping.md b/docs/mapping.md
index 2b3b9222..5dfc2df8 100644
--- a/docs/mapping.md
+++ b/docs/mapping.md
@@ -1,38 +1,38 @@
# Type Mapping
-| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow |
-|-----------------------------------|------------------------------|----------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------|
-| `Unit` | `null` | x | x | x | `Null` | x | x | x |
-| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 |
-| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 |
-| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
-| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
-| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 |
-| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` |
-| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` |
-| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 |
-| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x |
-| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 |
-| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` |
-| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` |
-| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x |
-| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 |
-| `BigInt` | x | x | x | `BigInt` | x | x | x | x |
-| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x |
-| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 |
-| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 |
-| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 |
-| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x |
-| `java.time.Instant` | `long`11 | `INT64` | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x |
-| `java.time.LocalDateTime` | `long`11 | `INT64` | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x |
-| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x |
-| `java.time.LocalTime` | `long`11 | `INT32` | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x |
-| `java.time.LocalDate` | `int`11 | `INT64`17 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x |
-| `org.joda.time.LocalDate` | `int`11 | `INT32` | x | x | x | x | x | x |
-| `org.joda.time.DateTime` | `int`11 | `INT64` | x | x | x | x | x | x |
-| `org.joda.time.LocalTime` | `int`11 | `INT32` | x | x | x | x | x | x |
-| `java.util.UUID` | `string`4 | `ROW(INT64, INT64)`18 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x |
-| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x |
+| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow |
+|-----------------------------------|------------------------------|------------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------|
+| `Unit` | `null` | x | x | x | `Null` | x | x | x |
+| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 |
+| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 |
+| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
+| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
+| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 |
+| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` |
+| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` |
+| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 |
+| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x |
+| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 |
+| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` |
+| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` |
+| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x |
+| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 |
+| `BigInt` | x | x | x | `BigInt` | x | x | x | x |
+| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x |
+| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 |
+| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 |
+| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 |
+| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x |
+| `java.time.Instant` | `long`11 | `DATETIME` or `INT64`17 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x |
+| `java.time.LocalDateTime` | `long`11 | `INT64` | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x |
+| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x |
+| `java.time.LocalTime` | `long`11 | `INT32` | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x |
+| `java.time.LocalDate` | `int`11 | `INT64`18 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x |
+| `org.joda.time.LocalDate` | `int`11 | `INT32` | x | x | x | x | x | x |
+| `org.joda.time.DateTime` | `int`11 | `INT64` | x | x | x | x | x | x |
+| `org.joda.time.LocalTime` | `int`11 | `INT32` | x | x | x | x | x | x |
+| `java.util.UUID` | `string`4 | `ROW(INT64, INT64)`19 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x |
+| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x |
1. Those wrapped in`UnsafeEnum` are encoded as strings,
see [enums.md](https://github.com/spotify/magnolify/blob/master/docs/enums.md) for more
@@ -60,5 +60,6 @@
one of: `magnolify.parquet.ParquetField.{decimal32, decimal64, decimalFixed, decimalBinary}`.
15. Map key type in avro is fixed to string. Scala Map key type must be either `String` or `CharSequence`.
16. Beam logical [Enumeration type](https://beam.apache.org/documentation/programming-guide/#enumerationtype)
-17. Beam logical [Date type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/Date.html)
-18. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html)
+17. Beam logical [DateTime](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/DateTime.html), at millisecond precision. Or `INT64` when micro or nano precision is used.
+18. Beam logical [Date type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/Date.html)
+19. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html)
diff --git a/shared/src/main/scala/magnolify/shared/Time.scala b/shared/src/main/scala/magnolify/shared/Time.scala
index 83ce8b8f..4eaa69c5 100644
--- a/shared/src/main/scala/magnolify/shared/Time.scala
+++ b/shared/src/main/scala/magnolify/shared/Time.scala
@@ -97,6 +97,10 @@ object Time {
LocalDateTime.ofInstant(microsToInstant(microsFromEpoch), ZoneOffset.UTC)
@inline def microsFromLocalDateTime(ldt: LocalDateTime): Long =
microsFromInstant(ldt.toInstant(ZoneOffset.UTC))
+ @inline def microsToJodaLocalDateTime(microsFromEpoch: Long): joda.LocalDateTime =
+ new joda.LocalDateTime(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch), joda.DateTimeZone.UTC)
+ @inline def microsFromJodaLocalDateTime(ldt: joda.LocalDateTime): Long =
+ TimeUnit.MILLISECONDS.toMicros(ldt.toDateTime(joda.DateTimeZone.UTC).getMillis)
@inline def microsToDuration(micros: Long): Duration =
Duration.ofMillis(TimeUnit.MICROSECONDS.toMillis(micros))
@@ -135,6 +139,10 @@ object Time {
LocalDateTime.ofInstant(nanosToInstant(nanosFromEpoch), ZoneOffset.UTC)
@inline def nanosFromLocalDateTime(ldt: LocalDateTime): Long =
nanosFromInstant(ldt.toInstant(ZoneOffset.UTC))
+ @inline def nanosToJodaLocalDateTime(nanosFromEpoch: Long): joda.LocalDateTime =
+ new joda.LocalDateTime(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch), joda.DateTimeZone.UTC)
+ @inline def nanosFromJodaLocalDateTime(ldt: joda.LocalDateTime): Long =
+ TimeUnit.MILLISECONDS.toNanos(ldt.toDateTime(joda.DateTimeZone.UTC).getMillis)
@inline def nanosToDuration(nanos: Long): Duration =
Duration.ofNanos(nanos)