diff --git a/build.sbt b/build.sbt
index ccb728c14f..84c435a1cc 100644
--- a/build.sbt
+++ b/build.sbt
@@ -428,6 +428,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
   // added BQ Json object
   ProblemFilters.exclude[MissingTypesProblem](
     "com.spotify.scio.bigquery.types.package$Json$"
+  ),
+  // dropped custom BigQueryAvroUtilsWrapper
+  ProblemFilters.exclude[MissingClassProblem](
+    "org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtilsWrapper"
   )
 )
diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala
index 8c53c34b34..aa04fa5744 100644
--- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala
+++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala
@@ -48,7 +48,8 @@ object TypedBigQueryIT {
     timestamp: Instant,
     date: LocalDate,
     time: LocalTime,
-    datetime: LocalDateTime,
+    // BQ DATETIME is problematic with avro: export as 'string(datetime)', load as '(long)local-timestamp-micros'
+    // datetime: LocalDateTime,
     geography: Geography,
     json: Json,
     bigNumeric: BigNumeric
@@ -116,20 +117,32 @@ object TypedBigQueryIT {
 class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
   import TypedBigQueryIT._
 
+  private val bq = BigQuery.defaultInstance()
+
   override protected def afterAll(): Unit = {
-    val bq = BigQuery.defaultInstance()
     // best effort cleanup
     Try(bq.tables.delete(typedTable.ref))
     Try(bq.tables.delete(tableRowTable.ref))
     Try(bq.tables.delete(avroTable.ref))
   }
 
+  def waitForTable(table: Table.Spec): Unit = {
+    var retries = 0
+    while (!bq.tables.exists(table.ref) && retries < 3) {
+      Thread.sleep(500)
+      retries += 1
+    }
+    if (retries >= 3) throw new RuntimeException(s"Table $table not found")
+  }
+
   "TypedBigQuery" should "handle records as TableRow" in {
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED)
     }.waitUntilFinish()
 
+    waitForTable(typedTable)
+
     runWithRealContext(options) { sc =>
       val data = sc.typedBigQuery[Record](typedTable)
       data should containInAnyOrder(records)
@@ -147,15 +160,17 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
       )
     }.waitUntilFinish()
 
+    waitForTable(tableRowTable)
+
     runWithRealContext(options) { sc =>
       val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow)
       data should containInAnyOrder(records)
     }
   }
 
-  // TODO fix if in beam 2.61
-  ignore should "handle records as avro format" in {
+  it should "handle records as avro format" in {
     implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)
+
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .map(Record.toAvro)
@@ -166,6 +181,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
       )
     }.waitUntilFinish()
 
+    waitForTable(avroTable)
+
     runWithRealContext(options) { sc =>
       val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
       data should containInAnyOrder(records)
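Note: the asymmetry behind the disabled `datetime` field can be made concrete with plain Avro schemas. This is an illustrative sketch, not code from the PR (field and record names are made up): BigQuery's Avro export renders DATETIME as an annotated string, while an Avro load with logical types enabled expects a long carrying local-timestamp-micros, so a write-then-read round trip cannot preserve the shape.

```scala
import org.apache.avro.Schema

// Shape produced when BigQuery *exports* a DATETIME column to Avro:
// a string with a BigQuery-specific "datetime" annotation.
val exported = new Schema.Parser().parse(
  """{"type": "record", "name": "r", "fields": [
    |  {"name": "dt", "type": {"type": "string", "logicalType": "datetime"}}
    |]}""".stripMargin
)

// Shape expected when *loading* Avro with logical types enabled:
// a long with the local-timestamp-micros logical type.
val loaded = new Schema.Parser().parse(
  """{"type": "record", "name": "r", "fields": [
    |  {"name": "dt", "type": {"type": "long", "logicalType": "local-timestamp-micros"}}
    |]}""".stripMargin
)
```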
diff --git a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java b/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java
deleted file mode 100644
index a1af5d9c08..0000000000
--- a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright 2019 Spotify AB.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * A set of utilities for working with Avro files.
- *
- * <p>These utilities are based on the Avro 1.8.1 specification.
- */
-public class BigQueryAvroUtilsWrapper {
-
-  /**
-   * Defines the valid mapping between BigQuery types and native Avro types.
-   *
-   * <p>Some BigQuery types are duplicated here since slightly different Avro records are produced
-   * when exporting data in Avro format and when reading data directly using the read API.
-   */
-  static final ImmutableMap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
-      ImmutableMap.<String, Type>builder()
-          .put("STRING", Type.STRING)
-          .put("GEOGRAPHY", Type.STRING)
-          .put("JSON", Type.STRING)
-          .put("BYTES", Type.BYTES)
-          .put("INTEGER", Type.LONG)
-          .put("FLOAT", Type.DOUBLE)
-          .put("NUMERIC", Type.BYTES)
-          .put("BOOLEAN", Type.BOOLEAN)
-          .put("TIMESTAMP", Type.LONG)
-          .put("RECORD", Type.RECORD)
-          .put("DATE", Type.INT)
-          .put("DATETIME", Type.STRING)
-          .put("TIME", Type.LONG)
-          .build();
-
-  public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
-    return BigQueryAvroUtils.convertGenericRecordToTableRow(record, schema);
-  }
-
-  public static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fieldSchemas) {
-    List<Field> avroFields = new ArrayList<>();
-    for (TableFieldSchema bigQueryField : fieldSchemas) {
-      avroFields.add(convertField(bigQueryField));
-    }
-    return Schema.createRecord(
-        schemaName,
-        "Translated Avro Schema for " + schemaName,
-        "org.apache.beam.sdk.io.gcp.bigquery",
-        false,
-        avroFields);
-  }
-
-  private static Field convertField(TableFieldSchema bigQueryField) {
-    Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
-    if (avroType == null) {
-      throw new IllegalArgumentException(
-          "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type.");
-    }
-
-    Schema elementSchema;
-    if (avroType == Type.RECORD) {
-      elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields());
-    } else {
-      elementSchema = Schema.create(avroType);
-      if (bigQueryField.getType().equals("DATE")) {
-        elementSchema = LogicalTypes.date().addToSchema(elementSchema);
-      }
-      if (bigQueryField.getType().equals("TIME")) {
-        elementSchema = LogicalTypes.timeMicros().addToSchema(elementSchema);
-      }
-      if (bigQueryField.getType().equals("DATETIME") && avroType != Type.STRING) {
-        throw new IllegalArgumentException("DateTime type is not supported");
-      }
-    }
-    Schema fieldSchema;
-    if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) {
-      fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
-    } else if ("REQUIRED".equals(bigQueryField.getMode())) {
-      fieldSchema = elementSchema;
-    } else if ("REPEATED".equals(bigQueryField.getMode())) {
-      fieldSchema = Schema.createArray(elementSchema);
-    } else {
-      throw new IllegalArgumentException(
-          String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode()));
-    }
-    return new Field(
-        bigQueryField.getName(),
-        fieldSchema,
-        bigQueryField.getDescription(),
-        (Object) null /* Cast to avoid deprecated JsonNode constructor. */);
-  }
-}
diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
index 3cc3f6ab03..0498e9a76d 100644
--- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
+++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
@@ -385,7 +385,7 @@ object BigQueryTypedTable {
       beam.BigQueryIO.readTableRows(),
       beam.BigQueryIO.writeTableRows(),
       table,
-      BigQueryUtils.convertGenericRecordToTableRow(_, _)
+      (r, _) => BigQueryUtils.convertGenericRecordToTableRow(r)
     )(coders.tableRowCoder)
 
   private[this] def genericRecord(
@@ -423,8 +423,8 @@ object BigQueryTypedTable {
     val writer = beam.BigQueryIO
       .write[T]()
      .withFormatFunction(Functions.serializableFn(wFn))
-    val fn: (GenericRecord, TableSchema) => T = (gr, ts) =>
-      tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr, ts))
+    val fn: (GenericRecord, TableSchema) => T = (gr, _) =>
+      tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr))
 
     BigQueryTypedTable(reader, writer, table, fn)
   }
@@ -437,13 +437,15 @@ object BigQueryTypedTable {
   ): BigQueryTypedTable[T] = {
     val rFn = ClosureCleaner.clean(readerFn)
     val wFn = ClosureCleaner.clean(writerFn)
-    val reader = beam.BigQueryIO.read(rFn(_))
+    val reader = beam.BigQueryIO
+      .read(rFn(_))
+      .useAvroLogicalTypes()
     val writer = beam.BigQueryIO
       .write[T]()
       .useAvroLogicalTypes()
       .withAvroFormatFunction(input => wFn(input.getElement()))
       .withAvroSchemaFactory { ts =>
-        BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", ts.getFields())
+        BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true)
       }
 
     BigQueryTypedTable(reader, writer, table, fn)
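Note: Beam's own `BigQueryUtils` now covers both jobs the deleted wrapper did — schema translation and row conversion. A minimal sketch of the replacement calls, using only the two methods adopted in this diff and assuming the Beam version this PR upgrades to (the `ts` field is a made-up example):

```scala
import com.google.api.services.bigquery.model.TableFieldSchema
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils

val fields = java.util.Collections.singletonList(
  new TableFieldSchema().setName("ts").setType("TIMESTAMP").setMode("REQUIRED")
)

// `true` requests Avro logical types (timestamp-micros, date, time-micros,
// decimal) instead of the legacy string/long encodings.
val avroSchema = BigQueryUtils.toGenericAvroSchema("root", fields, true)

// Row conversion no longer needs a TableSchema argument: the type information
// is read from the record's own Avro schema.
def toRow(gr: GenericRecord) = BigQueryUtils.convertGenericRecordToTableRow(gr)
```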
diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala
index 47818e3bba..43f2ef748d 100644
--- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala
+++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala
@@ -176,10 +176,13 @@ object Timestamp {
     case t: Long => new Instant(t / 1000)
     case _ => parse(timestamp.toString)
   }
+
+  def micros(timestamp: Instant): Long = timestamp.getMillis * 1000
 }
 
 /** Utility for BigQuery `DATE` type. */
 object Date {
+  private val EpochDate = new LocalDate(1970, 1, 1)
   // YYYY-[M]M-[D]D
   private[this] val Formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC()
@@ -195,6 +198,8 @@ object Date {
     case d: Int => new LocalDate(0, DateTimeZone.UTC).plusDays(d)
     case _ => parse(date.toString)
   }
+
+  def days(date: LocalDate): Int = Days.daysBetween(EpochDate, date).getDays
 }
 
 /** Utility for BigQuery `TIME` type. */
@@ -219,6 +224,8 @@ object Time {
     case t: Long => new LocalTime(t / 1000, DateTimeZone.UTC)
     case _ => parse(time.toString)
   }
+
+  def micros(time: LocalTime): Long = time.millisOfDay().get().toLong * 1000
 }
 
 /** Utility for BigQuery `DATETIME` type. */
@@ -324,4 +331,7 @@ object Numeric {
     case b: ByteBuffer => DecimalConverter.fromBytes(b, null, DecimalLogicalType)
     case _ => apply(value.toString)
   }
+
+  def bytes(value: BigDecimal): ByteBuffer =
+    DecimalConverter.toBytes(value.bigDecimal, null, DecimalLogicalType)
 }
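Note: the three new time helpers encode directly to the representations of the Avro logical types the schema side now emits (timestamp-micros, date, time-micros). A small worked example of the values they produce, following the definitions above; since Joda time types carry millisecond precision, the `* 1000` scaling is lossless:

```scala
import com.spotify.scio.bigquery.{Date, Time, Timestamp}
import org.joda.time.{Instant, LocalDate, LocalTime}

Timestamp.micros(Instant.parse("1970-01-01T00:00:01Z")) // 1000000L: epoch micros (millis * 1000)
Date.days(new LocalDate(1970, 1, 2))                    // 1: days since 1970-01-01
Time.micros(new LocalTime(0, 0, 1))                     // 1000000L: micros since midnight
```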
q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString" + q"_root_.com.spotify.scio.bigquery.types.BigNumeric.bytes($tree)" // nested records case t if isCaseClass(c)(t) => diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index b150b8a7c4..620e160731 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -37,7 +37,7 @@ private[types] object SchemaProvider { def avroSchemaOf[T: TypeTag]: Schema = AvroSchemaCache.get( typeTag[T].tpe.toString, - BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields) + BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true) ) def schemaOf[T: TypeTag]: TableSchema = diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala index 0dda1dbb25..23e8d0d5dd 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala @@ -110,5 +110,8 @@ package object types { case b: ByteBuffer => new BigNumeric(DecimalConverter.fromBytes(b, null, DecimalLogicalType)) case _ => apply(value.toString) } + + def bytes(value: BigNumeric): ByteBuffer = + DecimalConverter.toBytes(value.wkt.bigDecimal, null, DecimalLogicalType) } } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala index 797837c052..10ece77755 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala @@ -38,6 +38,11 @@ final class ConverterProviderSpec import Schemas._ + def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary { + val max = BigInt(10).pow(precision) - 1 + Gen.choose(-max, max).map(BigDecimal(_, scale)) + } + implicit val arbByteArray: Arbitrary[Array[Byte]] = Arbitrary(Gen.alphaStr.map(_.getBytes)) implicit val arbByteString: Arbitrary[ByteString] = Arbitrary( Gen.alphaStr.map(ByteString.copyFromUtf8) @@ -46,17 +51,26 @@ final class ConverterProviderSpec implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(Gen.const(LocalDate.now())) implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(Gen.const(LocalTime.now())) implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(Gen.const(LocalDateTime.now())) - implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] = Arbitrary { - Arbitrary.arbBigDecimal.arbitrary - .retryUntil(_.precision <= Numeric.MaxNumericPrecision) - .map(Numeric.apply) - } + implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] = + arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale) + implicit val arbGeography: Arbitrary[Geography] = Arbitrary( + for { + x <- Gen.numChar + y <- Gen.numChar + } yield Geography(s"POINT($x $y)") + ) implicit val arbJson: Arbitrary[Json] = Arbitrary( for { key <- Gen.alphaStr value <- Gen.alphaStr } yield Json(s"""{"$key":"$value"}""") ) + 
diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala
index 797837c052..10ece77755 100644
--- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala
+++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala
@@ -38,6 +38,11 @@ final class ConverterProviderSpec
 
   import Schemas._
 
+  def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary {
+    val max = BigInt(10).pow(precision) - 1
+    Gen.choose(-max, max).map(BigDecimal(_, scale))
+  }
+
   implicit val arbByteArray: Arbitrary[Array[Byte]] = Arbitrary(Gen.alphaStr.map(_.getBytes))
   implicit val arbByteString: Arbitrary[ByteString] = Arbitrary(
     Gen.alphaStr.map(ByteString.copyFromUtf8)
@@ -46,17 +51,26 @@ final class ConverterProviderSpec
   implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(Gen.const(LocalDate.now()))
   implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(Gen.const(LocalTime.now()))
   implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(Gen.const(LocalDateTime.now()))
-  implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] = Arbitrary {
-    Arbitrary.arbBigDecimal.arbitrary
-      .retryUntil(_.precision <= Numeric.MaxNumericPrecision)
-      .map(Numeric.apply)
-  }
+  implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] =
+    arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale)
+  implicit val arbGeography: Arbitrary[Geography] = Arbitrary(
+    for {
+      x <- Gen.numChar
+      y <- Gen.numChar
+    } yield Geography(s"POINT($x $y)")
+  )
   implicit val arbJson: Arbitrary[Json] = Arbitrary(
     for {
       key <- Gen.alphaStr
       value <- Gen.alphaStr
     } yield Json(s"""{"$key":"$value"}""")
   )
+  implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
+    // Precision: 76.76 (the 77th digit is partial)
+    arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary
+      .map(BigNumeric.apply)
+  }
+
   implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
   implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
   implicit val eqInstant: Eq[Instant] = Eq.instance[Instant](_ == _)
@@ -111,6 +125,7 @@ final class ConverterProviderSpec
       o.bigDecimalF.isDefined shouldBe r.containsKey("bigDecimalF")
       o.geographyF.isDefined shouldBe r.containsKey("geographyF")
       o.jsonF.isDefined shouldBe r.containsKey("jsonF")
+      o.bigNumericF.isDefined shouldBe r.containsKey("bigNumericF")
     }
   }
diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala
index 907a2d5610..1bde9bfba9 100644
--- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala
+++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala
@@ -42,7 +42,8 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers {
        |  {"mode": "$mode", "name": "datetimeF", "type": "DATETIME"},
        |  {"mode": "$mode", "name": "bigDecimalF", "type": "NUMERIC"},
        |  {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"},
-       |  {"mode": "$mode", "name": "jsonF", "type": "JSON"}
+       |  {"mode": "$mode", "name": "jsonF", "type": "JSON"},
+       |  {"mode": "$mode", "name": "bigNumericF", "type": "BIGNUMERIC"}
        |]
        |""".stripMargin
diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala
index 754e80ca3f..8d04808c6f 100644
--- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala
+++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala
@@ -37,7 +37,8 @@ object Schemas {
     datetimeF: LocalDateTime,
     bigDecimalF: BigDecimal,
     geographyF: Geography,
-    jsonF: Json
+    jsonF: Json,
+    bigNumericF: BigNumeric
   )
   case class Optional(
     boolF: Option[Boolean],
@@ -54,7 +55,8 @@ object Schemas {
     datetimeF: Option[LocalDateTime],
     bigDecimalF: Option[BigDecimal],
     geographyF: Option[Geography],
-    jsonF: Option[Json]
+    jsonF: Option[Json],
+    bigNumericF: Option[BigNumeric]
   )
   case class Repeated(
     boolF: List[Boolean],
@@ -71,7 +73,8 @@ object Schemas {
     datetimeF: List[LocalDateTime],
     bigDecimalF: List[BigDecimal],
     geographyF: List[Geography],
-    jsonF: List[Json]
+    jsonF: List[Json],
+    bigNumericF: List[BigNumeric]
   )
 
   // records
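Note on the generator change: bounding the unscaled value and fixing the scale guarantees every sample fits NUMERIC(precision, scale), whereas the old `retryUntil` approach sampled arbitrary BigDecimals and discarded most of them. A worked example of what `arbBigDecimal` produces:

```scala
// BigDecimal(unscaled, scale) = unscaled * 10^-scale
val bd = BigDecimal(BigInt("123456789"), 9) // 0.123456789: precision 9, scale 9
// With precision = 38 and scale = 9 (BigQuery NUMERIC), |unscaled| <= 10^38 - 1,
// so precision never exceeds 38 and no retry filtering is needed.
```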