Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Aug 22, 2024
1 parent dc9c175 commit 22ba2d1
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 75 deletions.
5 changes: 3 additions & 2 deletions beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import magnolify.shims.FactoryCompat
import org.apache.beam.sdk.schemas.logicaltypes

import java.nio.ByteBuffer
import java.time.temporal.ChronoField
import java.{time as jt, util as ju}
import scala.annotation.implicitNotFound
import scala.collection.concurrent
Expand Down Expand Up @@ -215,8 +216,8 @@ object BeamSchemaField {
id[jt.LocalDate](_ => FieldType.logicalType(new logicaltypes.Date))
private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
implicit val bsfJodaLocalDate: BeamSchemaField[joda.LocalDate] =
from[Int](daysFromEpoch => EpochJodaDate.plusDays(daysFromEpoch))(d =>
joda.Days.daysBetween(EpochJodaDate, d).getDays
from[jt.LocalDate](jtld => EpochJodaDate.plusDays(jtld.get(ChronoField.EPOCH_DAY)))(d =>
jt.LocalDate.ofEpochDay(joda.Days.daysBetween(EpochJodaDate, d).getDays.toLong)
)

implicit def bsfEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): BeamSchemaField[T] =
Expand Down
107 changes: 70 additions & 37 deletions beam/src/main/scala/magnolify/beam/logical/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,77 +16,110 @@

package magnolify.beam

import org.apache.beam.sdk.schemas.logicaltypes
import org.apache.beam.sdk.schemas.Schema.FieldType
import org.joda.time as joda

import java.time as jt

package object logical {
import magnolify.shared.Time._

object millis {
// joda
// DATETIME A timestamp represented as milliseconds since the epoch
implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] =
BeamSchemaField.id[jt.Instant](_ => FieldType.DATETIME)
implicit val bsfJodaInstantMillis: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[jt.Instant](i => millisToJodaInstant(millisFromInstant(i)))(i =>
millisToInstant(millisFromJodaInstant(i))
)
// joda.DateTime only has millisecond resolution
implicit val bsfJodaDateTimeMillis: BeamSchemaField[joda.DateTime] =
BeamSchemaField.from[Long](millisToJodaDateTime)(millisFromJodaDateTime)
// DATETIME A timestamp represented as milliseconds since the epoch
implicit val bsfJodaInstantMillis: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[Long](millisToJodaInstant)(millisFromJodaInstant)
implicit val bsfJodaDurationMillis: BeamSchemaField[joda.Duration] =
BeamSchemaField.from[Long](millisToJodaDuration)(millisFromJodaDuration)
implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime)
// java
implicit val bsfInstantMillis: BeamSchemaField[jt.Instant] =
BeamSchemaField.from[Long](millisToInstant)(millisFromInstant)
BeamSchemaField.from[jt.Instant](i => millisToJodaDateTime(millisFromInstant(i)))(dt =>
millisToInstant(millisFromJodaDateTime(dt))
)

implicit val bsLocalTimeMillis: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Int](millisToLocalTime)(millisFromLocalTime)
implicit val bsfJodaLocalTimeMillis: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Int](millisToJodaLocalTime)(millisFromJodaLocalTime)

implicit val bsfLocalDateTimeMillis: BeamSchemaField[jt.LocalDateTime] =
BeamSchemaField.from[Long](millisToLocalDateTime)(millisFromLocalDateTime)
BeamSchemaField.id[jt.LocalDateTime](_ => FieldType.logicalType(new logicaltypes.DateTime()))
implicit val bsfJodaLocalDateTimeMillis: BeamSchemaField[joda.LocalDateTime] =
BeamSchemaField.from[jt.LocalDateTime](ldt =>
millisToJodaLocalDateTime(millisFromLocalDateTime(ldt))
)(ldt => millisToLocalDateTime(millisFromJodaLocalDateTime(ldt)))

implicit val bsfDurationMillis: BeamSchemaField[jt.Duration] =
BeamSchemaField.from[Long](millisToDuration)(millisFromDuration)
implicit val bsfJodaDurationMillis: BeamSchemaField[joda.Duration] =
BeamSchemaField.from[Long](millisToJodaDuration)(millisFromJodaDuration)
}

