Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen committed Aug 22, 2024
1 parent 2556186 commit 1a5ecbb
Show file tree
Hide file tree
Showing 11 changed files with 793 additions and 117 deletions.
2 changes: 1 addition & 1 deletion avro/src/main/scala/magnolify/avro/AvroType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ object AvroField {
override def from(v: String)(cm: CaseMapper): String = v
override def to(v: String)(cm: CaseMapper): String = v
}

f
implicit def afEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): AvroField[T] =
// Avro 1.9+ added a type parameter for `GenericEnumSymbol`, breaking 1.8 compatibility
// Some reader, i.e. `AvroParquetReader` reads enums as `Utf8`
Expand Down
97 changes: 26 additions & 71 deletions avro/src/main/scala/magnolify/avro/logical/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,106 +25,61 @@ import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.util.concurrent.TimeUnit

package object logical {
import magnolify.shared.Time._
// Duplicate implementation from org.apache.avro.data.TimeConversions
// to support both 1.8 (joda-time based) and 1.9+ (java-time based)
object micros {
private def toTimestampMicros(microsFromEpoch: Long): Instant = {
val epochSeconds = microsFromEpoch / 1000000L
val nanoAdjustment = (microsFromEpoch % 1000000L) * 1000L;
Instant.ofEpochSecond(epochSeconds, nanoAdjustment)
}

private def fromTimestampMicros(instant: Instant): Long = {
val seconds = instant.getEpochSecond
val nanos = instant.getNano
if (seconds < 0 && nanos > 0) {
val micros = Math.multiplyExact(seconds + 1, 1000000L)
val adjustment = (nanos / 1000L) - 1000000
Math.addExact(micros, adjustment)
} else {
val micros = Math.multiplyExact(seconds, 1000000L)
Math.addExact(micros, nanos / 1000L)
}
}

implicit val afTimestampMicros: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(toTimestampMicros)(
fromTimestampMicros
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(microsToInstant)(
microsFromInstant
)

implicit val afTimeMicros: AvroField[LocalTime] =
AvroField.logicalType[Long](LogicalTypes.timeMicros()) { us =>
LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(us))
} { time =>
TimeUnit.NANOSECONDS.toMicros(time.toNanoOfDay)
}
AvroField.logicalType[Long](LogicalTypes.timeMicros())(microsToLocalTime)(microsFromLocalTime)

// `LogicalTypes.localTimestampMicros()` is Avro 1.10
implicit val afLocalTimestampMicros: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros")) { microsFromEpoch =>
val instant = toTimestampMicros(microsFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
fromTimestampMicros(instant)
}
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros"))(microsToLocalDateTime)(
microsFromLocalDateTime
)

// avro 1.8 uses joda-time
implicit val afJodaTimestampMicros: AvroField[joda.DateTime] =
AvroField.logicalType[Long](LogicalTypes.timestampMicros()) { microsFromEpoch =>
new joda.DateTime(microsFromEpoch / 1000, joda.DateTimeZone.UTC)
} { timestamp =>
1000 * timestamp.getMillis
}
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(microsToJodaDateTime)(
microsFromJodaDateTime
)

implicit val afJodaTimeMicros: AvroField[joda.LocalTime] =
AvroField.logicalType[Long](LogicalTypes.timeMicros()) { microsFromMidnight =>
joda.LocalTime.fromMillisOfDay(microsFromMidnight / 1000)
} { time =>
// from LossyTimeMicrosConversion
1000L * time.millisOfDay().get()
}
AvroField.logicalType[Long](LogicalTypes.timeMicros())(microsToJodaLocalTime)(
microsFromJodaLocalTime
)
}

object millis {
implicit val afTimestampMillis: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch =>
Instant.ofEpochMilli(millisFromEpoch)
} { timestamp =>
timestamp.toEpochMilli
}
AvroField.logicalType[Long](LogicalTypes.timestampMillis())(millisToInstant)(
millisFromInstant
)

implicit val afTimeMillis: AvroField[LocalTime] =
AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight =>
LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisFromMidnight.toLong))
} { time =>
TimeUnit.NANOSECONDS.toMillis(time.toNanoOfDay).toInt
}
AvroField.logicalType[Int](LogicalTypes.timeMillis())(millisToLocalTime)(millisFromLocalTime)

// `LogicalTypes.localTimestampMillis` is Avro 1.10.0+
implicit val afLocalTimestampMillis: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis")) { millisFromEpoch =>
val instant = Instant.ofEpochMilli(millisFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
instant.toEpochMilli
}
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis"))(millisToLocalDateTime)(
millisFromLocalDateTime
)

// avro 1.8 uses joda-time
implicit val afJodaTimestampMillis: AvroField[joda.DateTime] =
AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch =>
new joda.DateTime(millisFromEpoch, joda.DateTimeZone.UTC)
} { timestamp =>
timestamp.getMillis
}
AvroField.logicalType[Long](LogicalTypes.timestampMillis())(millisToJodaDateTime)(
millisFromJodaDateTime
)

implicit val afJodaTimeMillis: AvroField[joda.LocalTime] =
AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight =>
joda.LocalTime.fromMillisOfDay(millisFromMidnight.toLong)
} { time =>
time.millisOfDay().get()
}
AvroField.logicalType[Int](LogicalTypes.timeMillis())(millisToJodaLocalTime)(
millisFromJodaLocalTime
)
}

object bigquery {
Expand Down
Loading

0 comments on commit 1a5ecbb

Please sign in to comment.