Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some warnings and add command line parsing for unordered field keys #330

Merged
merged 2 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.language.higherKinds
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

/**
* Diff type between two records of the same key.
Expand Down Expand Up @@ -423,12 +425,14 @@ object BigDiffy extends Command with Serializable {
|
| --input-mode=(avro|bigquery) Diff-ing Avro or BQ records
| [--output-mode=(gcs|bigquery)] Saves to a text file in GCS or a BigQuery dataset. Defaults to GCS
| --key=<key> '.' separated key field. Specify multiple --key params for multi key usage.
| --key=<key> '.' separated key field. Specify multiple --key params or multiple ',' separated key fields for multi key usage.
| --lhs=<path> LHS File path or BigQuery table
| --rhs=<path> RHS File path or BigQuery table
| --output=<output> File path prefix for output
| --ignore=<keys> ',' separated field list to ignore
| --unordered=<keys> ',' separated field list to treat as unordered
| --unorderedFieldKey=<key> ',' separated list of keys for fields which are unordered nested records. Mappings use '->'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking of switching this to : delimited

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like : better than ->

| For example --unorderedFieldKey=fieldPath->fieldKey,otherPath->otherKey
| [--with-header] Output all TSVs with header rows. Defaults to false
| [--ignore-nan] Ignore NaN values when computing stats for differences
|
Expand Down Expand Up @@ -489,6 +493,14 @@ object BigDiffy extends Command with Serializable {
(r: TableRow) => MultiKey(xs.map(x => get(x, 0, r)))
}

/**
 * Parses unordered-field-key arguments of the form `fieldPath->fieldKey` into a map from
 * field path to field key.
 *
 * Validation deliberately does not rely on `assert`: assertions can be elided at compile
 * time, which would let malformed mappings through or surface them as an unrelated
 * `ArrayIndexOutOfBoundsException`.
 *
 * @param unorderedKeysArgs one `fieldPath->fieldKey` mapping per element
 * @return `Success` with the path-to-key map, or a `Failure` wrapping an
 *         `IllegalArgumentException` for the first element that does not split on `->` into
 *         a non-empty field path and a non-empty field key
 */
private[diffy] def unorderedKeysMap(unorderedKeysArgs: List[String]): Try[Map[String, String]] = {
  Try(unorderedKeysArgs.map { arg =>
    arg.split("->") match {
      // Exactly two parts, both non-empty: "->key" would otherwise map an empty path.
      case Array(fieldPath, fieldKey) if fieldPath.nonEmpty && fieldKey.nonEmpty =>
        fieldPath -> fieldKey
      case _ =>
        throw new IllegalArgumentException(s"Invalid unordered field key mapping $arg")
    }
  }.toMap)
}

/** Removes every trailing '/' from `path` and appends "/part". */
def pathWithShards(path: String): String = {
  val withoutTrailingSlashes = path.replaceAll("\\/+$", "")
  s"$withoutTrailingSlashes/part"
}

implicit class TextFileHeader(coll: SCollection[String]) {
Expand All @@ -511,19 +523,27 @@ object BigDiffy extends Command with Serializable {
def run(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)

val (inputMode, keys, lhs, rhs, output, header, ignore, unordered, outputMode, ignoreNan) = {
val (inputMode, keys, lhs, rhs, output, header, ignore, unordered,
unorderedKeysList, outputMode, ignoreNan) = {
try {
(args("input-mode"), args.list("key"), args("lhs"), args("rhs"), args("output"),
args.boolean("with-header", false), args.list("ignore").toSet,
args.list("unordered").toSet, args.optional("output-mode"),
args.boolean("ignore-nan", false))
args.list("unordered").toSet, args.list("unorderedFieldKey"),
args.optional("output-mode"), args.boolean("ignore-nan", false))
} catch {
case e: Throwable =>
usage()
throw e
}
}

val unorderedKeys = unorderedKeysMap(unorderedKeysList) match {
case Success(m) => m
case Failure(e) =>
usage()
throw e
}

val om: OutputMode = outputMode match {
case Some("gcs") => GCS
case Some("bigquery") => BQ
Expand All @@ -543,7 +563,7 @@ object BigDiffy extends Command with Serializable {
val schema = new AvroSampler(rhs, conf = Some(sc.options))
.sample(1, head = true).head.getSchema
implicit val grCoder: Coder[GenericRecord] = Coder.avroGenericRecordCoder(schema)
val diffy = new AvroDiffy[GenericRecord](ignore, unordered)
val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)
val lhsSCollection = sc.avroFile(lhs, schema)
val rhsSCollection = sc.avroFile(rhs, schema)
BigDiffy
Expand All @@ -554,7 +574,7 @@ object BigDiffy extends Command with Serializable {
val lSchema = bq.tables.schema(lhs)
val rSchema = bq.tables.schema(rhs)
val schema = mergeTableSchema(lSchema, rSchema)
val diffy = new TableRowDiffy(schema, ignore, unordered)
val diffy = new TableRowDiffy(schema, ignore, unordered, unorderedKeys)
BigDiffy.diffTableRow(sc, lhs, rhs, tableRowKeyFn(keys), diffy, ignoreNan)
case m =>
throw new IllegalArgumentException(s"input mode $m not supported")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import com.spotify.scio.testing.PipelineSpec

import com.google.api.services.bigquery.model.TableRow

import scala.language.higherKinds

class BigDiffyTest extends PipelineSpec {

val keys = (1 to 1000).map(k => MultiKey("key" + k))
Expand Down Expand Up @@ -240,6 +242,15 @@ class BigDiffyTest extends PipelineSpec {
keyValues.toString shouldBe "foo_bar"
}

"BigDiffy unorderedKeysMap" should "work with multiple unordered keys" in {
val keyMappings = List("record.nested_record->key", "record.other_nested_record->other_key")
val unorderdKeys = BigDiffy.unorderedKeysMap(keyMappings)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: typo, unordered


unorderdKeys.isSuccess shouldBe true
unorderdKeys.get shouldBe Map("record.nested_record" -> "key",
"record.other_nested_record" -> "other_key")
}

it should "throw an exception when in GCS output mode and output is not gs://" in {
val exc = the[Exception] thrownBy {
val args = Array(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
import org.apache.beam.sdk.options.PipelineOptions
import org.slf4j.LoggerFactory

import scala.language.existentials
import scala.language.{existentials, higherKinds}
import scala.util.Try
import scala.reflect.ClassTag

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.spotify.scio.coders.Coder
import org.apache.beam.sdk.transforms.ParDo
import org.slf4j.{Logger, LoggerFactory}

import scala.language.higherKinds
import scala.reflect.ClassTag
import scala.math._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.spotify.ratatool.scalacheck

import com.spotify.ratatool.proto.Schemas.{OptionalNestedRecord, RequiredNestedRecord, TestRecord}
import org.scalacheck.{Gen, Properties}
import org.scalacheck.Prop.{BooleanOperators, all, forAll}
import org.scalacheck.Prop.{propBoolean, all, forAll}


object ProtoBufGeneratorTest extends Properties("ProtoBufGenerator") {
Expand Down