Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Aug 22, 2024
1 parent 22ba2d1 commit caa1f2b
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 57 deletions.
12 changes: 1 addition & 11 deletions beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ import magnolify.shared.*
import org.apache.beam.sdk.schemas.Schema
import org.apache.beam.sdk.schemas.Schema.FieldType
import org.apache.beam.sdk.values.Row
import org.joda.time as joda
import com.google.protobuf.ByteString
import magnolify.shims.FactoryCompat
import org.apache.beam.sdk.schemas.logicaltypes

import java.nio.ByteBuffer
import java.time.temporal.ChronoField
import java.{time as jt, util as ju}
import java.util as ju
import scala.annotation.implicitNotFound
import scala.collection.concurrent
import scala.jdk.CollectionConverters.*
Expand Down Expand Up @@ -212,14 +210,6 @@ object BeamSchemaField {
// Schema field for java.util.UUID, declared as Beam's UuidLogicalType logical type.
implicit val bsfUUID: BeamSchemaField[ju.UUID] =
id[ju.UUID](_ => FieldType.logicalType(new logicaltypes.UuidLogicalType))

// Schema field for java.time.LocalDate, declared as Beam's Date logical type.
implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] =
id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))
// Joda reference date (1970-01-01) used to convert to and from epoch-day counts.
private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
// joda.LocalDate is derived from the java.time.LocalDate field by counting days from the epoch.
// LocalDate.get(ChronoField.EPOCH_DAY) always throws UnsupportedTemporalTypeException because
// the field does not fit in an Int, so the value is read with getLong and narrowed with
// Math.toIntExact (which fails loudly instead of silently truncating out-of-range dates).
implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] =
  from[jt.LocalDate](jtld =>
    EpochJodaDate.plusDays(Math.toIntExact(jtld.getLong(ChronoField.EPOCH_DAY)))
  )(d => jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong))

implicit def bsfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): BeamSchemaField[T] =
new BeamSchemaField[T] {
type FromT = logicaltypes.EnumerationType.Value
Expand Down
31 changes: 28 additions & 3 deletions beam/src/main/scala/magnolify/beam/logical/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,38 @@ package magnolify.beam

import org.apache.beam.sdk.schemas.logicaltypes
import org.apache.beam.sdk.schemas.Schema.FieldType
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes
import org.joda.time as joda

import java.time as jt
import java.time.temporal.ChronoField

package object logical {
import magnolify.shared.Time._

object date {
  // java.time.LocalDate is declared as Beam's Date logical type.
  implicit val bsfLocalDate: BeamSchemaField[jt.LocalDate] =
    BeamSchemaField.id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))

  // Joda reference date (1970-01-01) used to convert to and from epoch-day counts.
  private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)

  // joda.LocalDate is derived from the java.time.LocalDate instance by counting days from the
  // epoch. LocalDate.get(ChronoField.EPOCH_DAY) always throws UnsupportedTemporalTypeException
  // because the field does not fit in an Int, so the value is read with getLong and narrowed
  // with Math.toIntExact (which fails loudly instead of silently truncating out-of-range dates).
  implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] =
    BeamSchemaField.from[jt.LocalDate](jtld =>
      EpochJodaDate.plusDays(Math.toIntExact(jtld.getLong(ChronoField.EPOCH_DAY)))
    )(d => jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong))
}

object millis {
// java.time.Instant field declared with Beam's primitive DATETIME field type.
// NOTE(review): Beam documents DATETIME row values as Joda instants — confirm that `id` may
// store a java.time.Instant here without conversion.
implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] =
BeamSchemaField.id[jt.Instant](_ => FieldType.DATETIME)
// joda.Instant is mapped through the java.time.Instant instance, converting via epoch millis
// in both directions.
implicit val bsfJodaInstantMillis: BeamSchemaField[joda.Instant] =
  BeamSchemaField.from[jt.Instant](javaInstant =>
    millisToJodaInstant(millisFromInstant(javaInstant))
  )(jodaInstant => millisToInstant(millisFromJodaInstant(jodaInstant)))
// joda.DateTime only has millisecond resolution; it is mapped through the java.time.Instant
// instance, converting via epoch millis in both directions.
implicit val bsfJodaDateTimeMillis: BeamSchemaField[joda.DateTime] =
  BeamSchemaField.from[jt.Instant](javaInstant =>
    millisToJodaDateTime(millisFromInstant(javaInstant))
  )(jodaDateTime => millisToInstant(millisFromJodaDateTime(jodaDateTime)))

