Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Aug 22, 2024
1 parent f0dabc6 commit a3d90bf
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 36 deletions.
2 changes: 1 addition & 1 deletion avro/src/main/scala/magnolify/avro/AvroType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ object AvroField {
override def from(v: String)(cm: CaseMapper): String = v
override def to(v: String)(cm: CaseMapper): String = v
}
f

implicit def afEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): AvroField[T] =
// Avro 1.9+ added a type parameter for `GenericEnumSymbol`, breaking 1.8 compatibility
// Some reader, i.e. `AvroParquetReader` reads enums as `Utf8`
Expand Down
1 change: 0 additions & 1 deletion avro/src/main/scala/magnolify/avro/logical/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.joda.{time => joda}

import java.time._
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.util.concurrent.TimeUnit

package object logical {
import magnolify.shared.Time._
Expand Down
22 changes: 19 additions & 3 deletions beam/src/main/scala/magnolify/beam/BeamSchemaType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.Row
import org.joda.time as joda
import com.google.protobuf.ByteString
import magnolify.shims.FactoryCompat
import org.apache.beam.sdk.schemas.logicaltypes as logicaltypes
import org.apache.beam.sdk.schemas.logicaltypes

import java.nio.ByteBuffer
import java.{time as jt, util as ju}
Expand Down Expand Up @@ -112,9 +112,26 @@ object BeamSchemaField {
type Typeclass[T] = BeamSchemaField[T]
implicit def gen[T]: BeamSchemaField[T] = macro Magnolia.gen[T]

@implicitNotFound("Cannot derive BeamSchemaField for sealed trait") // TODO does this make sense?
// TODO beam schemas support OneOf
@implicitNotFound("Cannot derive BeamSchemaField for sealed trait")
private sealed trait Dispatchable[T]
def split[T: Dispatchable](sealedTrait: SealedTrait[Typeclass, T]): BeamSchemaField[T] = ???
// new BeamSchemaField[T] {
// override type FromT = ???
// override type ToT = ???
// override def fieldType(cm: CaseMapper): FieldType = {
// FieldType.logicalType(
// logicaltypes.OneOfType.create(
// sealedTrait.subtypes.map { sub =>
// Field.of(s"${sub.typeName.owner}.${sub.typeName.short}", sub.typeclass.fieldType(cm))
// }
// .asJava
// )
// )
// }
// override def from(v: this.type)(cm: CaseMapper): T = ???
// override def to(v: T)(cm: CaseMapper): this.type = ???
// }

def join[T](caseClass: CaseClass[Typeclass, T]): BeamSchemaField[T] = {
if (caseClass.isValueClass) {
Expand Down Expand Up @@ -233,7 +250,6 @@ object BeamSchemaField {
new Aux[Map[K, V], ju.Map[bsfK.FromT, bsfV.FromT], ju.Map[bsfK.ToT, bsfV.ToT]] {
override def fieldType(cm: CaseMapper): FieldType =
FieldType.map(bsfK.fieldType(cm), bsfV.fieldType(cm))
// TODO need to convert/not convert logical types
override def from(v: ju.Map[bsfK.FromT, bsfV.FromT])(cm: CaseMapper): Map[K, V] =
v.asScala.map { case (k, v) => bsfK.from(k)(cm) -> bsfV.from(v)(cm) }.toMap
override def to(v: Map[K, V])(cm: CaseMapper): ju.Map[bsfK.ToT, bsfV.ToT] =
Expand Down
33 changes: 22 additions & 11 deletions beam/src/test/scala/magnolify/beam/BeamSchemaTypeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package magnolify.beam

import cats.*
import com.google.protobuf.ByteString
import magnolify.cats.auto.*
import magnolify.cats.TestEq.*
import magnolify.scalacheck.auto.*
Expand Down Expand Up @@ -51,6 +52,12 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
}
}

implicit val arbByteString: Arbitrary[ByteString] =
Arbitrary(Gen.alphaNumStr.map(ByteString.copyFromUtf8))
implicit val arbBigDecimal: Arbitrary[BigDecimal] =
Arbitrary(Gen.chooseNum(0, Int.MaxValue).map(BigDecimal(_)))
implicit val eqByteString: Eq[ByteString] = Eq.instance(_ == _)

test[Integers]
test[Floats]
test[Required]
Expand All @@ -60,10 +67,10 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
test[Collections]
test[MoreCollections]

test[Bs]
test[Maps]
test[Logical]

// FIXME value classes
test[Decimal]

{
import magnolify.beam.unsafe._
Expand All @@ -72,23 +79,19 @@ class BeamSchemaTypeSuite extends MagnolifySuite {
test[UnsafeEnums]
}

implicit val arbBigDecimal: Arbitrary[BigDecimal] =
Arbitrary(Gen.chooseNum(0, Int.MaxValue).map(BigDecimal(_)))
test[Decimal]

test("Millis") {
property("Millis") {
import magnolify.beam.logical.millis.*
test[JavaTime]
test[JodaTime]
}

test("Micros") {
property("Micros") {
import magnolify.beam.logical.micros.*
test[JavaTime]
test[JodaTime]
}

test("Nanos") {
property("Nanos") {
import magnolify.beam.logical.nanos.*
test[JavaTime]
test[JodaTime]
Expand Down Expand Up @@ -122,10 +125,11 @@ class BeamSchemaTypeSuite extends MagnolifySuite {

assert(bst.schema.getField("vc").getType == Schema.FieldType.STRING)
val record = bst(HasValueClass(ValueClass("String")))
assert(record.getValue("vc").equals("String"))
assert(record.getValue[String]("vc").equals("String"))
}
}

case class Bs(bs: ByteString)
case class Decimal(bd: BigDecimal, bdo: Option[BigDecimal])
case class Logical(
u: UUID,
Expand All @@ -148,7 +152,14 @@ case class JodaTime(
case class Maps(
ms: Map[String, String],
mi: Map[Int, Int],
ml: Map[Long, Long],
md: Map[Double, Double],
mf: Map[Float, Float],
mb: Map[Byte, Byte],
msh: Map[Short, Short],
mba: Map[Byte, Array[Byte]],
mbs: Map[ByteString, Array[Byte]],
mso: Map[Option[String], Option[String]],
ml: Map[UUID, UUID],
mu: Map[UUID, UUID],
mlo: Map[Option[UUID], Option[UUID]]
)
28 changes: 28 additions & 0 deletions parquet/src/main/scala/magnolify/parquet/logical/TimeTypes.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magnolify.parquet.logical

import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit

/**
 * Shared helpers for building Parquet time-related logical type annotations at a single,
 * implementor-chosen precision.
 *
 * Implementors supply [[unit]]; the helpers pair it with the caller's `adjusted` flag.
 */
trait TimeTypes {

  /** The time unit that every annotation built by this trait uses. */
  protected def unit: TimeUnit

  /**
   * Builds a timestamp annotation for [[unit]].
   *
   * @param adjusted
   *   forwarded unchanged as the first argument of `LogicalTypeAnnotation.timestampType`
   *   (NOTE(review): presumably the "is adjusted to UTC" flag -- confirm against the Parquet API)
   */
  protected def ts(adjusted: Boolean): LogicalTypeAnnotation = {
    val timestampAnnotation = LogicalTypeAnnotation.timestampType(adjusted, unit)
    timestampAnnotation
  }

  /**
   * Builds a time-of-day annotation for [[unit]].
   *
   * @param adjusted
   *   forwarded unchanged as the first argument of `LogicalTypeAnnotation.timeType`
   *   (NOTE(review): presumably the "is adjusted to UTC" flag -- confirm against the Parquet API)
   */
  protected def time(adjusted: Boolean): LogicalTypeAnnotation = {
    val timeAnnotation = LogicalTypeAnnotation.timeType(adjusted, unit)
    timeAnnotation
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,13 @@ package magnolify.parquet
import java.time._

import magnolify.parquet.ParquetField.Primitive
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit

package object logical {
import magnolify.shared.Time._
// TIME (millis i32, micros i64, nanos, i64), UTC true/false
// TIMESTAMP (millis, micros, nanos), UTC true/false

private trait TimeTypes {
protected def unit: TimeUnit
protected def ts(adjusted: Boolean): LogicalTypeAnnotation =
LogicalTypeAnnotation.timestampType(adjusted, unit)
protected def time(adjusted: Boolean): LogicalTypeAnnotation =
LogicalTypeAnnotation.timeType(adjusted, unit)
}

object millis extends TimeTypes {
protected val unit = TimeUnit.MILLIS

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,17 @@ class ParquetTypeSuite extends MagnolifySuite {

test[Logical]

{
property("Millis") {
import magnolify.parquet.logical.millis._
test[Time]
}

{
property("Micros") {
import magnolify.parquet.logical.micros._
test[Time]
}

{
property("Nanos") {
import magnolify.parquet.logical.nanos._
test[Time]
}
Expand Down
16 changes: 8 additions & 8 deletions shared/src/main/scala/magnolify/shared/Time.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object Time {
Instant.ofEpochMilli(millisFromEpoch)
@inline def millisFromInstant(instant: Instant): Long = instant.toEpochMilli
@inline def millisToJodaInstant(millisFromEpoch: Long): joda.Instant =
new joda.Instant(millisFromEpoch)
joda.Instant.ofEpochMilli(millisFromEpoch)
@inline def millisFromJodaInstant(instant: joda.Instant): Long = instant.getMillis

@inline def millisToLocalTime(millisFromMidnight: Int): LocalTime =
Expand All @@ -39,7 +39,7 @@ object Time {

@inline def millisToDuration(millis: Long): Duration = Duration.ofMillis(millis)
@inline def millisFromDuration(d: Duration): Long =
TimeUnit.SECONDS.toMillis(d.getSeconds) + TimeUnit.NANOSECONDS.toMillis(d.getNano)
TimeUnit.SECONDS.toMillis(d.getSeconds) + TimeUnit.NANOSECONDS.toMillis(d.getNano.toLong)
@inline def millisToJodaDuration(millis: Long): joda.Duration = joda.Duration.millis(millis)
@inline def millisFromJodaDuration(d: joda.Duration): Long = d.getMillis

Expand All @@ -62,7 +62,7 @@ object Time {
}
}
@inline def microsToJodaInstant(microsFromEpoch: Long): joda.Instant =
new joda.Instant(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch), joda.DateTimeZone.UTC)
joda.Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(microsFromEpoch))
@inline def microsFromJodaInstant(instant: joda.Instant): Long =
TimeUnit.MILLISECONDS.toMicros(instant.getMillis)

Expand All @@ -78,7 +78,7 @@ object Time {
@inline def microsToJodaLocalTime(microsFromMidnight: Long): joda.LocalTime =
joda.LocalTime.fromMillisOfDay(TimeUnit.MICROSECONDS.toMillis(microsFromMidnight))
@inline def microsFromJodaLocalTime(lt: joda.LocalTime): Long =
TimeUnit.MILLISECONDS.toMicros(lt.millisOfDay().get())
TimeUnit.MILLISECONDS.toMicros(lt.millisOfDay().get().toLong)

@inline def microsToLocalDateTime(microsFromEpoch: Long): LocalDateTime =
LocalDateTime.ofInstant(microsToInstant(microsFromEpoch), ZoneOffset.UTC)
Expand All @@ -88,7 +88,7 @@ object Time {
/**
 * Converts a number of microseconds into a `java.time.Duration` without losing precision.
 *
 * The value is split into whole seconds and a sub-second remainder expressed in nanoseconds,
 * so the result round-trips through `microsFromDuration`. Going through milliseconds instead
 * would silently drop the last three decimal digits.
 *
 * @param micros
 *   duration length in microseconds; may be negative
 * @return
 *   the equivalent duration, exact to the microsecond
 */
@inline def microsToDuration(micros: Long): Duration =
  Duration.ofSeconds(
    // Whole seconds; TimeUnit truncates toward zero.
    TimeUnit.MICROSECONDS.toSeconds(micros),
    // Remainder is in (-1s, 1s), so the conversion to nanos cannot overflow; it carries the
    // sign of `micros`, and Duration.ofSeconds normalizes a negative adjustment.
    TimeUnit.MICROSECONDS.toNanos(micros % 1000000L)
  )
@inline def microsFromDuration(d: Duration): Long =
TimeUnit.SECONDS.toMicros(d.getSeconds) + TimeUnit.NANOSECONDS.toMicros(d.getNano)
TimeUnit.SECONDS.toMicros(d.getSeconds) + TimeUnit.NANOSECONDS.toMicros(d.getNano.toLong)
@inline def microsToJodaDuration(micros: Long): joda.Duration =
joda.Duration.millis(TimeUnit.MICROSECONDS.toMillis(micros))
@inline def microsFromJodaDuration(d: joda.Duration): Long =
Expand All @@ -99,9 +99,9 @@ object Time {
/**
 * Converts a count of nanoseconds relative to the epoch into a `java.time.Instant`.
 *
 * @param epochNanos
 *   nanoseconds since the epoch; negative values denote instants before it
 * @return
 *   the instant built from the whole-second part plus the sub-second nanosecond remainder
 */
@inline def nanosToInstant(epochNanos: Long): Instant = {
  // Whole seconds; TimeUnit truncates toward zero.
  val wholeSeconds = TimeUnit.NANOSECONDS.toSeconds(epochNanos)
  // The remainder keeps the sign of epochNanos; Instant.ofEpochSecond accepts a negative
  // nano adjustment and normalizes it.
  val nanoAdjustment = epochNanos % 1000000000L
  Instant.ofEpochSecond(wholeSeconds, nanoAdjustment)
}
@inline def nanosFromInstant(instant: Instant): Long =
TimeUnit.MILLISECONDS.toNanos(instant.toEpochMilli) + instant.getNano
TimeUnit.SECONDS.toNanos(instant.getEpochSecond) + instant.getNano
@inline def nanosToJodaInstant(nanosFromEpoch: Long): joda.Instant =
new joda.Instant(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch), joda.DateTimeZone.UTC)
joda.Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(nanosFromEpoch))
@inline def nanosFromJodaInstant(instant: joda.Instant): Long =
TimeUnit.MILLISECONDS.toNanos(instant.getMillis)

Expand All @@ -116,7 +116,7 @@ object Time {
@inline def nanosToJodaLocalTime(nanosFromMidnight: Long): joda.LocalTime =
joda.LocalTime.fromMillisOfDay(TimeUnit.NANOSECONDS.toMillis(nanosFromMidnight))
@inline def nanosFromJodaLocalTime(lt: joda.LocalTime): Long =
TimeUnit.MILLISECONDS.toNanos(lt.millisOfDay().get())
TimeUnit.MILLISECONDS.toNanos(lt.millisOfDay().get().toLong)

@inline def nanosToLocalDateTime(nanosFromEpoch: Long): LocalDateTime =
LocalDateTime.ofInstant(nanosToInstant(nanosFromEpoch), ZoneOffset.UTC)
Expand Down

0 comments on commit a3d90bf

Please sign in to comment.