From 98e3fafa7a40e59f2ec6ff11b485d6191d0b6b55 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 27 Nov 2024 11:05:00 +0100 Subject: [PATCH 1/7] Improve BQ typed support --- build.sbt | 4 + .../scio/bigquery/TypedBigQueryIT.scala | 25 +++- .../bigquery/BigQueryAvroUtilsWrapper.java | 119 ------------------ .../spotify/scio/bigquery/BigQueryIO.scala | 12 +- .../spotify/scio/bigquery/BigQueryTypes.scala | 10 ++ .../scio/bigquery/client/TableOps.scala | 9 +- .../bigquery/types/ConverterProvider.scala | 11 +- .../scio/bigquery/types/SchemaProvider.scala | 2 +- .../spotify/scio/bigquery/types/package.scala | 3 + .../types/ConverterProviderSpec.scala | 25 +++- .../bigquery/types/SchemaProviderTest.scala | 3 +- .../spotify/scio/bigquery/types/Schemas.scala | 9 +- 12 files changed, 83 insertions(+), 149 deletions(-) delete mode 100644 scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java diff --git a/build.sbt b/build.sbt index ccb728c14f..84c435a1cc 100644 --- a/build.sbt +++ b/build.sbt @@ -428,6 +428,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( // added BQ Json object ProblemFilters.exclude[MissingTypesProblem]( "com.spotify.scio.bigquery.types.package$Json$" + ), + // dropped custom BigQueryAvroUtilsWrapper + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtilsWrapper" ) ) diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index 8c53c34b34..aa04fa5744 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -48,7 +48,8 @@ object TypedBigQueryIT { timestamp: Instant, date: LocalDate, time: LocalTime, - datetime: LocalDateTime, + // BQ DATETIME is problematic with avro: export as 'string(datetime)', load as '(long)local-timestamp-micros' 
+ // datetime: LocalDateTime, geography: Geography, json: Json, bigNumeric: BigNumeric @@ -116,20 +117,32 @@ object TypedBigQueryIT { class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { import TypedBigQueryIT._ + private val bq = BigQuery.defaultInstance() + override protected def afterAll(): Unit = { - val bq = BigQuery.defaultInstance() // best effort cleanup Try(bq.tables.delete(typedTable.ref)) Try(bq.tables.delete(tableRowTable.ref)) Try(bq.tables.delete(avroTable.ref)) } + def waitForTable(table: Table.Spec): Unit = { + var retries = 0 + while (!bq.tables.exists(table.ref) && retries < 3) { + Thread.sleep(500) + retries += 1 + } + if (retries >= 3) throw new RuntimeException(s"Table $table not found") + } + "TypedBigQuery" should "handle records as TableRow" in { runWithRealContext(options) { sc => sc.parallelize(records) .saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED) }.waitUntilFinish() + waitForTable(typedTable) + runWithRealContext(options) { sc => val data = sc.typedBigQuery[Record](typedTable) data should containInAnyOrder(records) @@ -147,15 +160,17 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { ) }.waitUntilFinish() + waitForTable(tableRowTable) + runWithRealContext(options) { sc => val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow) data should containInAnyOrder(records) } } - // TODO fix if in beam 2.61 - ignore should "handle records as avro format" in { + it should "handle records as avro format" in { implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema) + runWithRealContext(options) { sc => sc.parallelize(records) .map(Record.toAvro) @@ -166,6 +181,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { ) }.waitUntilFinish() + waitForTable(avroTable) + runWithRealContext(options) { sc => val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro) data should containInAnyOrder(records) diff --git 
a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java b/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java deleted file mode 100644 index a1af5d9c08..0000000000 --- a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2019 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import java.util.ArrayList; -import java.util.List; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericRecord; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; - -/** - * A set of utilities for working with Avro files. - * - *

These utilities are based on the Avro - * 1.8.1 specification. - */ -public class BigQueryAvroUtilsWrapper { - - /** - * Defines the valid mapping between BigQuery types and native Avro types. - * - *

Some BigQuery types are duplicated here since slightly different Avro records are produced - * when exporting data in Avro format and when reading data directly using the read API. - */ - static final ImmutableMap BIG_QUERY_TO_AVRO_TYPES = - ImmutableMap.builder() - .put("STRING", Type.STRING) - .put("GEOGRAPHY", Type.STRING) - .put("JSON", Type.STRING) - .put("BYTES", Type.BYTES) - .put("INTEGER", Type.LONG) - .put("FLOAT", Type.DOUBLE) - .put("NUMERIC", Type.BYTES) - .put("BOOLEAN", Type.BOOLEAN) - .put("TIMESTAMP", Type.LONG) - .put("RECORD", Type.RECORD) - .put("DATE", Type.INT) - .put("DATETIME", Type.STRING) - .put("TIME", Type.LONG) - .build(); - - public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) { - return BigQueryAvroUtils.convertGenericRecordToTableRow(record, schema); - } - - public static Schema toGenericAvroSchema(String schemaName, List fieldSchemas) { - List avroFields = new ArrayList<>(); - for (TableFieldSchema bigQueryField : fieldSchemas) { - avroFields.add(convertField(bigQueryField)); - } - return Schema.createRecord( - schemaName, - "Translated Avro Schema for " + schemaName, - "org.apache.beam.sdk.io.gcp.bigquery", - false, - avroFields); - } - - private static Field convertField(TableFieldSchema bigQueryField) { - Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()); - if (avroType == null) { - throw new IllegalArgumentException( - "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type."); - } - - Schema elementSchema; - if (avroType == Type.RECORD) { - elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields()); - } else { - elementSchema = Schema.create(avroType); - if (bigQueryField.getType().equals("DATE")) { - elementSchema = LogicalTypes.date().addToSchema(elementSchema); - } - if (bigQueryField.getType().equals("TIME")) { - elementSchema = LogicalTypes.timeMicros().addToSchema(elementSchema); - } - if 
(bigQueryField.getType().equals("DATETIME") && avroType != Type.STRING) { - throw new IllegalArgumentException("DateTime type is not supported"); - } - } - Schema fieldSchema; - if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema); - } else if ("REQUIRED".equals(bigQueryField.getMode())) { - fieldSchema = elementSchema; - } else if ("REPEATED".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createArray(elementSchema); - } else { - throw new IllegalArgumentException( - String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode())); - } - return new Field( - bigQueryField.getName(), - fieldSchema, - bigQueryField.getDescription(), - (Object) null /* Cast to avoid deprecated JsonNode constructor. */); - } -} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 3cc3f6ab03..0498e9a76d 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -385,7 +385,7 @@ object BigQueryTypedTable { beam.BigQueryIO.readTableRows(), beam.BigQueryIO.writeTableRows(), table, - BigQueryUtils.convertGenericRecordToTableRow(_, _) + (r, _) => BigQueryUtils.convertGenericRecordToTableRow(r) )(coders.tableRowCoder) private[this] def genericRecord( @@ -423,8 +423,8 @@ object BigQueryTypedTable { val writer = beam.BigQueryIO .write[T]() .withFormatFunction(Functions.serializableFn(wFn)) - val fn: (GenericRecord, TableSchema) => T = (gr, ts) => - tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr, ts)) + val fn: (GenericRecord, TableSchema) => T = (gr, _) => + tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr)) BigQueryTypedTable(reader, writer, table, fn) } @@ -437,13 +437,15 @@ 
object BigQueryTypedTable { ): BigQueryTypedTable[T] = { val rFn = ClosureCleaner.clean(readerFn) val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(rFn(_)) + val reader = beam.BigQueryIO + .read(rFn(_)) + .useAvroLogicalTypes() val writer = beam.BigQueryIO .write[T]() .useAvroLogicalTypes() .withAvroFormatFunction(input => wFn(input.getElement())) .withAvroSchemaFactory { ts => - BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", ts.getFields()) + BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true) } BigQueryTypedTable(reader, writer, table, fn) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 47818e3bba..43f2ef748d 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -176,10 +176,13 @@ object Timestamp { case t: Long => new Instant(t / 1000) case _ => parse(timestamp.toString) } + + def micros(timestamp: Instant): Long = timestamp.getMillis * 1000 } /** Utility for BigQuery `DATE` type. */ object Date { + private val EpochDate = new LocalDate(1970, 1, 1) // YYYY-[M]M-[D]D private[this] val Formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC() @@ -195,6 +198,8 @@ object Date { case d: Int => new LocalDate(0, DateTimeZone.UTC).plusDays(d) case _ => parse(date.toString) } + + def days(date: LocalDate): Int = Days.daysBetween(EpochDate, date).getDays } /** Utility for BigQuery `TIME` type. */ @@ -219,6 +224,8 @@ object Time { case t: Long => new LocalTime(t / 1000, DateTimeZone.UTC) case _ => parse(time.toString) } + + def micros(time: LocalTime): Long = time.millisOfDay().get().toLong * 1000 } /** Utility for BigQuery `DATETIME` type. 
*/ @@ -324,4 +331,7 @@ object Numeric { case b: ByteBuffer => DecimalConverter.fromBytes(b, null, DecimalLogicalType) case _ => apply(value.toString) } + + def bytes(value: BigDecimal): ByteBuffer = + DecimalConverter.toBytes(value.bigDecimal, null, DecimalLogicalType) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala index 5470947a42..380484283c 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala @@ -29,7 +29,7 @@ import org.apache.avro.Schema import org.apache.avro.generic.{GenericDatumReader, GenericRecord} import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition} -import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryAvroUtilsWrapper, BigQueryOptions} +import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryOptions, BigQueryUtils} import org.apache.beam.sdk.io.gcp.{bigquery => bq} import org.apache.beam.sdk.options.{ExecutorOptions, PipelineOptionsFactory} import org.joda.time.Instant @@ -64,11 +64,8 @@ final private[client] class TableOps(client: Client) { storageAvroRows(table, TableReadOptions.getDefaultInstance) def storageRows(table: STable, readOptions: TableReadOptions): Iterator[TableRow] = - withBigQueryService { bqServices => - val tb = bqServices.getTable(table.ref, readOptions.getSelectedFieldsList) - storageAvroRows(table, readOptions).map { gr => - BigQueryAvroUtilsWrapper.convertGenericRecordToTableRow(gr, tb.getSchema) - } + storageAvroRows(table, readOptions).map { gr => + BigQueryUtils.convertGenericRecordToTableRow(gr) } def storageAvroRows(table: STable, readOptions: TableReadOptions): Iterator[GenericRecord] = { diff --git 
a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala index 4f00d823ba..2a3ce84045 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala @@ -180,17 +180,18 @@ private[types] object ConverterProvider { case t if t =:= typeOf[String] => tree case t if t =:= typeOf[BigDecimal] => - q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString" + q"_root_.com.spotify.scio.bigquery.Numeric.bytes($tree)" case t if t =:= typeOf[ByteString] => q"_root_.java.nio.ByteBuffer.wrap($tree.toByteArray)" case t if t =:= typeOf[Array[Byte]] => q"_root_.java.nio.ByteBuffer.wrap($tree)" - case t if t =:= typeOf[Instant] => q"$tree.getMillis * 1000" + case t if t =:= typeOf[Instant] => + q"_root_.com.spotify.scio.bigquery.Timestamp.micros($tree)" case t if t =:= typeOf[LocalDate] => - q"_root_.com.spotify.scio.bigquery.Date($tree)" + q"_root_.com.spotify.scio.bigquery.Date.days($tree)" case t if t =:= typeOf[LocalTime] => - q"_root_.com.spotify.scio.bigquery.Time($tree)" + q"_root_.com.spotify.scio.bigquery.Time.micros($tree)" case t if t =:= typeOf[LocalDateTime] => q"_root_.com.spotify.scio.bigquery.DateTime($tree)" @@ -200,7 +201,7 @@ private[types] object ConverterProvider { case t if t =:= typeOf[Json] => q"$tree.wkt" case t if t =:= typeOf[BigNumeric] => - q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString" + q"_root_.com.spotify.scio.bigquery.types.BigNumeric.bytes($tree)" // nested records case t if isCaseClass(c)(t) => diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index b150b8a7c4..620e160731 100644 
--- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -37,7 +37,7 @@ private[types] object SchemaProvider { def avroSchemaOf[T: TypeTag]: Schema = AvroSchemaCache.get( typeTag[T].tpe.toString, - BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields) + BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true) ) def schemaOf[T: TypeTag]: TableSchema = diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala index 0dda1dbb25..23e8d0d5dd 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala @@ -110,5 +110,8 @@ package object types { case b: ByteBuffer => new BigNumeric(DecimalConverter.fromBytes(b, null, DecimalLogicalType)) case _ => apply(value.toString) } + + def bytes(value: BigNumeric): ByteBuffer = + DecimalConverter.toBytes(value.wkt.bigDecimal, null, DecimalLogicalType) } } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala index 797837c052..10ece77755 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala @@ -38,6 +38,11 @@ final class ConverterProviderSpec import Schemas._ + def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary { + val max = BigInt(10).pow(precision) - 1 + Gen.choose(-max, max).map(BigDecimal(_, scale)) + } + 
implicit val arbByteArray: Arbitrary[Array[Byte]] = Arbitrary(Gen.alphaStr.map(_.getBytes)) implicit val arbByteString: Arbitrary[ByteString] = Arbitrary( Gen.alphaStr.map(ByteString.copyFromUtf8) @@ -46,17 +51,26 @@ final class ConverterProviderSpec implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(Gen.const(LocalDate.now())) implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(Gen.const(LocalTime.now())) implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(Gen.const(LocalDateTime.now())) - implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] = Arbitrary { - Arbitrary.arbBigDecimal.arbitrary - .retryUntil(_.precision <= Numeric.MaxNumericPrecision) - .map(Numeric.apply) - } + implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] = + arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale) + implicit val arbGeography: Arbitrary[Geography] = Arbitrary( + for { + x <- Gen.numChar + y <- Gen.numChar + } yield Geography(s"POINT($x $y)") + ) implicit val arbJson: Arbitrary[Json] = Arbitrary( for { key <- Gen.alphaStr value <- Gen.alphaStr } yield Json(s"""{"$key":"$value"}""") ) + implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary { + // Precision: 76.76 (the 77th digit is partial) + arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary + .map(BigNumeric.apply) + } + implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList) implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _) implicit val eqInstant: Eq[Instant] = Eq.instance[Instant](_ == _) @@ -111,6 +125,7 @@ final class ConverterProviderSpec o.bigDecimalF.isDefined shouldBe r.containsKey("bigDecimalF") o.geographyF.isDefined shouldBe r.containsKey("geographyF") o.jsonF.isDefined shouldBe r.containsKey("jsonF") + o.bigNumericF.isDefined shouldBe r.containsKey("bigNumericF") } } diff --git 
a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala index 907a2d5610..1bde9bfba9 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala @@ -42,7 +42,8 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers { | {"mode": "$mode", "name": "datetimeF", "type": "DATETIME"}, | {"mode": "$mode", "name": "bigDecimalF", "type": "NUMERIC"}, | {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"}, - | {"mode": "$mode", "name": "jsonF", "type": "JSON"} + | {"mode": "$mode", "name": "jsonF", "type": "JSON"}, + | {"mode": "$mode", "name": "bigNumericF", "type": "BIGNUMERIC"} |] |""".stripMargin diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala index 754e80ca3f..8d04808c6f 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala @@ -37,7 +37,8 @@ object Schemas { datetimeF: LocalDateTime, bigDecimalF: BigDecimal, geographyF: Geography, - jsonF: Json + jsonF: Json, + bigNumericF: BigNumeric ) case class Optional( boolF: Option[Boolean], @@ -54,7 +55,8 @@ object Schemas { datetimeF: Option[LocalDateTime], bigDecimalF: Option[BigDecimal], geographyF: Option[Geography], - jsonF: Option[Json] + jsonF: Option[Json], + bigNumericF: Option[BigNumeric] ) case class Repeated( boolF: List[Boolean], @@ -71,7 +73,8 @@ object Schemas { datetimeF: List[LocalDateTime], bigDecimalF: List[BigDecimal], geographyF: List[Geography], - jsonF: List[Json] + jsonF: List[Json], + bigNumericF: List[BigNumeric] ) 
// records From 22908a85114d5d3e843e40d55a52d237eb0caa0b Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Thu, 28 Nov 2024 10:32:37 +0100 Subject: [PATCH 2/7] Stable behavior --- .../scio/bigquery/TypedBigQueryIT.scala | 3 +- .../spotify/scio/bigquery/BigQueryIO.scala | 66 +++++++++++++------ .../spotify/scio/bigquery/BigQueryTypes.scala | 3 + .../bigquery/syntax/SCollectionSyntax.scala | 2 +- .../bigquery/syntax/ScioContextSyntax.scala | 3 +- .../bigquery/types/ConverterProvider.scala | 4 +- .../scio/bigquery/types/TypeProvider.scala | 2 +- 7 files changed, 57 insertions(+), 26 deletions(-) diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index aa04fa5744..a7d15091b5 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -184,7 +184,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { waitForTable(avroTable) runWithRealContext(options) { sc => - val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro) + val data = + sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro) data should containInAnyOrder(records) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 0498e9a76d..dc257cd06a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -328,7 +328,11 @@ object BigQueryTypedTable { /** Defines the format in which BigQuery can be read and written to. 
*/ sealed abstract class Format[F] object Format { - case object GenericRecord extends Format[GenericRecord] + sealed abstract private[bigquery] class AvroFormat(val useLogicalTypes: Boolean) + extends Format[GenericRecord] + + case object GenericRecord extends AvroFormat(false) + case object GenericRecordWithLogicalTypes extends AvroFormat(true) case object TableRow extends Format[TableRow] } @@ -389,25 +393,31 @@ object BigQueryTypedTable { )(coders.tableRowCoder) private[this] def genericRecord( - table: Table + table: Table, + useLogicalTypes: Boolean )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] = BigQueryTypedTable( - _.getRecord(), - identity[GenericRecord], - (genericRecord: GenericRecord, _: TableSchema) => genericRecord, - table + beam.BigQueryIO + .read(_.getRecord) + .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r), + beam.BigQueryIO + .write[GenericRecord]() + .withAvroFormatFunction(_.getElement) + .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r), + table, + (genericRecord: GenericRecord, _: TableSchema) => genericRecord ) /** * Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]]. * * NOTE: LogicalType support when using `Format.GenericRecord` has some caveats: Reading: Bigquery - * types DATE, TIME, DATIME will be read as STRING. Writing: Supports LogicalTypes only for DATE - * and TIME. DATETIME is not yet supported. https://issuetracker.google.com/issues/140681683 + * types DATE, TIME, DATETIME will be read as STRING. 
Use `Format.GenericRecordWithLogicalTypes` + * for avro `date`, `timestamp-micros` and `local-timestamp-micros` (avro 1.10+) */ def apply[F: Coder](table: Table, format: Format[F]): BigQueryTypedTable[F] = format match { - case Format.GenericRecord => genericRecord(table) + case f: Format.AvroFormat => genericRecord(table, f.useLogicalTypes) case Format.TableRow => tableRow(table) } @@ -437,16 +447,11 @@ object BigQueryTypedTable { ): BigQueryTypedTable[T] = { val rFn = ClosureCleaner.clean(readerFn) val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO - .read(rFn(_)) - .useAvroLogicalTypes() + val reader = beam.BigQueryIO.read(rFn(_)) val writer = beam.BigQueryIO .write[T]() .useAvroLogicalTypes() .withAvroFormatFunction(input => wFn(input.getElement())) - .withAvroSchemaFactory { ts => - BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true) - } BigQueryTypedTable(reader, writer, table, fn) } @@ -740,12 +745,31 @@ object BigQueryTyped { override type ReadP = Unit override type WriteP = Table.WriteParam[T] - private val underlying = BigQueryTypedTable[T]( - (i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord), - BigQueryType[T].toTableRow, - BigQueryType[T].fromTableRow, - table - ) + private val underlying = { + val readFn = Functions.serializableFn[SchemaAndRecord, T] { x => + BigQueryType[T].fromAvro(x.getRecord) + } + val writeFn = Functions.serializableFn[AvroWriteRequest[T], GenericRecord] { x => + BigQueryType[T].toAvro(x.getElement) + } + val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ => + BigQueryType[T].avroSchema + } + val parseFn = (r: GenericRecord, _: TableSchema) => BigQueryType[T].fromAvro(r) + + BigQueryTypedTable[T]( + beam.BigQueryIO + .read(readFn) + .useAvroLogicalTypes(), + beam.BigQueryIO + .write[T]() + .withAvroFormatFunction(writeFn) + .withAvroSchemaFactory(schemaFactory) + .useAvroLogicalTypes(), + table, + parseFn + ) + } override def testId: String = 
s"BigQueryIO(${table.spec})" diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 43f2ef748d..bb3a43dd09 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -257,6 +257,9 @@ object DateTime { /** Convert BigQuery `DATETIME` string to `LocalDateTime`. */ def parse(datetime: String): LocalDateTime = Parser.parseLocalDateTime(datetime) + + // For BigQueryType macros only, do not use directly + def format(datetime: LocalDateTime): String = apply(datetime) } /** Scala wrapper for [[com.google.api.services.bigquery.model.TimePartitioning]]. */ diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala index ff2a6ee079..2e7cd64b4e 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala @@ -203,7 +203,7 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC self .covary[GenericRecord] .write( - BigQueryTypedTable(table, Format.GenericRecord)( + BigQueryTypedTable(table, Format.GenericRecordWithLogicalTypes)( self.coder.asInstanceOf[Coder[GenericRecord]] ) )(param) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala index fa82cffe93..80d39668cd 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala +++ 
b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala @@ -79,7 +79,8 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { * Reading records as GenericRecord **should** offer better performance over TableRow records. * * Note: When using `Format.GenericRecord` Bigquery types DATE, TIME and DATETIME are read as - * STRING. + * STRING. Use `Format.GenericRecordWithLogicalTypes` for avro `date`, `timestamp-micros` and + * `local-timestamp-micros` (avro 1.10+) */ def bigQueryTable[F: Coder](table: Table, format: Format[F]): SCollection[F] = self.read(BigQueryTypedTable(table, format)) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala index 2a3ce84045..08e656660b 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala @@ -193,7 +193,9 @@ private[types] object ConverterProvider { case t if t =:= typeOf[LocalTime] => q"_root_.com.spotify.scio.bigquery.Time.micros($tree)" case t if t =:= typeOf[LocalDateTime] => - q"_root_.com.spotify.scio.bigquery.DateTime($tree)" + // LocalDateTime is read as avro string + // on write we should use `local-timestamp-micros` + q"_root_.com.spotify.scio.bigquery.DateTime.format($tree)" // different than nested record match below, even though thore are case classes case t if t =:= typeOf[Geography] => diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala index 24370aed6b..48626d69ba 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala +++ 
b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala @@ -334,7 +334,7 @@ private[types] object TypeProvider { q"override def schema: ${p(c, GModel)}.TableSchema = ${p(c, SUtil)}.parseSchema(${schema.toString})" } val defAvroSchema = - q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(${cName.toString}, this.schema.getFields)" + q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(this.schema, true)" val defToPrettyString = q"override def toPrettyString(indent: Int = 0): String = ${p(c, s"$SBQ.types.SchemaUtil")}.toPrettyString(this.schema, ${cName.toString}, indent)" From 71359eadba2b6cc6f95feff8f85fb2979a6e81c7 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Thu, 28 Nov 2024 13:08:38 +0100 Subject: [PATCH 3/7] Fix toPrettyString --- .../scio/bigquery/types/SchemaUtil.scala | 3 + .../scio/bigquery/types/SchemaUtilTest.scala | 125 ++++++++++-------- 2 files changed, 76 insertions(+), 52 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaUtil.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaUtil.scala index 2b2f0d0c27..f460eda2b6 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaUtil.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaUtil.scala @@ -43,6 +43,9 @@ object SchemaUtil { case "DATE" => "LocalDate" case "TIME" => "LocalTime" case "DATETIME" => "LocalDateTime" + case "GEOGRAPHY" => "Geography" + case "JSON" => "Json" + case "BIGNUMERIC" => "BigNumeric" case "RECORD" | "STRUCT" => NameProvider.getUniqueName(tfs.getName) case t => throw new IllegalArgumentException(s"Type: $t not supported") } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaUtilTest.scala 
b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaUtilTest.scala index 870478ec1d..f96ab196a1 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaUtilTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaUtilTest.scala @@ -24,66 +24,105 @@ import org.scalatest.flatspec.AnyFlatSpec import scala.jdk.CollectionConverters._ class SchemaUtilTest extends AnyFlatSpec with Matchers { - def newSchema(mode: String): TableSchema = - new TableSchema().setFields( + + "toPrettyString()" should "support indent" in { + val schema = new TableSchema().setFields( List( - new TableFieldSchema() - .setName("boolF") - .setType("BOOLEAN") - .setMode(mode), - new TableFieldSchema().setName("intF").setType("INTEGER").setMode(mode), - new TableFieldSchema().setName("floatF").setType("FLOAT").setMode(mode), - new TableFieldSchema() - .setName("stringF") - .setType("STRING") - .setMode(mode), - new TableFieldSchema().setName("bytesF").setType("BYTES").setMode(mode), - new TableFieldSchema() - .setName("timestampF") - .setType("TIMESTAMP") - .setMode(mode), - new TableFieldSchema().setName("dateF").setType("DATE").setMode(mode), - new TableFieldSchema().setName("timeF").setType("TIME").setMode(mode), - new TableFieldSchema() - .setName("datetimeF") - .setType("DATETIME") - .setMode(mode) + new TableFieldSchema().setName("field").setType("BOOLEAN").setMode("REQUIRED") ).asJava ) - "toPrettyString()" should "support required primitive types" in { - SchemaUtil.toPrettyString(newSchema("REQUIRED"), "Row", 0) should equal( + SchemaUtil.toPrettyString(schema, "Row", 0) should equal("""@BigQueryType.toTable + |case class Row(field: Boolean)""".stripMargin) + SchemaUtil.toPrettyString(schema, "Row", 2) should equal("""@BigQueryType.toTable + |case class Row( + | field: Boolean)""".stripMargin) + SchemaUtil.toPrettyString(schema, "Row", 4) should equal("""@BigQueryType.toTable + |case 
class Row( + | field: Boolean)""".stripMargin) + } + + it should "support all primitive types" in { + val schema = SchemaProvider.schemaOf[Schemas.Required] + SchemaUtil.toPrettyString(schema, "Row", 2) should equal( """ |@BigQueryType.toTable - |case class Row(boolF: Boolean, intF: Long, floatF: Double, stringF: String, bytesF: ByteString, timestampF: Instant, dateF: LocalDate, timeF: LocalTime, datetimeF: LocalDateTime) + |case class Row( + | boolF: Boolean, + | intF: Long, + | longF: Long, + | floatF: Double, + | doubleF: Double, + | stringF: String, + | byteArrayF: ByteString, + | byteStringF: ByteString, + | timestampF: Instant, + | dateF: LocalDate, + | timeF: LocalTime, + | datetimeF: LocalDateTime, + | bigDecimalF: BigDecimal, + | geographyF: Geography, + | jsonF: Json, + | bigNumericF: BigNumeric) """.stripMargin.trim ) } it should "support nullable primitive types" in { - SchemaUtil.toPrettyString(newSchema("NULLABLE"), "Row", 0) should equal( + val schema = SchemaProvider.schemaOf[Schemas.Optional] + SchemaUtil.toPrettyString(schema, "Row", 2) should equal( """ |@BigQueryType.toTable - |case class Row(boolF: Option[Boolean], intF: Option[Long], floatF: Option[Double], stringF: Option[String], bytesF: Option[ByteString], timestampF: Option[Instant], dateF: Option[LocalDate], timeF: Option[LocalTime], datetimeF: Option[LocalDateTime]) + |case class Row( + | boolF: Option[Boolean], + | intF: Option[Long], + | longF: Option[Long], + | floatF: Option[Double], + | doubleF: Option[Double], + | stringF: Option[String], + | byteArrayF: Option[ByteString], + | byteStringF: Option[ByteString], + | timestampF: Option[Instant], + | dateF: Option[LocalDate], + | timeF: Option[LocalTime], + | datetimeF: Option[LocalDateTime], + | bigDecimalF: Option[BigDecimal], + | geographyF: Option[Geography], + | jsonF: Option[Json], + | bigNumericF: Option[BigNumeric]) """.stripMargin.trim ) } it should "support repeated primitive types" in { - 
SchemaUtil.toPrettyString(newSchema("REPEATED"), "Row", 0) should equal( + val schema = SchemaProvider.schemaOf[Schemas.Repeated] + SchemaUtil.toPrettyString(schema, "Row", 2) should equal( """ |@BigQueryType.toTable - |case class Row(boolF: List[Boolean], intF: List[Long], floatF: List[Double], stringF: List[String], bytesF: List[ByteString], timestampF: List[Instant], dateF: List[LocalDate], timeF: List[LocalTime], datetimeF: List[LocalDateTime]) + |case class Row( + | boolF: List[Boolean], + | intF: List[Long], + | longF: List[Long], + | floatF: List[Double], + | doubleF: List[Double], + | stringF: List[String], + | byteArrayF: List[ByteString], + | byteStringF: List[ByteString], + | timestampF: List[Instant], + | dateF: List[LocalDate], + | timeF: List[LocalTime], + | datetimeF: List[LocalDateTime], + | bigDecimalF: List[BigDecimal], + | geographyF: List[Geography], + | jsonF: List[Json], + | bigNumericF: List[BigNumeric]) """.stripMargin.trim ) } it should "support records" in { val fields = List( - new TableFieldSchema() - .setName("f1") - .setType("INTEGER") - .setMode("REQUIRED"), + new TableFieldSchema().setName("f1").setType("INTEGER").setMode("REQUIRED"), new TableFieldSchema().setName("f2").setType("FLOAT").setMode("REQUIRED") ).asJava val schema = new TableSchema().setFields( @@ -116,24 +155,6 @@ class SchemaUtilTest extends AnyFlatSpec with Matchers { ) } - it should "support indent" in { - SchemaUtil.toPrettyString(newSchema("REQUIRED"), "Row", 2) should equal( - """ - |@BigQueryType.toTable - |case class Row( - | boolF: Boolean, - | intF: Long, - | floatF: Double, - | stringF: String, - | bytesF: ByteString, - | timestampF: Instant, - | dateF: LocalDate, - | timeF: LocalTime, - | datetimeF: LocalDateTime) - """.stripMargin.trim - ) - } - it should "support reserved words" in { val expectedFields = SchemaUtil.scalaReservedWords .map(e => s"`$e`") From 23c94a880d503e5b13bc1e2ea8354f5bd77abe59 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Thu, 28 
Nov 2024 13:09:01 +0100 Subject: [PATCH 4/7] Descriptive test class names --- .../bigquery/types/TypeProviderTest.scala | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala index 47389303b3..6f7534a0b3 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala @@ -36,26 +36,26 @@ object TypeProviderTest { @BigQueryType.toTable case class ToTable(f1: Long, f2: Double, f3: Boolean, f4: String, f5: Instant) - @BigQueryType.fromSchema(""" - |{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "INTEGER"}]} - |""".stripMargin) - class S1 + @BigQueryType.fromSchema( + """{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "INTEGER"}]}""" + ) + class SingleLineSchema - @BigQueryType.fromSchema(""" - |{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "INTEGER"}]} - |""".stripMargin) - class S2 + @BigQueryType.fromSchema("""{ + "fields": [{"mode": "REQUIRED", "name": "f1", "type": "INTEGER"}] + }""") + class MultiLineSchema @BigQueryType.fromSchema(""" |{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "INTEGER"}]} |""".stripMargin) - class S3 + class StripMarginSchema - @BigQueryType.fromSchema(""" - |{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "INTEGER"}]} - |""".stripMargin) + @BigQueryType.fromSchema( + """{"fields": [{"mode": "REQUIRED", "name": "f1", "type": "INTEGER"}]}""" + ) @description("Table S4") - class S4 + class DescriptionSchema @BigQueryType.fromSchema( """ @@ -92,26 +92,26 @@ class TypeProviderTest extends AnyFlatSpec with Matchers { import TypeProviderTest._ "BigQueryType.fromSchema" should "support string literal" in { - val r = S1(1L) + val r = 
SingleLineSchema(1L) r.f1 shouldBe 1L } it should "support multi-line string literal" in { - val r = S2(1L) + val r = MultiLineSchema(1L) r.f1 shouldBe 1L } it should "support multi-line string literal with stripMargin" in { - val r = S3(1L) + val r = StripMarginSchema(1L) r.f1 shouldBe 1L } it should "support table description" in { - S4.tableDescription shouldBe "Table S4" + DescriptionSchema.tableDescription shouldBe "Table S4" } it should "be serializable" in { - SerializableUtils.ensureSerializable(S1(1)) + SerializableUtils.ensureSerializable(SingleLineSchema(1L)) } it should "infer the same schema" in { From 52f6c16e56d11291981d554dae96845e7892e546 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 29 Nov 2024 11:42:49 +0100 Subject: [PATCH 5/7] Add required avro schema factory --- .../main/scala/com/spotify/scio/bigquery/BigQueryIO.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index dc257cd06a..7935ee10ed 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -395,18 +395,20 @@ object BigQueryTypedTable { private[this] def genericRecord( table: Table, useLogicalTypes: Boolean - )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] = + )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] = { BigQueryTypedTable( beam.BigQueryIO .read(_.getRecord) .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r), beam.BigQueryIO .write[GenericRecord]() - .withAvroFormatFunction(_.getElement) + .withAvroFormatFunction(Functions.serializableFn(_.getElement)) + .withAvroSchemaFactory(Functions.serializableFn(BigQueryUtils.toGenericAvroSchema(_, true))) .pipe(r => if (useLogicalTypes) 
r.useAvroLogicalTypes() else r), table, (genericRecord: GenericRecord, _: TableSchema) => genericRecord ) + } /** * Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]]. From b4cf9ea2316b17055f7ef90a6c8ce4f1f607f1d3 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 29 Nov 2024 11:43:20 +0100 Subject: [PATCH 6/7] Replace closure cleaner with Serializable function --- .../spotify/scio/bigquery/BigQueryIO.scala | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 7935ee10ed..5f391d2126 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -25,7 +25,6 @@ import com.spotify.scio.coders._ import com.spotify.scio.io._ import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil} import com.spotify.scio.values.{SCollection, SideOutput, SideOutputCollections} -import com.twitter.chill.ClosureCleaner import org.apache.avro.generic.GenericRecord import org.apache.beam.sdk.extensions.gcp.options.GcpOptions import org.apache.beam.sdk.io.Compression @@ -398,7 +397,7 @@ object BigQueryTypedTable { )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] = { BigQueryTypedTable( beam.BigQueryIO - .read(_.getRecord) + .read(Functions.serializableFn(_.getRecord)) .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r), beam.BigQueryIO .write[GenericRecord]() @@ -429,12 +428,10 @@ object BigQueryTypedTable { tableRowFn: TableRow => T, table: Table ): BigQueryTypedTable[T] = { - val rFn = ClosureCleaner.clean(readerFn) - val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(Functions.serializableFn(rFn)) - val writer = beam.BigQueryIO - .write[T]() - 
.withFormatFunction(Functions.serializableFn(wFn)) + val rFn = Functions.serializableFn(readerFn) + val wFn = Functions.serializableFn(writerFn) + val reader = beam.BigQueryIO.read(rFn) + val writer = beam.BigQueryIO.write[T]().withFormatFunction(wFn) val fn: (GenericRecord, TableSchema) => T = (gr, _) => tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr)) @@ -447,13 +444,10 @@ object BigQueryTypedTable { fn: (GenericRecord, TableSchema) => T, table: Table ): BigQueryTypedTable[T] = { - val rFn = ClosureCleaner.clean(readerFn) - val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(rFn(_)) - val writer = beam.BigQueryIO - .write[T]() - .useAvroLogicalTypes() - .withAvroFormatFunction(input => wFn(input.getElement())) + val rFn = Functions.serializableFn(readerFn) + val wFn = Functions.serializableFn((r: AvroWriteRequest[T]) => writerFn(r.getElement)) + val reader = beam.BigQueryIO.read(rFn) + val writer = beam.BigQueryIO.write[T]().withAvroFormatFunction(wFn).useAvroLogicalTypes() BigQueryTypedTable(reader, writer, table, fn) } From 6da6ac274ab26c899b56e2e00d438031ef1ae22e Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 29 Nov 2024 13:46:26 +0100 Subject: [PATCH 7/7] Add more test cases --- .../spotify/scio/bigquery/BigQueryIOIT.scala | 138 +++++++++++++++--- 1 file changed, 121 insertions(+), 17 deletions(-) diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala index b29dc500dc..f2d70626d6 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala @@ -17,47 +17,151 @@ package com.spotify.scio.bigquery +import com.spotify.scio.bigquery.BigQueryTypedTable.Format +import com.spotify.scio.coders.Coder +import com.spotify.scio.avro._ import org.apache.beam.sdk.options._ import com.spotify.scio.testing._ import 
com.spotify.scio.testing.util.ItUtils +import org.apache.avro.generic.GenericRecord object BigQueryIOIT { @BigQueryType.fromTable("bigquery-public-data:samples.shakespeare") class ShakespeareFromTable @BigQueryType.fromQuery( - """ - SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare` LIMIT 10 - """ + """SELECT word, word_count + |FROM `bigquery-public-data.samples.shakespeare` + |WHERE corpus = 'kinglear' + |ORDER BY word_count DESC + |LIMIT 5""".stripMargin ) class ShakespeareFromQuery - - val tempLocation: String = ItUtils.gcpTempLocation("bigquery-it") } class BigQueryIOIT extends PipelineSpec { import BigQueryIOIT._ import ItUtils.project + val tempLocation: String = ItUtils.gcpTempLocation("bigquery-it") val options: PipelineOptions = PipelineOptionsFactory .fromArgs(s"--project=$project", s"--tempLocation=$tempLocation") .create() - "Select" should "read typed values from a SQL query" in + val kinglearTop5: Seq[(String, Long)] = Seq( + "the" -> 786L, + "I" -> 622L, + "and" -> 594L, + "of" -> 447L, + "to" -> 438L + ) + + def extractCorpus(r: TableRow): String = + r.get("corpus").asInstanceOf[String] + + def extractCorpus(r: GenericRecord): String = + r.get("corpus").toString // Avro decodes strings as Utf8, not java.lang.String + + def extractWordCount(r: TableRow): (String, Long) = { + val word = r.get("word").asInstanceOf[String] + val count = r.get("word_count").asInstanceOf[String].toLong + word -> count + } + + def extractWordCount(r: GenericRecord): (String, Long) = { + val word = r.get("word").toString // Avro decodes strings as Utf8, not java.lang.String + val count = r.get("word_count").asInstanceOf[Long] // INTEGER is an Avro long, not a String + word -> count + } + + "Select" should "read values from a SQL query" in { + runWithRealContext(options) { sc => + val query = Query(ShakespeareFromQuery.queryRaw) + val scoll = sc.bigQuerySelect(query).map(extractWordCount) + + scoll should containInAnyOrder(kinglearTop5) + } + } + + it should "read storage values from a SQL query" in { runWithRealContext(options) { sc => - val scoll =
sc.read(BigQueryTyped[ShakespeareFromQuery]) - scoll should haveSize(10) - scoll should satisfy[ShakespeareFromQuery] { - _.forall(_.getClass == classOf[ShakespeareFromQuery]) - } + val query = Query(ShakespeareFromQuery.queryRaw) + val scoll = sc + .bigQueryStorage(query) + .map(extractWordCount) + + scoll should containInAnyOrder(kinglearTop5) } + } - "TableRef" should "read typed values from table" in + it should "read typed values from a SQL query" in { runWithRealContext(options) { sc => - val scoll = sc.read(BigQueryTyped[ShakespeareFromTable]) - scoll.take(10) should haveSize(10) - scoll should satisfy[ShakespeareFromTable] { - _.forall(_.getClass == classOf[ShakespeareFromTable]) - } + val scoll = sc + .typedBigQuery[ShakespeareFromQuery]() + .flatMap { r => + for { + w <- r.word + c <- r.word_count + } yield w -> c + } + + scoll should containInAnyOrder(kinglearTop5) + } + } + + "Table" should "read values from table" in { + runWithRealContext(options) { sc => + val table = Table.Spec(ShakespeareFromTable.table) + val scoll = sc + .bigQueryTable(table) + .filter(r => extractCorpus(r) == "kinglear") + .map(extractWordCount) + .top(5)(Ordering.by(_._2)) + .flatten + + scoll should containInAnyOrder(kinglearTop5) + } + } + + it should "read avro values from table" in { + runWithRealContext(options) { sc => + // BQ limitation: We can't give an avro reader schema + implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder + val table = Table.Spec(ShakespeareFromTable.table) + val scoll = sc + .bigQueryTable[GenericRecord](table, Format.GenericRecord) + .filter(r => extractCorpus(r) == "kinglear") + .map(extractWordCount) + .top(5)(Ordering.by(_._2)) + .flatten + + scoll should containInAnyOrder(kinglearTop5) + } + } + + it should "read storage values from table" in { + runWithRealContext(options) { sc => + val table = Table.Spec(ShakespeareFromTable.table) + val scoll = sc + .bigQueryStorage(table) + .filter(r => extractCorpus(r) == "kinglear") + 
.map(extractWordCount) + .top(5)(Ordering.by(_._2)) + .flatten + + scoll should containInAnyOrder(kinglearTop5) + } + } + + it should "read typed values from table" in { + runWithRealContext(options) { sc => + val scoll = sc + .typedBigQuery[ShakespeareFromTable]() + .collect { case r if r.corpus == "kinglear" => r.word -> r.word_count } + .top(5)(Ordering.by(_._2)) + .flatten + + scoll should containInAnyOrder(kinglearTop5) } + } }