diff --git a/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala b/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala index dfba316d..ed7fe056 100644 --- a/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala +++ b/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala @@ -41,7 +41,9 @@ import org.slf4j.{Logger, LoggerFactory} import scala.annotation.tailrec import scala.jdk.CollectionConverters._ import scala.collection.mutable +import scala.language.higherKinds import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} /** * Diff type between two records of the same key. @@ -423,12 +425,14 @@ object BigDiffy extends Command with Serializable { | | --input-mode=(avro|bigquery) Diff-ing Avro or BQ records | [--output-mode=(gcs|bigquery)] Saves to a text file in GCS or a BigQuery dataset. Defaults to GCS - | --key= '.' separated key field. Specify multiple --key params for multi key usage. + | --key= '.' separated key field. Specify multiple --key params or multiple ',' separated key fields for multi key usage. | --lhs= LHS File path or BigQuery table | --rhs= RHS File path or BigQuery table | --output= File path prefix for output | --ignore= ',' separated field list to ignore | --unordered= ',' separated field list to treat as unordered + | --unorderedFieldKey= ',' separated list of keys for fields which are unordered nested records. Mappings use ':' + | For example --unorderedFieldKey=fieldPath:fieldKey,otherPath:otherKey | [--with-header] Output all TSVs with header rows. Defaults to false | [--ignore-nan] Ignore NaN values when computing stats for differences | @@ -489,6 +493,14 @@ object BigDiffy extends Command with Serializable { (r: TableRow) => MultiKey(xs.map(x => get(x, 0, r))) } + private[diffy] def unorderedKeysMap(unorderedKeysArgs: List[String]): Try[Map[String, String]] = { + Try(unorderedKeysArgs.map { arg => + val keyMappings = arg.split(":") + assert(keyMappings.size == 2, s"Invalid unordered field key mapping $arg") + (keyMappings(0), keyMappings(1)) + }.toMap) + } + def pathWithShards(path: String): String = path.replaceAll("\\/+$", "") + "/part" implicit class TextFileHeader(coll: SCollection[String]) { @@ -511,12 +523,13 @@ object BigDiffy extends Command with Serializable { def run(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) - val (inputMode, keys, lhs, rhs, output, header, ignore, unordered, outputMode, ignoreNan) = { + val (inputMode, keys, lhs, rhs, output, header, ignore, unordered, + unorderedKeysList, outputMode, ignoreNan) = { try { (args("input-mode"), args.list("key"), args("lhs"), args("rhs"), args("output"), args.boolean("with-header", false), args.list("ignore").toSet, - args.list("unordered").toSet, args.optional("output-mode"), - args.boolean("ignore-nan", false)) + args.list("unordered").toSet, args.list("unorderedFieldKey"), + args.optional("output-mode"), args.boolean("ignore-nan", false)) } catch { case e: Throwable => usage() @@ -524,6 +537,13 @@ object BigDiffy extends Command with Serializable { } } + val unorderedKeys = unorderedKeysMap(unorderedKeysList) match { + case Success(m) => m + case Failure(e) => + usage() + throw e + } + val om: OutputMode = outputMode match { case Some("gcs") => GCS case Some("bigquery") => BQ @@ -543,7 +563,7 @@ object BigDiffy extends Command with Serializable { val schema = new AvroSampler(rhs, conf = Some(sc.options)) .sample(1, head = true).head.getSchema implicit val grCoder: Coder[GenericRecord] = Coder.avroGenericRecordCoder(schema) - val diffy = new AvroDiffy[GenericRecord](ignore, unordered) + val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys) val lhsSCollection = sc.avroFile(lhs, schema) val rhsSCollection = sc.avroFile(rhs, schema) BigDiffy @@ -554,7 +574,7 @@ object BigDiffy extends Command with Serializable { val lSchema = bq.tables.schema(lhs) val rSchema = bq.tables.schema(rhs) val schema = mergeTableSchema(lSchema, rSchema) - val diffy = new TableRowDiffy(schema, ignore, unordered) + val diffy = new TableRowDiffy(schema, ignore, unordered, unorderedKeys) BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(keys), diffy, ignoreNan) case m => throw new IllegalArgumentException(s"input mode $m not supported") diff --git a/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala b/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala index aace73fc..a25efa1b 100644 --- a/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala +++ b/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala @@ -29,6 +29,8 @@ import com.spotify.scio.testing.PipelineSpec import com.google.api.services.bigquery.model.TableRow +import scala.language.higherKinds + class BigDiffyTest extends PipelineSpec { val keys = (1 to 1000).map(k => MultiKey("key" + k)) @@ -240,6 +242,15 @@ class BigDiffyTest extends PipelineSpec { keyValues.toString shouldBe "foo_bar" } + "BigDiffy unorderedKeysMap" should "work with multiple unordered keys" in { + val keyMappings = List("record.nested_record:key", "record.other_nested_record:other_key") + val unorderedKeys = BigDiffy.unorderedKeysMap(keyMappings) + + unorderedKeys.isSuccess shouldBe true + unorderedKeys.get shouldBe Map("record.nested_record" -> "key", + "record.other_nested_record" -> "other_key") + } + it should "throw an exception when in GCS output mode and output is not gs://" in { val exc = the[Exception] thrownBy { val args = Array( diff --git a/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala b/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala index 6b9a2e82..d07816dd 100644 --- a/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala +++ b/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala @@ -38,7 +38,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers import org.apache.beam.sdk.options.PipelineOptions import org.slf4j.LoggerFactory -import scala.language.existentials +import scala.language.{existentials, higherKinds} import scala.util.Try import scala.reflect.ClassTag diff --git a/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/util/SamplerSCollectionFunctions.scala b/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/util/SamplerSCollectionFunctions.scala index 102ddde4..bdb70ee7 100644 --- a/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/util/SamplerSCollectionFunctions.scala +++ b/ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/util/SamplerSCollectionFunctions.scala @@ -23,6 +23,7 @@ import com.spotify.scio.coders.Coder import org.apache.beam.sdk.transforms.ParDo import org.slf4j.{Logger, LoggerFactory} +import scala.language.higherKinds import scala.reflect.ClassTag import scala.math._ diff --git a/ratatool-scalacheck/src/test/scala/com/spotify/ratatool/scalacheck/ProtoBufGeneratorTest.scala b/ratatool-scalacheck/src/test/scala/com/spotify/ratatool/scalacheck/ProtoBufGeneratorTest.scala index 6fad862b..482d53dc 100644 --- a/ratatool-scalacheck/src/test/scala/com/spotify/ratatool/scalacheck/ProtoBufGeneratorTest.scala +++ b/ratatool-scalacheck/src/test/scala/com/spotify/ratatool/scalacheck/ProtoBufGeneratorTest.scala @@ -19,7 +19,7 @@ package com.spotify.ratatool.scalacheck import com.spotify.ratatool.proto.Schemas.{OptionalNestedRecord, RequiredNestedRecord, TestRecord} import org.scalacheck.{Gen, Properties} -import org.scalacheck.Prop.{BooleanOperators, all, forAll} +import org.scalacheck.Prop.{propBoolean, all, forAll} object ProtoBufGeneratorTest extends Properties("ProtoBufGenerator") {