From aa64eda86e30a9034421c363c6e0aa61a40fc53b Mon Sep 17 00:00:00 2001
From: andyl-db <113815940+andyl-db@users.noreply.github.com>
Date: Wed, 29 Jan 2025 15:38:45 -0800
Subject: [PATCH] Improve strict schema string match with smarter matching
 (#621)

* Improve strict schema string match with smarter matching

* Add Spark conf to gate the structural schema matching
---
 .../delta/sharing/client/util/ConfUtils.scala |   6 +
 .../delta/sharing/spark/RemoteDeltaLog.scala  |  21 +++-
 .../sharing/spark/RemoteDeltaLogSuite.scala   | 104 +++++++++++++++++-
 3 files changed, 124 insertions(+), 7 deletions(-)

diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala
index fcff6e580..8a3032241 100644
--- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala
+++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala
@@ -95,6 +95,9 @@ object ConfUtils {
   val NEVER_USE_HTTPS = "spark.delta.sharing.network.never.use.https"
   val NEVER_USE_HTTPS_DEFAULT = "false"
 
+  val STRUCTURAL_SCHEMA_MATCH_CONF = "spark.delta.sharing.client.useStructuralSchemaMatch"
+  val STRUCTURAL_SCHEMA_MATCH_DEFAULT = "false"
+
   def getProxyConfig(conf: Configuration): Option[ProxyConfig] = {
     val proxyHost = conf.get(PROXY_HOST, null)
     val proxyPortAsString = conf.get(PROXY_PORT, null)
@@ -286,6 +289,9 @@ object ConfUtils {
     maxDur
   }
 
+  def structuralSchemaMatchingEnabled(conf: SQLConf): Boolean =
+    conf.getConfString(STRUCTURAL_SCHEMA_MATCH_CONF, STRUCTURAL_SCHEMA_MATCH_DEFAULT).toBoolean
+
   private def toTimeInSeconds(timeStr: String, conf: String): Int = {
     val timeInSeconds = JavaUtils.timeStringAs(timeStr, TimeUnit.SECONDS)
     validateNonNeg(timeInSeconds, conf)
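
The flag is read from the session's SQLConf, so it can be toggled per Spark
session without restarting the client. A minimal usage sketch, assuming a
running SparkSession; the profile path and table coordinates below are
hypothetical placeholders, not part of this patch:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("sharing-demo").getOrCreate()

    // Default is "false": keep the strict schemaString equality check.
    spark.conf.set("spark.delta.sharing.client.useStructuralSchemaMatch", "true")

    // Later scans re-validate refreshed table metadata structurally rather
    // than via exact JSON string comparison.
    val df = spark.read
      .format("deltaSharing")
      .load("/path/to/profile.share#share.schema.table")
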
Please redefine your DataFrame""") + throw schemaChangedException } } diff --git a/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala index 033af78f9..29877cc27 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala @@ -21,14 +21,14 @@ import java.nio.file.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{AttributeReference => SqlAttributeReference, EqualTo => SqlEqualTo, Literal => SqlLiteral} import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{FloatType, IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{DataType, FloatType, IntegerType, LongType, StringType, StructField, StructType} -import io.delta.sharing.client.model.Table +import io.delta.sharing.client.model.{DeltaTableMetadata, Table} class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession { @@ -531,4 +531,102 @@ class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession { client.clear() checkGetMetadataCalledOnce(Some(1L), false) } + + def testMismatch(expectError: Boolean)( + getInitialSchema: StructType => StructType + ): Unit = { + val client = new TestDeltaSharingClient() + client.clear() + val table = Table("fe", "fi", "fo") + val metadata = client.getMetadata(table, versionAsOf = None, timestampAsOf = None) + val schema = + DataType + .fromJson(metadata.metadata.schemaString) + .asInstanceOf[StructType] + val initialSchema = getInitialSchema(schema) + val snapshot = new RemoteSnapshot( + tablePath = new Path("test"), + client = client, + table = table, + initDeltaTableMetadata = Some( + metadata.copy( + metadata = metadata.metadata.copy( + schemaString = initialSchema.json + ) + ) + ) + ) + val fileIndex = { + val params = RemoteDeltaFileIndexParams(spark, snapshot, client.getProfileProvider) + RemoteDeltaSnapshotFileIndex(params, Some(2L)) + } + if (expectError) { + val e = intercept[SparkException] { + snapshot.filesForScan(Nil, Some(2L), Some("jsonPredicate1"), fileIndex) + } + assert( + e.getMessage.contains( + s"""The schema or partition columns of your Delta table has changed since your + |DataFrame was created. 
Please redefine your DataFrame""".stripMargin + ) + ) + } else { + snapshot.filesForScan(Nil, Some(2L), Some("jsonPredicate1"), fileIndex) + } + } + + test("RemoteDeltaLog should error when the new metadata is a subset of current metadata") { + spark.sessionState.conf.setConfString( + "spark.delta.sharing.client.useStructuralSchemaMatch", + "true" + ) + testMismatch(expectError = true) { schema => + // initial schema has extra field to schema so the new metadata is a subset + StructType( + schema.fields ++ Seq( + StructField( + name = "extra_field", + dataType = StringType + ) + ) + ) + } + } + + test( + "RemoteDeltaLog should not error when new metadata includes extra columns not in new metadata" + ) { + spark.sessionState.conf.setConfString( + "spark.delta.sharing.client.useStructuralSchemaMatch", + "true" + ) + testMismatch(expectError = false) { schema => + // initial schema only has one field so that the new metadata includes extra fields + StructType( + Seq(schema.fields.head) + ) + } + } + + test("RemoteDeltaLog errors when new metadata data type does not match") { + spark.sessionState.conf.setConfString( + "spark.delta.sharing.client.useStructuralSchemaMatch", + "true" + ) + testMismatch(expectError = true) { schema => + // initial schema only has one field so that the new metadata includes extra fields + StructType( + schema.fields.zipWithIndex.map { + case (field, i) => + if (i == 0) { + field.copy( + dataType = FloatType + ) + } else { + field + } + } + ) + } + } }