From c0303bbbe64b81d29d76659a10b205332631c67b Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Fri, 15 Nov 2024 16:33:27 +0100 Subject: [PATCH] fixup! BigQuery JSON column: encode as Jackson JsonNode on write to prevent escape --- .../bigquery/types/ConverterProvider.scala | 37 ++++++++++--------- .../scio/bigquery/types/SchemaProvider.scala | 1 - .../spotify/scio/bigquery/types/package.scala | 11 +----- .../types/ConverterProviderSpec.scala | 2 +- 4 files changed, 23 insertions(+), 28 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala index a0ded09406..6e2f228dfe 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala @@ -17,6 +17,7 @@ package com.spotify.scio.bigquery.types +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import com.google.api.services.bigquery.model.TableRow import com.google.protobuf.ByteString import com.spotify.scio.bigquery.types.MacroUtil._ @@ -171,14 +172,13 @@ private[types] object ConverterProvider { val provider: OverrideTypeProvider = OverrideTypeProviderFinder.getProvider tpe match { - case t if provider.shouldOverrideType(c)(t) => q"$tree.toString" - case t if t =:= typeOf[Boolean] => tree - case t if t =:= typeOf[Int] => q"$tree.toLong" - case t if t =:= typeOf[Long] => tree - case t if t =:= typeOf[Float] => q"$tree.toDouble" - case t if t =:= typeOf[Double] => tree - case t if t =:= typeOf[String] => tree - case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => tree + case t if provider.shouldOverrideType(c)(t) => q"$tree.toString" + case t if t =:= typeOf[Boolean] => tree + case t if t =:= typeOf[Int] => q"$tree.toLong" + case t if t =:= typeOf[Long] => tree + case t if t =:= typeOf[Float] => q"$tree.toDouble" + case t if t =:= typeOf[Double] => tree + case t if t =:= typeOf[String] => tree case t if t =:= typeOf[BigDecimal] => q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString" @@ -199,7 +199,7 @@ private[types] object ConverterProvider { case t if t =:= typeOf[Geography] => q"$tree.wkt" case t if t =:= typeOf[Json] => - q"$tree.asJackson" + q"$tree.wkt" case t if t =:= typeOf[BigNumeric] => q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString" @@ -277,13 +277,12 @@ private[types] object ConverterProvider { tpe match { case t if provider.shouldOverrideType(c)(t) => provider.createInstance(c)(t, q"$tree") - case t if t =:= typeOf[Boolean] => q"$s.toBoolean" - case t if t =:= typeOf[Int] => q"$s.toInt" - case t if t =:= typeOf[Long] => q"$s.toLong" - case t if t =:= typeOf[Float] => q"$s.toFloat" - case t if t =:= typeOf[Double] => q"$s.toDouble" - case t if t =:= typeOf[String] => q"$s" - case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => q"$s" + case t if t =:= typeOf[Boolean] => q"$s.toBoolean" + case t if t =:= typeOf[Int] => q"$s.toInt" + case t if t =:= typeOf[Long] => q"$s.toLong" + case t if t =:= typeOf[Float] => q"$s.toFloat" + case t if t =:= typeOf[Double] => q"$s.toDouble" + case t if t =:= typeOf[String] => q"$s" case t if t =:= typeOf[BigDecimal] => q"_root_.com.spotify.scio.bigquery.Numeric($s)" @@ -414,7 +413,8 @@ private[types] object ConverterProvider { case t if t =:= typeOf[Geography] => q"$tree.wkt" case t if t =:= typeOf[Json] => - q"$tree.asJackson" + // for BigQuery, we need to provide parsed JSON to prevent escaping + q"_root_.com.spotify.scio.bigquery.types.ConverterUtil.readJsonTree($tree)" case t if t =:= typeOf[BigNumeric] => q"$tree.wkt" @@ -476,6 +476,9 @@ private[types] object ConverterProvider { } object ConverterUtil { + private val mapper = new ObjectMapper() + def readJsonTree(json: Json): JsonNode = mapper.readTree(json.wkt) + @inline final def notNull[@specialized(Boolean, Int, Long, Float, Double) T](x: T): Boolean = x != null } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index f037acbb08..b150b8a7c4 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -90,7 +90,6 @@ private[types] object SchemaProvider { case t if t =:= typeOf[LocalDateTime] => ("DATETIME", Iterable.empty) case t if t =:= typeOf[Geography] => ("GEOGRAPHY", Iterable.empty) case t if t =:= typeOf[Json] => ("JSON", Iterable.empty) - case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => ("JSON", Iterable.empty) case t if isCaseClass(t) => ("RECORD", toFields(t)) case _ => throw new RuntimeException(s"Unsupported type: $tpe") diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala index c37be9b149..8345c7d6bc 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala @@ -17,7 +17,6 @@ package com.spotify.scio.bigquery -import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import com.spotify.scio.coders.Coder import org.apache.avro.Conversions.DecimalConversion import org.apache.avro.LogicalTypes @@ -54,20 +53,14 @@ package object types { case class Geography(wkt: String) /** - * Case class to serve as raw type for Json instances. On write, they will be transformed into - * Jackson JsonNode. + * Case class to serve as raw type for Json instances. * * See also https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type * * @param wkt * Well Known Text formatted string that BigQuery displays for Json */ - case class Json(wkt: String) { - def asJackson: JsonNode = Json.mapper.readTree(wkt) - } - object Json { - private val mapper = new ObjectMapper() - } + case class Json(wkt: String) /** * Case class to serve as BigNumeric type to distinguish them from Numeric. diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala index 50bb9de4dc..797837c052 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala @@ -55,7 +55,7 @@ final class ConverterProviderSpec for { key <- Gen.alphaStr value <- Gen.alphaStr - } yield Json("{\"" + key + "\":\"" + value + "\"}") + } yield Json(s"""{"$key":"$value"}""") ) implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList) implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)