object micros {
// joda.DateTime only has millisecond resolution, so excess precision is discarded
implicit val bsfJodaDateTimeMicros: BeamSchemaField[joda.DateTime] =
BeamSchemaField.from[Long](microsToJodaDateTime)(microsFromJodaDateTime)
// NOTE: logicaltypes.MicrosInstant() cannot be used as it throws assertion
// errors when greater-than-microsecond precision data is used
implicit val bsfInstantMicros: BeamSchemaField[jt.Instant] =
BeamSchemaField.from[Long](microsToInstant)(microsFromInstant)
// joda.Instant has millisecond precision, excess precision discarded
implicit val bsfJodaInstantMicros: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[Long](microsToJodaInstant)(microsFromJodaInstant)
// joda.Duration has millisecond precision, excess precision discarded
implicit val bsfJodaDurationMicros: BeamSchemaField[joda.Duration] =
BeamSchemaField.from[Long](microsToJodaDuration)(microsFromJodaDuration)
// joda.DateTime only has millisecond resolution, so excess precision is discarded
implicit val bsfJodaDateTimeMicros: BeamSchemaField[joda.DateTime] =
BeamSchemaField.from[Long](microsToJodaDateTime)(microsFromJodaDateTime)

implicit val bsfLocalTimeMicros: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime)
implicit val bsfJodaLocalTimeMicros: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Long](microsToJodaLocalTime)(microsFromJodaLocalTime)
// java
implicit val bsfInstantMicros: BeamSchemaField[jt.Instant] =
BeamSchemaField.from[Long](microsToInstant)(microsFromInstant)
implicit val bsLocalTimeMicros: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Long](microsToLocalTime)(microsFromLocalTime)

implicit val bsfLocalDateTimeMicros: BeamSchemaField[jt.LocalDateTime] =
BeamSchemaField.from[Long](microsToLocalDateTime)(microsFromLocalDateTime)
// joda.LocalDateTime has millisecond precision, excess precision discarded
implicit val bsfJodaLocalDateTimeMicros: BeamSchemaField[joda.LocalDateTime] =
BeamSchemaField.from[Long](microsToJodaLocalDateTime)(microsFromJodaLocalDateTime)

implicit val bsfDurationMicros: BeamSchemaField[jt.Duration] =
BeamSchemaField.from[Long](microsToDuration)(microsFromDuration)
// joda.Duration has millisecond precision, excess precision discarded
implicit val bsfJodaDurationMicros: BeamSchemaField[joda.Duration] =
BeamSchemaField.from[Long](microsToJodaDuration)(microsFromJodaDuration)
}

object nanos {
implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] =
BeamSchemaField.id[jt.Instant](_ => FieldType.logicalType(new logicaltypes.NanosInstant()))
implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[jt.Instant](i => nanosToJodaInstant(nanosFromInstant(i)))(i =>
nanosToInstant(nanosFromJodaInstant(i))
)
// joda.DateTime only has millisecond resolution
implicit val bsfJodaDateTimeNanos: BeamSchemaField[joda.DateTime] =
BeamSchemaField.from[Long](nanosToJodaDateTime)(nanosFromJodaDateTime)
implicit val bsfJodaInstantNanos: BeamSchemaField[joda.Instant] =
BeamSchemaField.from[Long](nanosToJodaInstant)(nanosFromJodaInstant)
implicit val bsfJodaDurationNanos: BeamSchemaField[joda.Duration] =
BeamSchemaField.from[Long](nanosToJodaDuration)(nanosFromJodaDuration)
implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[Long](nanosToJodaLocalTime)(nanosFromJodaLocalTime)
// java
implicit val bsfInstantNanos: BeamSchemaField[jt.Instant] =
BeamSchemaField.from[Long](nanosToInstant)(nanosFromInstant)
BeamSchemaField.from[jt.Instant](i => nanosToJodaDateTime(nanosFromInstant(i)))(i =>
nanosToInstant(nanosFromJodaDateTime(i))
)

