Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#194: Fixed timezone parameter orc format #201

Merged
merged 9 commits into from
Apr 1, 2022
2 changes: 1 addition & 1 deletion src/main/scala/com/exasol/cloudetl/source/OrcSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final case class OrcSource(
}

private[this] def createReader(): Reader = {
val options = OrcFile.readerOptions(conf).filesystem(fileSystem)
val options = OrcFile.readerOptions(conf).filesystem(fileSystem).useUTCTimestamp(true)
try {
OrcFile.createReader(path, options)
} catch {
Expand Down
16 changes: 11 additions & 5 deletions src/test/scala/com/exasol/cloudetl/it/orc/OrcDataImporterIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.exasol.cloudetl.orc

import java.nio.charset.StandardCharsets.UTF_8
import java.sql.Timestamp
import java.time._

import com.exasol.cloudetl.BaseDataImporter
import com.exasol.matcher.CellMatcherFactory
Expand Down Expand Up @@ -145,14 +146,19 @@ class OrcDataImporterIT extends BaseDataImporter {
}

test("imports timestamp") {
val timestamp1 = Timestamp.from(java.time.Instant.EPOCH)
val timestamp2 = new Timestamp(System.currentTimeMillis())
val millis1 = Instant.EPOCH.toEpochMilli()
val millis2 = System.currentTimeMillis()
val zdt1 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis1), ZoneId.of("Europe/Berlin"))
val zdt2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis2), ZoneId.of("Europe/Berlin"))
val expectedTimestamp1 = Timestamp.valueOf(zdt1.toLocalDateTime())
val expectedTimestamp2 = Timestamp.valueOf(zdt2.toLocalDateTime())

OrcChecker("struct<f:timestamp>", "TIMESTAMP", "timestamp_table")
.withInputValues(List(timestamp1, timestamp2, null))
.withInputValues(List(new Timestamp(millis1), new Timestamp(millis2), null))
.assertResultSet(
table()
.row(timestamp1)
.row(timestamp2)
.row(expectedTimestamp1)
.row(expectedTimestamp2)
.row(null)
.matches()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import com.exasol.cloudetl.BaseS3IntegrationTest
import com.exasol.cloudetl.TestFileManager
import com.exasol.cloudetl.avro.AvroTestDataWriter
import com.exasol.cloudetl.helper.DateTimeConverter._
import com.exasol.cloudetl.orc.OrcTestDataWriter
import com.exasol.cloudetl.parquet.ParquetTestDataWriter
import com.exasol.dbbuilder.dialects.Table
import com.exasol.matcher.ResultSetStructureMatcher.table

import org.apache.avro.Schema
import org.apache.hadoop.fs.{Path => HPath}
import org.apache.orc.TypeDescription
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.MessageTypeParser
import org.hamcrest.Matcher
Expand Down Expand Up @@ -181,6 +183,23 @@ class TimestampWithUTCImportExportIT extends BaseS3IntegrationTest with BeforeAn
)
}

test("orc imports timestamp") {
val timestamp1 = new Timestamp(Instant.EPOCH.toEpochMilli())
val timestamp2 = new Timestamp(System.currentTimeMillis())
OrcTimestampWriter("struct<f:timestamp>")
.withBucketName("orc-timestamp")
.withTableColumnType("TIMESTAMP")
.withInputValues(List(timestamp1, timestamp2, null))
.verify(
table()
.row(timestamp1)
.row(timestamp2)
.row(null)
.withUtcCalendar()
.matches()
)
}

trait BaseTimestampWriter {
private var bucketName: String = _
private var table: Table = _
Expand Down Expand Up @@ -239,4 +258,14 @@ class TimestampWithUTCImportExportIT extends BaseS3IntegrationTest with BeforeAn
}
}

case class OrcTimestampWriter(orcType: String) extends BaseTimestampWriter with OrcTestDataWriter {
private val orcSchema = TypeDescription.fromString(orcType)
override val dataFormat = "ORC"
override val filePath = path
def withInputValues[T](values: List[T]): this.type = {
writeDataValues(values, this.filePath, this.orcSchema)
this
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait OrcTestDataWriter {
val conf = new Configuration()
conf.set("orc.stripe.size", s"$ORC_STRIPE_SIZE")
conf.set("orc.block.size", s"$ORC_BLOCK_SIZE")
OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(schema))
OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(schema).useUTCTimestamp(true))
}

final def writeDataValues[T](values: List[T], path: HPath, schema: TypeDescription): Unit = {
Expand Down