Commit: Add test

clairemcginty committed Jul 9, 2024
1 parent c24b4bf commit 2c30d57
Showing 2 changed files with 212 additions and 0 deletions.
104 changes: 104 additions & 0 deletions parquet/src/test/scala/magnolify/parquet/AvroParquetSuite2.scala
@@ -0,0 +1,104 @@
/*
* Copyright 2022 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magnolify.parquet

import cats.*
import magnolify.avro.AvroType
import magnolify.cats.auto.*
import magnolify.scalacheck.auto.*
import magnolify.test.*
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{
AvroParquetReader,
AvroParquetWriter,
AvroReadSupport,
GenericDataSupplier
}
import org.scalacheck.*

import scala.reflect.ClassTag

case class WithList2(b: List[Boolean])

class AvroParquetSuite2 extends MagnolifySuite {

private def test[T: Arbitrary: ClassTag]()(implicit
at: AvroType[T],
tpe: ParquetType[T],
eq: Eq[T]
): Unit = test[T](className[T])

private def test[T: Arbitrary](
name: String
)(implicit
at: AvroType[T],
tpe: ParquetType[T],
eq: Eq[T]
): Unit = {
    // Force evaluation of `schema` and `avroSchema` (the `: Unit` ascriptions discard
    // the values), then ensure the type class instance is still serializable afterwards
tpe.schema: Unit
tpe.avroSchema: Unit
val pt = ensureSerializable(tpe)

property(s"$name.avro2parquet") {
Prop.forAll { (t: T) =>
val r = at(t)

val out = new TestOutputFile
val writer = AvroParquetWriter.builder[GenericRecord](out).withSchema(at.schema).build()
writer.write(r)
writer.close()

val in = new TestInputFile(out.getBytes)
val reader = pt.readBuilder(in).build()
val copy = reader.read()
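      // A second read past the last record should return null (end of stream)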
val next = reader.read()
reader.close()
Prop.all(eq.eqv(t, copy), next == null)
}
}

property(s"$name.parquet2avro") {
Prop.forAll { (t: T) =>
val out = new TestOutputFile
val writer = pt.writeBuilder(out).build()
writer.write(t)
writer.close()

val in = new TestInputFile(out.getBytes)
val conf = new Configuration()
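      // Supply GenericData so records are materialized as GenericRecord instances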
AvroReadSupport.setAvroDataSupplier(conf, classOf[GenericDataSupplier])
      // Read using the AvroType-derived schema instead of the writer's Parquet schema
AvroReadSupport.setAvroReadSchema(conf, at.schema)
      val reader = AvroParquetReader.builder[GenericRecord](in).withConf(conf).build()
      val copy = reader.read()
      val next = reader.read()
      reader.close()
      Prop.all(eq.eqv(t, at(copy)), next == null)
}
}
}

test[WithList2]()
}
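
Note: both suites round-trip records through in-memory files. TestInputFile appears at the bottom of ParquetTypeSuite.scala below, but TestOutputFile is not part of this diff. A minimal sketch of what it might look like, assuming only the standard org.apache.parquet.io.OutputFile contract plus the getBytes accessor implied by its usage above:

import java.io.ByteArrayOutputStream

import org.apache.parquet.io.{OutputFile, PositionOutputStream}

class TestOutputFile extends OutputFile {
  private val baos = new ByteArrayOutputStream()

  // Hands the written bytes over to TestInputFile in the round-trip tests
  def getBytes: Array[Byte] = baos.toByteArray

  override def create(blockSizeHint: Long): PositionOutputStream =
    new PositionOutputStream {
      override def getPos: Long = baos.size().toLong
      override def write(b: Int): Unit = baos.write(b)
    }

  override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
    create(blockSizeHint)

  override def supportsBlockSize(): Boolean = false
  override def defaultBlockSize(): Long = 0L
}
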
108 changes: 108 additions & 0 deletions parquet/src/test/scala/magnolify/parquet/ParquetTypeSuite.scala
@@ -27,8 +27,13 @@ import magnolify.shared.doc
import magnolify.shared.TestEnumType._
import magnolify.test.Simple._
import magnolify.test._
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.io._
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.Types
import org.scalacheck._

import java.io.ByteArrayInputStream
@@ -192,6 +197,107 @@ class ParquetTypeSuite extends MagnolifySuite {
assertEquals(inner.getFields.asScala.map(_.getName).toSeq, Seq("INNERFIRST"))
}
}

test(s"AvroCompat") {
val pt = ParquetType[WithList]
// Assert that by default, Magnolify doesn't wrap repeated fields in group types
assert(!Schema.hasGroupedArray(pt.schema))
assertEquals(
"""|message magnolify.parquet.WithList {
| required binary s (STRING);
| repeated binary l (STRING);
|}
|""".stripMargin,
pt.schema.toString
)

// Construct Magnolify read support for a Parquet file written using parquet-avro
val readSupport = pt.readSupport.init(
new InitContext(
new Configuration(),
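        // File metadata advertising that the file was written with the Avro object model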
Map(ParquetWriter.OBJECT_MODEL_NAME_PROP -> Set("avro").asJava).asJava,
        // Build the expected writer schema, which uses parquet-avro-style array grouping
Types
.buildMessage()
.addField(Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("s"))
.addField(
Types
.buildGroup(Repetition.REQUIRED)
.addField(
Types
.primitive(PrimitiveTypeName.BINARY, Repetition.REPEATED)
.named("array")
)
.named("list")
)
.named("WithList")
)
)

// Assert that Magnolify is using a compatible array schema to read
assert(Schema.hasGroupedArray(readSupport.getRequestedSchema))
assertEquals(
"""|message magnolify.parquet.WithList {
| required binary s (STRING);
| required group l (LIST) {
| repeated binary array (STRING);
| }
|}
|""".stripMargin,
readSupport.getRequestedSchema.toString
)
}

test(s"AvroCompat") {
val pt = ParquetType[WithList]

// Assert that by default, Magnolify doesn't wrap repeated fields in group types
assert(!Schema.hasGroupedArray(pt.schema))
assertEquals(
"""|message magnolify.parquet.WithList {
| required binary s (STRING);
| repeated binary l (STRING);
|}
|""".stripMargin,
pt.schema.toString
)

// Construct Magnolify read support for a Parquet file written using parquet-avro
val readSupport = pt.readSupport.init(
new InitContext(
new Configuration(),
Map(ParquetWriter.OBJECT_MODEL_NAME_PROP -> Set("avro").asJava).asJava,
// Build expected writer schema, which uses parquet-avro-style array grouping
Types
.buildMessage()
.addField(Types.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("s"))
.addField(
Types
.buildGroup(Repetition.REQUIRED)
.addField(
Types
.primitive(PrimitiveTypeName.BINARY, Repetition.REPEATED)
.named("array")
)
.named("list")
)
.named("WithList")
)
)

// Assert that Magnolify is using a compatible array schema to read
assert(Schema.hasGroupedArray(readSupport.getRequestedSchema))
assertEquals(
"""|message magnolify.parquet.WithList {
| required binary s (STRING);
| required group l (LIST) {
| repeated binary array (STRING);
| }
|}
|""".stripMargin,
readSupport.getRequestedSchema.toString
)
}
}

case class Unsafe(c: Char)
@@ -225,6 +331,8 @@ case class ParquetNestedListDoc(
i: List[Integers]
)

case class WithList(s: String, l: List[String])

class TestInputFile(ba: Array[Byte]) extends InputFile {
private val bais = new ByteArrayInputStream(ba)
override def getLength: Long = ba.length.toLong
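
For context on the AvroCompat test above: Parquet has two common layouts for a repeated field. Magnolify's default schema writes the field itself as repeated (repeated binary l), while parquet-avro wraps it in a group annotated as LIST. Schema.hasGroupedArray is what the test uses to tell the two apart; its implementation is not part of this diff, but a plausible sketch, assuming it simply walks the message looking for LIST-annotated groups, is:

import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, Type}

import scala.jdk.CollectionConverters._

// Hypothetical re-implementation for illustration only; the real
// magnolify.parquet.Schema.hasGroupedArray may differ in detail.
def hasGroupedArray(tpe: Type): Boolean = tpe match {
  case gt: GroupType =>
    gt.getFields.asScala.exists { field =>
      !field.isPrimitive && (
        field.getLogicalTypeAnnotation == LogicalTypeAnnotation.listType() ||
          hasGroupedArray(field)
      )
    }
  case _ => false
}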