implicit val bsLocalTimeNanos: BeamSchemaField[jt.LocalTime] =
BeamSchemaField.from[Long](nanosToLocalTime)(nanosFromLocalTime)
BeamSchemaField.id[jt.LocalTime](_ => FieldType.logicalType(new logicaltypes.Time()))
implicit val bsfJodaLocalTimeNanos: BeamSchemaField[joda.LocalTime] =
BeamSchemaField.from[jt.LocalTime](lt => nanosToJodaLocalTime(nanosFromLocalTime(lt)))(lt =>
nanosToLocalTime(nanosFromJodaLocalTime(lt))
)

implicit val bsfLocalDateTimeNanos: BeamSchemaField[jt.LocalDateTime] =
BeamSchemaField.from[Long](nanosToLocalDateTime)(nanosFromLocalDateTime)
// joda.LocalDateTime has millisecond precision, excess precision discarded
implicit val bsfJodaLocalDateTimeMicros: BeamSchemaField[joda.LocalDateTime] =
BeamSchemaField.from[jt.LocalDateTime](ldt =>
nanosToJodaLocalDateTime(nanosFromLocalDateTime(ldt))
)(ldt => nanosToLocalDateTime(nanosFromJodaLocalDateTime(ldt)))

implicit val bsfDurationNanos: BeamSchemaField[jt.Duration] =
BeamSchemaField.from[Long](nanosToDuration)(nanosFromDuration)
BeamSchemaField.id[jt.Duration](_ => FieldType.logicalType(new logicaltypes.NanosDuration()))
// joda.Duration has millisecond precision, excess precision discarded
implicit val bsfJodaDurationNanos: BeamSchemaField[joda.Duration] =
BeamSchemaField.from[jt.Duration](d => nanosToJodaDuration(nanosFromDuration(d)))(d =>
nanosToDuration(nanosFromJodaDuration(d))
)
}
}
41 changes: 40 additions & 1 deletion docs/beam.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,42 @@
# Beam

https://beam.apache.org/documentation/programming-guide/#schema-definition
`BeamSchemaType[T]` provides conversion between Scala type `T` and a [Beam Schema](https://beam.apache.org/documentation/programming-guide/#schema-definition). Custom support for type `T` can be added with an implicit intsance of `BeamSchemaField[T]`.

```scala mdoc:compile-only
import java.net.URI

case class Inner(long: Long, str: String, uri: URI)
case class Outer(inner: Inner)
val record = Outer(Inner(1L, "hello", URI.create("https://www.spotify.com")))

import magnolify.beam.*
// Encode custom type URI as String
implicit val uriField: BeamSchemaField[URI] = BeamSchemaField.from[String](URI.create)(_.toString)

val beamSchemaType = BeamSchemaType[Outer]
val row = beamSchemaType.to(record)
val copy: Outer = beamSchemaType.from(row)

// Beam Schema
val schema = beamSchemaType.schema
```

Enum-like types map to the Beam logical [Enum type]((https://beam.apache.org/documentation/programming-guide/#enumerationtype)). See @ref:[EnumType](enums.md) for more details. `UnsafeEnum[T]` instances are available from `import magnolify.beam.unsafe._`.

To use a different field case format in target records, add an optional `CaseMapper` argument to `BeamSchemaType`:

```scala mdoc:compile-only
import magnolify.beam.*
import magnolify.shared.CaseMapper
import com.google.common.base.CaseFormat

case class LowerCamel(firstName: String, lastName: String)

val toSnakeCase = CaseFormat.LOWER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE).convert _
val beamSchemaType = BeamSchemaType[LowerCamel](CaseMapper(toSnakeCase))
beamSchemaType.to(LowerCamel("John", "Doe")) // Row(first_name: John, last_name: Doe)
```

Use `import magnolify.beam.logical.millis._`, `import magnolify.beam.logical.micros._` or `import magnolify.beam.logical.nanos._` as appropriate for your use-case.
Beam's `DATETIME` type maps to the millisecond-precision `java.time.Instant`.
Beam's `DateTime` logical type is used for millisecond-precision `java.time.LocalDateTime`, the `NanosInstant` logical type for nanosecond-precision `java.time.Instant`, the `Time` logical type for nanosecond-precision `java.time.LocalTime`, and the `NanosDuration` logical type for `java.time.Duration`.
Loading

0 comments on commit 22ba2d1

Please sign in to comment.