-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(fix #766) Deprecate AvroCompat, replace automatic schema detection on read + Configurable write #996
base: main
Are you sure you want to change the base?
(fix #766) Deprecate AvroCompat, replace automatic schema detection on read + Configurable write #996
Changes from 10 commits
61e5581
a1ee149
81e3f07
a730718
5e622ab
db5b1b0
be75cc3
56ff232
ca535fc
da8fb91
23bcbd9
eb205db
959c7d2
b9a7281
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,13 +16,18 @@ | |
|
||
package magnolify.jmh | ||
|
||
import java.util.concurrent.TimeUnit | ||
import magnolify.parquet.{MagnolifyParquetProperties, ParquetType, TestInputFile, TestOutputFile} | ||
|
||
import java.util.concurrent.TimeUnit | ||
import magnolify.scalacheck.auto._ | ||
import magnolify.test.Simple._ | ||
import org.apache.hadoop.conf.Configuration | ||
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter} | ||
import org.scalacheck._ | ||
import org.openjdk.jmh.annotations._ | ||
|
||
import scala.jdk.CollectionConverters._ | ||
|
||
object MagnolifyBench { | ||
val seed: rng.Seed = rng.Seed(0) | ||
val prms: Gen.Parameters = Gen.Parameters.default | ||
|
@@ -87,6 +92,77 @@ class AvroBench { | |
@Benchmark def avroSchema: Schema = AvroType[Nested].schema | ||
} | ||
|
||
@State(Scope.Benchmark) | ||
class ParquetReadState(pt: ParquetType[Nested]) { | ||
var out: TestOutputFile = null | ||
var reader: ParquetReader[Nested] = null | ||
|
||
@Setup(Level.Invocation) | ||
def setup(): Unit = { | ||
out = new TestOutputFile | ||
val writer = pt.writeBuilder(out).build() | ||
writer.write(MagnolifyBench.nested) | ||
writer.close() | ||
|
||
val in = new TestInputFile(out.getBytes) | ||
reader = pt.readBuilder(in).build() | ||
} | ||
|
||
@TearDown(Level.Invocation) | ||
def tearDown(): Unit = { | ||
reader.close() | ||
} | ||
} | ||
|
||
@State(Scope.Benchmark) | ||
class ParquetWriteState(pt: ParquetType[Nested]) { | ||
var writer: ParquetWriter[Nested] = null | ||
|
||
@Setup(Level.Invocation) | ||
def setup(): Unit = { | ||
val out = new TestOutputFile | ||
writer = pt.writeBuilder(out).build() | ||
} | ||
|
||
@TearDown(Level.Invocation) | ||
def tearDown(): Unit = { | ||
writer.close() | ||
} | ||
} | ||
|
||
object ParquetStates { | ||
def confWithGroupedArraysProp(propValue: Boolean): Configuration = { | ||
val conf = new Configuration() | ||
conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, propValue) | ||
conf | ||
} | ||
class DefaultParquetReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(false))) | ||
class DefaultParquetWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(false))) | ||
|
||
class ParquetAvroCompatReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(true))) | ||
class ParquetAvroCompatWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(true))) | ||
} | ||
|
||
@BenchmarkMode(Array(Mode.AverageTime)) | ||
@OutputTimeUnit(TimeUnit.NANOSECONDS) | ||
@State(Scope.Thread) | ||
class ParquetBench { | ||
import MagnolifyBench._ | ||
|
||
@Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested) | ||
@Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's hard to capture a "true" read benchmark for Parquet since there's so much happening under the hood here (reading and caching the row group, for example). But at least this can be used to track positive and negative trends There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, this was bothering me, so I re-implemented this benchmark so that instead of reading/writing entire file streams, it's directly writing/reading Pages (smallest unit of IO granularity in Parquet). This skips a lot of the overhead of the file/rowgroup IO, so that we're able to specifically benchmark ParquetType's functionality: converting between parquet Groups and Scala case classes. |
||
} | ||
|
||
@BenchmarkMode(Array(Mode.AverageTime)) | ||
@OutputTimeUnit(TimeUnit.NANOSECONDS) | ||
@State(Scope.Thread) | ||
class ParquetAvroCompatBench { | ||
import MagnolifyBench._ | ||
|
||
@Benchmark def parquetWrite(state: ParquetStates.ParquetAvroCompatWriteState): Unit = state.writer.write(nested) | ||
@Benchmark def parquetRead(state: ParquetStates.ParquetAvroCompatReadState): Nested = state.reader.read() | ||
} | ||
|
||
@BenchmarkMode(Array(Mode.AverageTime)) | ||
@OutputTimeUnit(TimeUnit.NANOSECONDS) | ||
@State(Scope.Thread) | ||
|
@@ -157,7 +233,7 @@ class ExampleBench { | |
private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get | ||
private val example = exampleType.to(exampleNested).build() | ||
@Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested) | ||
@Benchmark def exampleFrom: ExampleNested = exampleType.from(example) | ||
@Benchmark def exampleFrom: ExampleNested = exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap) | ||
} | ||
|
||
// Collections are not supported | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Copyright 2024 Spotify AB | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package magnolify.parquet | ||
|
||
import org.apache.hadoop.conf.Configuration | ||
|
||
/** | ||
* Properties for reading and writing Magnolify ParquetType classes, configurable via a Hadoop | ||
* [[Configuration]] instance. | ||
*/ | ||
object MagnolifyParquetProperties { | ||
val WriteGroupedArrays: String = "magnolify.parquet.write-grouped-arrays" | ||
val WriteGroupedArraysDefault: Boolean = false | ||
|
||
val WriteAvroSchemaToMetadata: String = "magnolify.parquet.write-avro-schema" | ||
val WriteAvroSchemaToMetadataDefault: Boolean = true | ||
|
||
val ReadTypeKey = "parquet.type.read.type" | ||
val WriteTypeKey = "parquet.type.write.type" | ||
|
||
// Hash any Configuration values that might affect schema creation to use as part of Schema cache key | ||
private[parquet] def hashValues(conf: Configuration): Int = | ||
Option(conf.get(WriteGroupedArrays)) | ||
.map(_.toBoolean) | ||
.getOrElse(WriteGroupedArraysDefault) | ||
.hashCode() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm on the fence about relying so heavily on the hadoop
Configuration
class, since it pulls in hadoop-common artifact and links us more tightly with Hadoop. Parquet is trying to move away fromConfiguration
and onto their own ParquetConfiguration class, which we could use instead. However, it might be confusing for Scio users since Scio is heavily dependent onConfiguration
and we don't have immediate plans to offboard from itThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I might pull this out into a separate PR. will update shortly