implicit val bsLocalTimeMillis: BeamSchemaField[jt.LocalTime] =
implicit val bsfLocalTimeMillis: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Int](millisToLocalTime)(millisFromLocalTime)
// joda.LocalTime is derived from an Int field via millisToJodaLocalTime / millisFromJodaLocalTime.
implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime)
Expand Down Expand Up @@ -70,6 +81,7 @@ package object logical {

// java.time.LocalTime is derived from a Long field via microsToLocalTime / microsFromLocalTime.
implicit val bsfLocalTimeMicros: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime)
// joda.LocalTime only has millisecond resolution, so excess precision is discarded
implicit val bsfJodaLocalTimeMicros: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Long](microsToJodaLocalTime)(microsFromJodaLocalTime)

Expand All @@ -89,6 +101,7 @@ package object logical {
object nanos {
// java.time.Instant is declared as Beam's NanosInstant logical type.
implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] =
BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(new logicaltypes.NanosInstant()))
// joda.Instant has millisecond precision, excess precision discarded
implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[jt.Instant](i => nanosToJodaInstant(nanosFromInstant(i)))(i =>
nanosToInstant(nanosFromJodaInstant(i))

Check warning on line 107 in beam/src/main/scala/magnolify/beam/logical/package.scala

View check run for this annotation

Codecov / codecov/patch

beam/src/main/scala/magnolify/beam/logical/package.scala#L107

Added line #L107 was not covered by tests
Expand All @@ -99,8 +112,9 @@ package object logical {
nanosToInstant(nanosFromJodaDateTime(i))

Check warning on line 112 in beam/src/main/scala/magnolify/beam/logical/package.scala

View check run for this annotation

Codecov / codecov/patch

beam/src/main/scala/magnolify/beam/logical/package.scala#L112

Added line #L112 was not covered by tests
)

implicit val bsLocalTimeNanos: BeamSchemaField[jt.LocalTime] =
implicit val bsfLocalTimeNanos: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(new logicaltypes.Time()))
// joda.LocalTime only has millisecond resolution, so excess precision is discarded
implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[jt.LocalTime](lt => nanosToJodaLocalTime(nanosFromLocalTime(lt)))(lt =>
nanosToLocalTime(nanosFromJodaLocalTime(lt))

Check warning on line 120 in beam/src/main/scala/magnolify/beam/logical/package.scala

View check run for this annotation

Codecov / codecov/patch

beam/src/main/scala/magnolify/beam/logical/package.scala#L120

Added line #L120 was not covered by tests
Expand All @@ -122,4 +136,15 @@ package object logical {
nanosToDuration(nanosFromJodaDuration(d))

Check warning on line 136 in beam/src/main/scala/magnolify/beam/logical/package.scala

View check run for this annotation

Codecov / codecov/patch

beam/src/main/scala/magnolify/beam/logical/package.scala#L136

Added line #L136 was not covered by tests
)
}

object sql {
  // Instances for java.time types declared with the logical types exposed on Beam's SqlTypes.
  // Bring them into scope with `import magnolify.beam.logical.sql.*`.

  // java.time.LocalTime <-> SqlTypes.TIME
  implicit val bsfSqlLocalTime: BeamSchemaField[jt.LocalTime] =
    BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(SqlTypes.TIME))

  // java.time.Instant <-> SqlTypes.TIMESTAMP
  // NOTE(review): SqlTypes.TIMESTAMP is presumably Beam's MicrosInstant — confirm how it behaves
  // with sub-microsecond precision values.
  implicit val bsfSqlInstant: BeamSchemaField[jt.Instant] =
    BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(SqlTypes.TIMESTAMP))

  // java.time.LocalDateTime <-> SqlTypes.DATETIME
  implicit val bsfSqlLocalDateTime: BeamSchemaField[jt.LocalDateTime] =
    BeamSchemaField.id[jt.LocalDateTime](_ => FieldType.logicalType(SqlTypes.DATETIME))

  // java.time.LocalDate <-> SqlTypes.DATE
  implicit val bsfSqlLocalDate: BeamSchemaField[jt.LocalDate] =
    BeamSchemaField.id[jt.LocalDate](_ => FieldType.logicalType(SqlTypes.DATE))
}
}
21 changes: 20 additions & 1 deletion beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
test[UnsafeEnums]
}

// Runs the suite's `test` check for the java.time and Joda LocalDate records, using the
// instances imported from magnolify.beam.logical.date.
property("Date") {
import magnolify.beam.logical.date.*
test[JavaDate]
test[JodaDate]
}

property("Millis") {
import magnolify.beam.logical.millis.*
test[JavaTime]
Expand Down Expand Up @@ -127,6 +133,11 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
val record = bst(HasValueClass(ValueClass("String")))
assert(record.getValue[String]("vc").equals("String"))
}

// Runs the suite's `test` check for the Sql record, using the instances imported from
// magnolify.beam.logical.sql.
property("Sql") {
import magnolify.beam.logical.sql.*
test[Sql]
}
}

case class Bs(bs: ByteString)
Expand All @@ -137,9 +148,17 @@ case class Logical(
ul: List[UUID],
ulo: List[Option[UUID]]
)

// Fixture with one field per java.time type exercised by the "Sql" property.
case class Sql(
i: Instant,
dt: LocalDateTime,
t: LocalTime,
d: LocalDate
)
// Fixture holding a single LocalDate field, used by the "Date" property.
case class JavaDate(d: LocalDate)
// Fixture holding a single joda.LocalDate field, used by the "Date" property.
case class JodaDate(jd: joda.LocalDate)
case class JavaTime(
i: Instant,
d: LocalDate,
dt: LocalDateTime,
t: LocalTime
)
Expand Down
32 changes: 26 additions & 6 deletions docs/beam.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,31 @@ val copy: Outer = beamSchemaType.from(row)
val schema = beamSchemaType.schema
```

Enum-like types map to the Beam logical [Enum type](https://beam.apache.org/documentation/programming-guide/#enumerationtype). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe._`.
## Enums
Enum-like types map to the Beam logical [Enum type](https://beam.apache.org/documentation/programming-guide/#enumerationtype). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe.*`.

## Time and dates

Java and joda `LocalDate` types are available via `import magnolify.beam.logical.date.*`.

For date-time, instants, and durations, use `import magnolify.beam.logical.millis.*`, `import magnolify.beam.logical.micros.*` or `import magnolify.beam.logical.nanos.*` as appropriate for your use-case.
Note that joda types have only millisecond resolution, so excess precision will be discarded when used with `micros` or `nanos`.

Where possible, Beam logical types are used and joda types defer to these implementations:

* Beam's `DATETIME` primitive type maps to the millisecond-precision java and joda `Instant`s and the joda `DateTime`.
* The `DateTime` logical type is used for millisecond-precision java and joda `LocalDateTime`.
* The `NanosInstant` logical type is used for nanosecond-precision java and joda `Instant`.
* The `Time` logical type is used for nanosecond-precision java and joda `LocalTime`.
* The `NanosDuration` logical type is used for java and joda `Duration`.

Beam's `MicrosInstant` should not be used as it throws exceptions when presented with greater-than-microsecond precision data.

## SQL types

SQL-compatible logical types are supported via `import magnolify.beam.logical.sql.*`.

## Case mapping

To use a different field case format in target records, add an optional `CaseMapper` argument to `BeamSchemaType`:

Expand All @@ -35,8 +59,4 @@ case class LowerCamel(firstName: String, lastName: String)
val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _
val beamSchemaType = BeamSchemaType[LowerCamel](CaseMapper(toSnakeCase))
beamSchemaType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe)
```

Use `import magnolify.beam.logical.millis._`, `import magnolify.beam.logical.micros._` or `import magnolify.beam.logical.nanos._` as appropriate for your use-case.
Beam's `DATETIME` type maps to the millisecond-precision `java.time.Instant`.
Beam's `DateTime` logical type is used for millisecond-precision `java.time.LocalDateTime`, the `NanosInstant` logical type for nanosecond-precision `java.time.Instant`, the `Time` logical type for nanosecond-precision `java.time.LocalTime`, and the `NanosDuration` logical type for `java.time.Duration`.
```
Loading

0 comments on commit caa1f2b

Please sign in to comment.