Skip to content

Commit

Permalink
drop Optional
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Sep 12, 2024
1 parent db5b1b0 commit be75cc3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
21 changes: 11 additions & 10 deletions parquet/src/main/scala/magnolify/parquet/ParquetType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ object ParquetArray {
sealed trait ParquetType[T] extends Serializable {
import ParquetType._

def schema: MessageType = schema(None)
def schema(conf: Option[Configuration]): MessageType
def avroSchema: AvroSchema = avroSchema(None)
def avroSchema(conf: Option[Configuration]): AvroSchema
def schema: MessageType = schema(new Configuration())
def schema(conf: Configuration): MessageType

def avroSchema: AvroSchema = avroSchema(new Configuration())
def avroSchema(conf: Configuration): AvroSchema

def setupInput(job: Job): Unit = {
job.setInputFormatClass(classOf[ParquetInputFormat[T]])
Expand Down Expand Up @@ -98,9 +99,9 @@ object ParquetType {
)(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = f match {
case r: ParquetField.Record[_] =>
new ParquetType[T] {
@transient override def schema(conf: Option[Configuration]): MessageType =
Schema.message(r.schema(cm, conf.getOrElse(new Configuration())))
@transient override def avroSchema(conf: Option[Configuration]): AvroSchema = {
@transient override def schema(conf: Configuration): MessageType =
Schema.message(r.schema(cm, conf))
@transient override def avroSchema(conf: Configuration): AvroSchema = {
val s = new AvroSchemaConverter().convert(schema(conf))
// add doc to avro schema
val fieldDocs = f.fieldDocs(cm)
Expand Down Expand Up @@ -142,7 +143,7 @@ object ParquetType {
}

val requestedSchema = {
val s = Schema.message(parquetType.schema(Some(context.getConfiguration))): @nowarn(
val s = Schema.message(parquetType.schema(context.getConfiguration)): @nowarn(
"cat=deprecation"
)
// If reading Avro, roundtrip schema using parquet-avro converter to ensure array compatibility;
Expand Down Expand Up @@ -184,7 +185,7 @@ object ParquetType {
parquetType = SerializationUtils.fromBase64[ParquetType[T]](configuration.get(WriteTypeKey))
}

val schema = Schema.message(parquetType.schema(Some(configuration)))
val schema = Schema.message(parquetType.schema(configuration))
val metadata = new java.util.HashMap[String, String]()

if (

Check warning on line 191 in parquet/src/main/scala/magnolify/parquet/ParquetType.scala

View check run for this annotation

Codecov / codecov/patch

parquet/src/main/scala/magnolify/parquet/ParquetType.scala#L191

Added line #L191 was not covered by tests
Expand All @@ -196,7 +197,7 @@ object ParquetType {
try {
metadata.put(
AVRO_SCHEMA_METADATA_KEY,
parquetType.avroSchema(Some(configuration)).toString()
parquetType.avroSchema(configuration).toString()
)
} catch {
// parquet-avro has greater schema restrictions than magnolify-parquet, e.g., parquet-avro does not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class ParquetTypeSuite extends MagnolifySuite {
conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, true)
val writeContext = pt.writeSupport.init(conf)
assertEquals(avroCompliantSchema, writeContext.getSchema.toString)
assertEquals(pt.schema(Some(conf)), writeContext.getSchema)
assertEquals(pt.schema(conf), writeContext.getSchema)
}

// Assert that WriteSupport uses non-grouped schema otherwise
Expand All @@ -247,7 +247,7 @@ class ParquetTypeSuite extends MagnolifySuite {
conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, false)
val writeContext = pt.writeSupport.init(conf)
assertEquals(nonAvroCompliantSchema, writeContext.getSchema.toString)
assertEquals(pt.schema(Some(conf)), writeContext.getSchema)
assertEquals(pt.schema(conf), writeContext.getSchema)
}
}
}
Expand Down

0 comments on commit be75cc3

Please sign in to comment.