diff --git a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
index 096591b7..c7c2b6ed 100644
--- a/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
+++ b/beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
@@ -21,14 +21,12 @@ import magnolify.shared.*
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.schemas.Schema.FieldType
import org.apache.beam.sdk.values.Row
-import org.joda.time as joda
import com.google.protobuf.ByteString
import magnolify.shims.FactoryCompat
import org.apache.beam.sdk.schemas.logicaltypes
import java.nio.ByteBuffer
-import java.time.temporal.ChronoField
-import java.{time as jt, util as ju}
+import java.util as ju
import scala.annotation.implicitNotFound
import scala.collection.concurrent
import scala.jdk.CollectionConverters.*
@@ -212,14 +210,6 @@ object BeamSchemaField {
implicit val bsfUUID: BeamSchemaField[ju.UUID] =
id[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType))
- implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] =
- id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))
- private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
- implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] =
- from[jt.LocalDate](jtld => EpochJodaDate.plusDays(jtld.get(ChronoField.EPOCH_DAY)))(d =>
- jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong)
- )
-
implicit def bsfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): BeamSchemaField[T] =
new BeamSchemaField[T] {
type FromT = logicaltypes.EnumerationType.Value
diff --git a/beam/src/main/scala/magnolify/beam/logical/package.scala b/beam/src/main/scala/magnolify/beam/logical/package.scala
index 70c75449..f7d4c472 100644
--- a/beam/src/main/scala/magnolify/beam/logical/package.scala
+++ b/beam/src/main/scala/magnolify/beam/logical/package.scala
@@ -18,13 +18,25 @@ package magnolify.beam
import org.apache.beam.sdk.schemas.logicaltypes
import org.apache.beam.sdk.schemas.Schema.FieldType
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes
import org.joda.time as joda
import java.time as jt
+import java.time.temporal.ChronoField
package object logical {
import magnolify.shared.Time._
+  object date {
+    // java.time.LocalDate is declared with Beam's `Date` logical type.
+    implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] =
+      BeamSchemaField.id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))
+    private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
+    // joda.LocalDate is derived from the java.time.LocalDate instance by converting through the
+    // number of days since 1970-01-01. `LocalDate.get(EPOCH_DAY)` always throws
+    // UnsupportedTemporalTypeException, so the field is read with `getLong`; `toIntExact` fails
+    // loudly instead of wrapping if the day count does not fit the Int that `plusDays` takes.
+    implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] =
+      BeamSchemaField.from[jt.LocalDate](jtld =>
+        EpochJodaDate.plusDays(Math.toIntExact(jtld.getLong(ChronoField.EPOCH_DAY)))
+      )(d => jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong))
+  }
+
object millis {
implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] =
BeamSchemaField.id[jt.Instant](_ => FieldType.DATETIME)
@@ -32,13 +44,12 @@ package object logical {
BeamSchemaField.from[jt.Instant](i => millisToJodaInstant(millisFromInstant(i)))(i =>
millisToInstant(millisFromJodaInstant(i))
)
- // joda.DateTime only has millisecond resolution
implicit val bsfJodaDateTimeMillis: BeamSchemaField[joda.DateTime] =
BeamSchemaField.from[jt.Instant](i => millisToJodaDateTime(millisFromInstant(i)))(dt =>
millisToInstant(millisFromJodaDateTime(dt))
)
- implicit val bsLocalTimeMillis: BeamSchemaField[jt.LocalTime] =
+ implicit val bsfLocalTimeMillis: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Int](millisToLocalTime)(millisFromLocalTime)
implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime)
@@ -70,6 +81,7 @@ package object logical {
implicit val bsfLocalTimeMicros: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime)
+ // joda.LocalTime only has millisecond resolution, so excess precision is discarded
implicit val bsfJodaLocalTimeMicros: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Long](microsToJodaLocalTime)(microsFromJodaLocalTime)
@@ -89,6 +101,7 @@ package object logical {
object nanos {
implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] =
BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(new logicaltypes.NanosInstant()))
+ // joda.Instant has millisecond precision, excess precision discarded
implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[jt.Instant](i => nanosToJodaInstant(nanosFromInstant(i)))(i =>
nanosToInstant(nanosFromJodaInstant(i))
@@ -99,8 +112,9 @@ package object logical {
nanosToInstant(nanosFromJodaDateTime(i))
)
- implicit val bsLocalTimeNanos: BeamSchemaField[jt.LocalTime] =
+ implicit val bsfLocalTimeNanos: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(new logicaltypes.Time()))
+ // joda.LocalTime only has millisecond resolution, so excess precision is discarded
implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[jt.LocalTime](lt => nanosToJodaLocalTime(nanosFromLocalTime(lt)))(lt =>
nanosToLocalTime(nanosFromJodaLocalTime(lt))
@@ -122,4 +136,15 @@ package object logical {
nanosToDuration(nanosFromJodaDuration(d))
)
}
+
+  object sql {
+    // Each instance below declares its java.time type with the matching Beam `SqlTypes` logical
+    // type. Type arguments are spelled out to match the other instances in this package.
+    implicit val bsfSqlLocalTime: BeamSchemaField[jt.LocalTime] =
+      BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(SqlTypes.TIME))
+    implicit val bsfSqlInstant: BeamSchemaField[jt.Instant] =
+      BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(SqlTypes.TIMESTAMP))
+    implicit val bsfSqlLocalDateTime: BeamSchemaField[jt.LocalDateTime] =
+      BeamSchemaField.id[jt.LocalDateTime](_ => FieldType.logicalType(SqlTypes.DATETIME))
+    implicit val bsfSqlLocalDate: BeamSchemaField[jt.LocalDate] =
+      BeamSchemaField.id[jt.LocalDate](_ => FieldType.logicalType(SqlTypes.DATE))
+  }
}
diff --git a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala
index 9a5cf127..698e0385 100644
--- a/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala
+++ b/beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala
@@ -79,6 +79,12 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
test[UnsafeEnums]
}
+  // Runs the suite's `test` helper on the two LocalDate fixtures, with the implicit instances
+  // from `magnolify.beam.logical.date` imported into scope.
+  property("Date") {
+    import magnolify.beam.logical.date.*
+    test[JavaDate]
+    test[JodaDate]
+  }
+
property("Millis") {
import magnolify.beam.logical.millis.*
test[JavaTime]
@@ -127,6 +133,11 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
val record = bst(HasValueClass(ValueClass("String")))
assert(record.getValue[String]("vc").equals("String"))
}
+
+  // Runs the suite's `test` helper on the `Sql` fixture, with the implicit instances from
+  // `magnolify.beam.logical.sql` imported into scope.
+  property("Sql") {
+    import magnolify.beam.logical.sql.*
+    test[Sql]
+  }
}
case class Bs(bs: ByteString)
@@ -137,9 +148,17 @@ case class Logical(
ul: List[UUID],
ulo: List[Option[UUID]]
)
+
+// Fixture for the "Sql" property: one field for each type that `magnolify.beam.logical.sql`
+// defines an instance for (Instant, LocalDateTime, LocalTime, LocalDate).
+case class Sql(
+  i: Instant,
+  dt: LocalDateTime,
+  t: LocalTime,
+  d: LocalDate
+)
+// Single-field fixtures used by the "Date" property: one with an unqualified LocalDate, one
+// with joda.LocalDate.
+case class JavaDate(d: LocalDate)
+case class JodaDate(jd: joda.LocalDate)
case class JavaTime(
i: Instant,
- d: LocalDate,
dt: LocalDateTime,
t: LocalTime
)
diff --git a/docs/beam.md b/docs/beam.md
index 00942772..01f78757 100644
--- a/docs/beam.md
+++ b/docs/beam.md
@@ -21,7 +21,31 @@ val copy: Outer = beamSchemaType.from(row)
val schema = beamSchemaType.schema
```
-Enum-like types map to the Beam logical [Enum type]((https://beam.apache.org/documentation/programming-guide/#enumerationtype)). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe._`.
+## Enums
+Enum-like types map to the Beam logical [Enum type](https://beam.apache.org/documentation/programming-guide/#enumerationtype). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe.*`.
+
+## Time and dates
+
+Java and joda `LocalDate` types are available via `import magnolify.beam.logical.date.*`
+
+For date-time, instants, and durations, use `import magnolify.beam.logical.millis.*`, `import magnolify.beam.logical.micros.*` or `import magnolify.beam.logical.nanos.*` as appropriate for your use-case.
+Note that joda types have only millisecond resolution, so excess precision will be discarded when used with `micros` or `nanos`.
+
+Where possible, Beam logical types are used and joda types defer to these implementations:
+
+* Beam's `DATETIME` primitive type maps to the millisecond-precision java and joda `Instant`s and the joda `DateTime`.
+* The `DateTime` logical type is used for millisecond-precision java and joda `LocalDateTime`
+* The `NanosInstant` logical type is used for nanosecond-precision java and joda `Instant`
+* The `Time` logical type is used for nanosecond-precision java and joda `LocalTime`
+* The `NanosDuration` logical type is used for java and joda `Duration`
+
+Beam's `MicrosInstant` should not be used as it throws exceptions when presented with greater-than-microsecond precision data.
+
+## SQL types
+
+SQL-compatible logical types are supported via `import magnolify.beam.logical.sql.*`
+
+## Case mapping
To use a different field case format in target records, add an optional `CaseMapper` argument to `BeamSchemaType`:
@@ -35,8 +59,4 @@ case class LowerCamel(firstName: String, lastName: String)
val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _
val beamSchemaType = BeamSchemaType[LowerCamel](CaseMapper(toSnakeCase))
beamSchemaType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe)
-```
-
-Use `import magnolify.beam.logical.millis._`, `import magnolify.beam.logical.micros._` or `import magnolify.beam.logical.nanos._` as appropriate for your use-case.
-Beam's `DATETIME` type maps to the millisecond-precision `java.time.Instant`.
-Beam's `DateTime` logical type is used for millisecond-precision `java.time.LocalDateTime`, the `NanosInstant` logical type for nanosecond-precision `java.time.Instant`, the `Time` logical type for nanosecond-precision `java.time.LocalTime`, and the `NanosDuration` logical type for `java.time.Duration`.
\ No newline at end of file
+```
\ No newline at end of file
diff --git a/docs/mapping.md b/docs/mapping.md
index 5dfc2df8..c86db47e 100644
--- a/docs/mapping.md
+++ b/docs/mapping.md
@@ -1,38 +1,38 @@
# Type Mapping
-| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow |
-|-----------------------------------|------------------------------|------------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------|
-| `Unit` | `null` | x | x | x | `Null` | x | x | x |
-| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 |
-| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 |
-| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
-| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
-| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 |
-| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` |
-| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` |
-| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 |
-| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x |
-| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 |
-| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` |
-| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` |
-| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x |
-| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 |
-| `BigInt` | x | x | x | `BigInt` | x | x | x | x |
-| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x |
-| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 |
-| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 |
-| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 |
-| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x |
-| `java.time.Instant` | `long`11 | `DATETIME` or `INT64`17 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x |
-| `java.time.LocalDateTime` | `long`11 | `INT64` | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x |
-| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x |
-| `java.time.LocalTime` | `long`11 | `INT32` | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x |
-| `java.time.LocalDate` | `int`11 | `INT64`18 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x |
-| `org.joda.time.LocalDate` | `int`11 | `INT32` | x | x | x | x | x | x |
-| `org.joda.time.DateTime` | `int`11 | `INT64` | x | x | x | x | x | x |
-| `org.joda.time.LocalTime` | `int`11 | `INT32` | x | x | x | x | x | x |
-| `java.util.UUID` | `string`4 | `ROW(INT64, INT64)`19 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x |
-| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x |
+| Scala | Avro | Beam | BigQuery | Bigtable7 | Datastore | Parquet | Protobuf | TensorFlow |
+|-----------------------------------|------------------------------|-----------------------------------------|------------------------|---------------------------------|-----------------------|-----------------------------------|-------------------------|---------------------|
+| `Unit` | `null` | x | x | x | `Null` | x | x | x |
+| `Boolean` | `boolean` | `BOOLEAN` | `BOOL` | `Byte` | `Boolean` | `BOOLEAN` | `Boolean` | `INT64`3 |
+| `Char` | `int`3 | `BYTE` | `INT64`3 | `Char` | `Integer`3 | `INT32`3 | `Int`3 | `INT64`3 |
+| `Byte` | `int`3 | `BYTE` | `INT64`3 | `Byte` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
+| `Short` | `int`3 | `INT16` | `INT64`3 | `Short` | `Integer`3 | `INT32`9 | `Int`3 | `INT64`3 |
+| `Int` | `int` | `INT32` | `INT64`3 | `Int` | `Integer`3 | `INT32`9 | `Int` | `INT64`3 |
+| `Long` | `long` | `INT64` | `INT64` | `Long` | `Integer` | `INT64`9 | `Long` | `INT64` |
+| `Float` | `float` | `FLOAT` | `FLOAT64`3 | `Float` | `Double`3 | `FLOAT` | `Float` | `FLOAT` |
+| `Double` | `double` | `DOUBLE` | `FLOAT64` | `Double` | `Double` | `DOUBLE` | `Double` | `FLOAT`3 |
+| `CharSequence` | `string` | `STRING` | x | x | x | x | x | x |
+| `String` | `string` | `STRING` | `STRING` | `String` | `String` | `BINARY` | `String` | `BYTES`3 |
+| `Array[Byte]` | `bytes` | `BYTES` | `BYTES` | `ByteString` | `Blob` | `BINARY` | `ByteString` | `BYTES` |
+| `ByteString` | x | `BYTES` | x | `ByteString` | `Blob` | x | `ByteString` | `BYTES` |
+| `ByteBuffer` | `bytes` | `BYTES` | x | x | | x | x | x |
+| Enum1 | `enum` | `STRING`16 | `STRING`3 | `String` | `String`3 | `BINARY`/`ENUM`9 | Enum | `BYTES`3 |
+| `BigInt` | x | x | x | `BigInt` | x | x | x | x |
+| `BigDecimal` | `bytes`4 | `DECIMAL` | `NUMERIC`6 | `Int` scale + unscaled `BigInt` | x | `LOGICAL[DECIMAL]`9,14 | x | x |
+| `Option[T]` | `union[null, T]`5 | Empty as `null` | `NULLABLE` | Empty as `None` | Absent as `None` | `OPTIONAL` | `optional`10 | Size <= 1 |
+| `Iterable[T]`2 | `array[T]` | `ITERABLE` | `REPEATED` | x | `Array` | `REPEATED`13 | `repeated` | Size >= 0 |
+| Nested | `record` | `ROW` | `STRUCT` | Flat8 | `Entity` | Group | `Message` | Flat8 |
+| `Map[K, V]` | `map[V]`15 | `MAP` | x | x | x | x | `map` | x |
+| `java.time.Instant` | `long`11 | `DATETIME`, `INT64`, `ROW`17 | `TIMESTAMP` | x | `Timestamp` | `LOGICAL[TIMESTAMP]`9 | x | x |
+| `java.time.LocalDateTime` | `long`11 | `ROW`, `INT64`17 | `DATETIME` | x | x | `LOGICAL[TIMESTAMP]`9 | x | x |
+| `java.time.OffsetTime` | x | x | x | x | x | `LOGICAL[TIME]`9 | x | x |
+| `java.time.LocalTime` | `long`11 | `INT32`, `INT64`17 | `TIME` | x | x | `LOGICAL[TIME]`9 | x | x |
+| `java.time.LocalDate` | `int`11 | `INT64`17 | `DATE` | x | x | `LOGICAL[DATE]`9 | x | x |
+| `org.joda.time.LocalDate` | `int`11 | `INT64`17 | x | x | x | x | x | x |
+| `org.joda.time.DateTime` | `int`11 | `DATETIME`, `INT64`, `ROW`17 | x | x | x | x | x | x |
+| `org.joda.time.LocalTime` | `int`11 | `INT32`, `INT64`17 | x | x | x | x | x | x |
+| `java.util.UUID` | `string`4 | `ROW`18 | x | ByteString (16 bytes) | x | `FIXED[16]` | x | x |
+| `(Long, Long, Long)`12 | `fixed[12]` | x | x | x | x | x | x | x |
1. Those wrapped in`UnsafeEnum` are encoded as strings,
see [enums.md](https://github.com/spotify/magnolify/blob/master/docs/enums.md) for more
@@ -60,6 +60,5 @@
one of: `magnolify.parquet.ParquetField.{decimal32, decimal64, decimalFixed, decimalBinary}`.
15. Map key type in avro is fixed to string. Scala Map key type must be either `String` or `CharSequence`.
16. Beam logical [Enumeration type](https://beam.apache.org/documentation/programming-guide/#enumerationtype)
-17. Beam logical [DateTime](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/DateTime.html), at millisecond precision. Or `INT64` when micro or nano precision is used.
-18. Beam logical [Date type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/Date.html)
-19. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html)
+17. See [beam.md](https://github.com/spotify/magnolify/blob/master/docs/beam.md) for details
+18. Beam logical [UUID type](https://beam.apache.org/releases/javadoc/2.58.1/org/apache/beam/sdk/schemas/logicaltypes/UuidLogicalType.html)