Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix #766) Deprecate AvroCompat, replace automatic schema detection on read + Configurable write #996

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

clairemcginty
Copy link
Contributor

@clairemcginty clairemcginty commented Jul 9, 2024

Magnolify is now on Parquet 0.14, which includes a bugfix for PARQUET-2425 -- AvroSchemaConverter no longer throws an exception when parsing non-grouped repeated fields (the magnolify-parquet default). This was an early blocker in our efforts to deprecate AvroCompat (see discussion on #766).

This PR:

  • Deprecates AvroCompat in favor of (a) making writeSupport/schema operations Configurable, and introduces a Configuration option specifically for writing grouped arrays
  • Updates readSupport to automatically detect any grouped arrays in the the Write schema and correct the Read schema as needed.
  • Writes Avro schemas to file metadata footer by default

@@ -51,7 +51,7 @@ sealed trait ParquetField[T] extends Serializable {
protected final def nonEmpty(v: T): Boolean = !isEmpty(v)

def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit
def newConverter: TypeConverter[T]
def newConverter(writerSchema: Type): TypeConverter[T]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternately, we could create an overloaded method like:

def newConverter(): TypeConverter = newConverter(false)
def newConverter(avroCompat: Boolean): TypeConverter = ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will allow forward binary compatibility. Otherwise we should move this to the v0.8 base branch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. forward compat would be nice, but long-term I'd like to be able to phase out the whole AvroCompat option from Magnolify and just write grouped arrays by default... so I'd rather not encode it further into the Magnolify API (def newConverter(avroCompat: Boolean)). So I think we can keep this as is and I'll rebase onto v0.8 branch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

although now that I think about it more, the param here should be readSchema/requestedSchema, not writerSchema

.asInstanceOf[TypeConverter.Buffered[T]]
.withRepetition(Repetition.REPEATED)
val arrayConverter = new TypeConverter.Delegate[T, C[T]](buffered) {
override def get: C[T] = inner.get(fc.fromSpecific)
}

if (hasAvroArray) {
if (Schema.hasGroupedArray(writerSchema)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this would throw an error if the schema mixed grouped and non-grouped array types since it checks for the presence of a grouped array in the entire schema, rather than for the specific field.... but we don't have access to CaseMapper here so we couldn't easily isolate the specific field schema. The original approach with the AvroCompat import was also all-or-nothing so this shouldn't functionally be a change in behavior

@clairemcginty clairemcginty changed the title [fix #766] Derive AvroCompat automatically on read (fix #766) Derive AvroCompat automatically on read Jul 9, 2024
Copy link

codecov bot commented Jul 9, 2024

Codecov Report

Attention: Patch coverage is 94.93671% with 4 lines in your changes missing coverage. Please review.

Project coverage is 95.64%. Comparing base (032b52a) to head (b9a7281).

Files with missing lines Patch % Lines
...src/main/scala/magnolify/parquet/ParquetType.scala 86.36% 3 Missing ⚠️
...rc/main/scala/magnolify/parquet/ParquetField.scala 97.61% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #996      +/-   ##
==========================================
+ Coverage   95.50%   95.64%   +0.13%     
==========================================
  Files          56       57       +1     
  Lines        1980     1996      +16     
  Branches      186      162      -24     
==========================================
+ Hits         1891     1909      +18     
+ Misses         89       87       -2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@clairemcginty clairemcginty changed the title (fix #766) Derive AvroCompat automatically on read (fix #766) Deprecate AvroCompat, replace automatic schema detection on read + Configurable write Sep 12, 2024
@@ -693,6 +693,7 @@ lazy val tools = project
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion,
"org.apache.avro" % "avro" % avroVersion % Provided,
"org.apache.parquet" % "parquet-hadoop" % parquetVersion,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion,
Copy link
Contributor Author

@clairemcginty clairemcginty Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence about relying so heavily on the hadoop Configuration class, since it pulls in hadoop-common artifact and links us more tightly with Hadoop. Parquet is trying to move away from Configuration and onto their own ParquetConfiguration class, which we could use instead. However, it might be confusing for Scio users since Scio is heavily dependent on Configuration and we don't have immediate plans to offboard from it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I might pull this out into a separate PR. will update shortly

@clairemcginty clairemcginty marked this pull request as ready for review September 12, 2024 20:44
val metadata = new java.util.HashMap[String, String]()
if (parquetType.avroCompat) {
// This overrides `WriteSupport#getName`
metadata.put(ParquetWriter.OBJECT_MODEL_NAME_PROP, "avro")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did drop the behavior of writing writer.model.name: avro if AvroCompat is enabled -- I don't think it makes sense, it should still be Magnolify. I can't think of any reason why this would impact downstream readers -- model name shouldn't matter at all when comparing schema compatibility across files. lf anyone can think of a good reason why this change is breaking let me know...

Comment on lines 354 to 357
groupAvroArrays || conf.getBoolean(
MagnolifyParquetProperties.WriteGroupedArrays,
MagnolifyParquetProperties.WriteGroupedArraysDefault
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid this check in conf is very costly and has a significant impact on performance. memorization can help, but IMHO we should give the configuration when creating the ParquetType (same place we actually capture the deprecated ParquetArray), so we can compute this only once. WDYT ?

Comment on lines 332 to 338
// Legacy compat with Magnolify <= 0.7; future versions will remove AvroCompat in favor of
// Configuration-based approach
@nowarn("cat=deprecation")
val groupAvroArrays: Boolean = pa match {
case ParquetArray.default => false
case ParquetArray.AvroCompat.avroCompat => true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was probably not ideal. I think we should be able to re-use any ParquetField and decide on the behavior when instantiating the parquet type. Here we now mix behaviors with the config.

import MagnolifyBench._

@Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested)
@Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to capture a "true" read benchmark for Parquet since there's so much happening under the hood here (reading and caching the row group, for example). But at least this can be used to track positive and negative trends

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, this was bothering me, so I re-implemented this benchmark so that instead of reading/writing entire file streams, it's directly writing/reading Pages (smallest unit of IO granularity in Parquet). This skips a lot of the overhead of the file/rowgroup IO, so that we're able to specifically benchmark ParquetType's functionality: converting between parquet Groups and Scala case classes.

@clairemcginty
Copy link
Contributor Author

Ok, using the benchmarks from #1040, this change looks performance-neutral on read/write:

main branch benchmark:

% sbt "jmh/jmh:run -i 10 -wi 10 -f1 -t .*parquet.*magnolify"
[info] Benchmark                           Mode  Cnt      Score     Error  Units
[info] ParquetBench.parquetReadMagnolify  avgt   10  13695.172 ± 311.972  ns/op
[info] ParquetBench.parquetWriteMagnolify  avgt   10  5527.228 ± 70.377  ns/op

avro-compat-rework benchmark:

% sbt "jmh/jmh:run -i 10 -wi 10 -f1 -t .*parquet.*magnolify"
[info] Benchmark                           Mode  Cnt      Score     Error  Units
[info] ParquetBench.parquetReadMagnolify   avgt   10  13491.778 ± 124.734  ns/op
[info] ParquetBench.parquetWriteMagnolify  avgt   10   4769.038 ±  61.517  ns/op

the Configuration check does add some time to WriteContext#init, but that's called once per task only

protected def isEmpty(v: T): Boolean
protected final def nonEmpty(v: T): Boolean = !isEmpty(v)

def write(c: RecordConsumer, v: T)(cm: CaseMapper): Unit
def newConverter: TypeConverter[T]
def write(c: RecordConsumer, v: T)(cm: CaseMapper, groupArrayFields: Boolean): Unit
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this solution now works by passing a precomputed groupArrayFields flag around to all write method.

It works, but I'm thinking that we may want to generalize this from a boolean flag into a Map[String, _]-typed configuration field (parsed from the Hadoop Configuration object passed to ParquetType).

This would give us more flexibility if we need to make any more write options configurable in future. As an example, if a user has any LocalDate* fields in their ParquetType case class, by default parquet-avro will convert them into a local-timestamp-{millis, micros} for Avro 1.11, but timestamp-{millis, micros} on Avro 1.8. We might want to make this behavior configurable to preserve schema compatibility across Avro upgrades.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt @RustedBones ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use a custom trait containing all desired write setting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to use a trait!

Copy link
Contributor

@RustedBones RustedBones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should speak of list logical-type instead of grouped array.

I'm also wondering if by default we should respect the spec.

parquet-avro does not respect the default, and falls in the backward compat rules, case 1.
Do you know if parquet-avro is able to read arrays when the default spec is used ?

@@ -192,6 +195,59 @@ class ParquetTypeSuite extends MagnolifySuite {
assertEquals(inner.getFields.asScala.map(_.getName).toSeq, Seq("INNERFIRST"))
}
}

test(s"AvroCompat") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
test(s"AvroCompat") {
test("AvroCompat") {


sealed trait ParquetField[T] extends Serializable {

@transient private lazy val schemaCache: concurrent.Map[UUID, Type] =
@transient private lazy val schemaCache: concurrent.Map[Boolean, concurrent.Map[UUID, Type]] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using a compound key, instead of a nested map ?

Suggested change
@transient private lazy val schemaCache: concurrent.Map[Boolean, concurrent.Map[UUID, Type]] =
@transient private lazy val schemaCache: concurrent.Map[(Boolean, UUID), Type] =

@clairemcginty
Copy link
Contributor Author

parquet-avro does not respect the default, and falls in the backward compat rules, case 1.
Do you know if parquet-avro is able to read arrays when the default spec is used ?

Unfortunately, not. It looks like this has just never been implemented. It's something we could try to get into the next Parquet release

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants