Skip to content

Commit

Permalink
fixup! BigQuery JSON column: encode as Jackson JsonNode on write to p…
Browse files Browse the repository at this point in the history
…revent escape
  • Loading branch information
turb committed Nov 15, 2024
1 parent 441cb89 commit c0303bb
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.spotify.scio.bigquery.types

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.google.api.services.bigquery.model.TableRow
import com.google.protobuf.ByteString
import com.spotify.scio.bigquery.types.MacroUtil._
Expand Down Expand Up @@ -171,14 +172,13 @@ private[types] object ConverterProvider {
val provider: OverrideTypeProvider =
OverrideTypeProviderFinder.getProvider
tpe match {
case t if provider.shouldOverrideType(c)(t) => q"$tree.toString"
case t if t =:= typeOf[Boolean] => tree
case t if t =:= typeOf[Int] => q"$tree.toLong"
case t if t =:= typeOf[Long] => tree
case t if t =:= typeOf[Float] => q"$tree.toDouble"
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => tree
case t if provider.shouldOverrideType(c)(t) => q"$tree.toString"
case t if t =:= typeOf[Boolean] => tree
case t if t =:= typeOf[Int] => q"$tree.toLong"
case t if t =:= typeOf[Long] => tree
case t if t =:= typeOf[Float] => q"$tree.toDouble"
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree

case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString"
Expand All @@ -199,7 +199,7 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.asJackson"
q"$tree.wkt"
case t if t =:= typeOf[BigNumeric] =>
q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString"

Expand Down Expand Up @@ -277,13 +277,12 @@ private[types] object ConverterProvider {
tpe match {
case t if provider.shouldOverrideType(c)(t) =>
provider.createInstance(c)(t, q"$tree")
case t if t =:= typeOf[Boolean] => q"$s.toBoolean"
case t if t =:= typeOf[Int] => q"$s.toInt"
case t if t =:= typeOf[Long] => q"$s.toLong"
case t if t =:= typeOf[Float] => q"$s.toFloat"
case t if t =:= typeOf[Double] => q"$s.toDouble"
case t if t =:= typeOf[String] => q"$s"
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => q"$s"
case t if t =:= typeOf[Boolean] => q"$s.toBoolean"
case t if t =:= typeOf[Int] => q"$s.toInt"
case t if t =:= typeOf[Long] => q"$s.toLong"
case t if t =:= typeOf[Float] => q"$s.toFloat"
case t if t =:= typeOf[Double] => q"$s.toDouble"
case t if t =:= typeOf[String] => q"$s"
case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric($s)"

Expand Down Expand Up @@ -414,7 +413,8 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.asJackson"
// for BigQuery, we need to provide parsed JSON to prevent escaping
q"_root_.com.spotify.scio.bigquery.types.ConverterUtil.readJsonTree($tree)"
case t if t =:= typeOf[BigNumeric] =>
q"$tree.wkt"

Expand Down Expand Up @@ -476,6 +476,9 @@ private[types] object ConverterProvider {
}

/**
 * Runtime helpers referenced by fully-qualified name from the code that the converter macros
 * generate.
 */
object ConverterUtil {
  // One Jackson mapper instance reused for every JSON parse performed through this object.
  private val jsonMapper = new ObjectMapper()

  /**
   * Parses the text carried by a [[Json]] value into a Jackson tree.
   *
   * @param json
   *   wrapper whose `wkt` field holds the JSON text
   * @return
   *   the result of `ObjectMapper.readTree` applied to `json.wkt`
   */
  def readJsonTree(json: Json): JsonNode = {
    val rawText = json.wkt
    jsonMapper.readTree(rawText)
  }

  /**
   * Null check usable on both reference and (specialized) primitive values.
   *
   * @param x
   *   value to test
   * @return
   *   `true` when `x` is not `null`
   */
  @inline final def notNull[@specialized(Boolean, Int, Long, Float, Double) T](x: T): Boolean =
    x != null
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ private[types] object SchemaProvider {
case t if t =:= typeOf[LocalDateTime] => ("DATETIME", Iterable.empty)
case t if t =:= typeOf[Geography] => ("GEOGRAPHY", Iterable.empty)
case t if t =:= typeOf[Json] => ("JSON", Iterable.empty)
case t if t =:= typeOf[com.fasterxml.jackson.databind.JsonNode] => ("JSON", Iterable.empty)

case t if isCaseClass(t) => ("RECORD", toFields(t))
case _ => throw new RuntimeException(s"Unsupported type: $tpe")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.spotify.scio.bigquery

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.spotify.scio.coders.Coder
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes
Expand Down Expand Up @@ -54,20 +53,14 @@ package object types {
case class Geography(wkt: String)

/**
* Case class to serve as raw type for Json instances. On write, they will be transformed into
* Jackson JsonNode.
* Case class to serve as raw type for Json instances.
*
* See also https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
*
* @param wkt
* Well Known Text formatted string that BigQuery displays for Json
*/
case class Json(wkt: String) {
def asJackson: JsonNode = Json.mapper.readTree(wkt)
}
object Json {
private val mapper = new ObjectMapper()
}
case class Json(wkt: String)

/**
* Case class to serve as BigNumeric type to distinguish them from Numeric.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ final class ConverterProviderSpec
for {
key <- Gen.alphaStr
value <- Gen.alphaStr
} yield Json("{\"" + key + "\":\"" + value + "\"}")
} yield Json(s"""{"$key":"$value"}""")
)
implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
Expand Down

0 comments on commit c0303bb

Please sign in to comment.