Improve strict schema string match with smarter matching (#621)
* Improve strict schema string match with smarter matching

* Add Spark conf to gate the structural schema matching
andyl-db authored Jan 29, 2025
1 parent 3481ef2 commit aa64eda
Showing 3 changed files with 124 additions and 7 deletions.
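The new behavior is opt-in: the client reads the flag from the Spark session configuration and falls back to the exact string comparison when it is unset. A minimal sketch of enabling it from application code, assuming a standard Spark session (the app name and local master below are placeholders, not part of this commit):

import org.apache.spark.sql.SparkSession

// Build (or reuse) a session; the app name is a placeholder for this sketch.
val spark = SparkSession.builder()
  .appName("structural-schema-match-demo")
  .master("local[*]")
  .getOrCreate()

// Off by default ("false"): schema strings must match exactly. Setting the
// conf to "true" routes checkSchemaNotChange through the structural check.
spark.conf.set("spark.delta.sharing.client.useStructuralSchemaMatch", "true")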
6 changes: 6 additions & 0 deletions client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala
@@ -95,6 +95,9 @@ object ConfUtils {
   val NEVER_USE_HTTPS = "spark.delta.sharing.network.never.use.https"
   val NEVER_USE_HTTPS_DEFAULT = "false"

+  val STRUCTURAL_SCHEMA_MATCH_CONF = "spark.delta.sharing.client.useStructuralSchemaMatch"
+  val STRUCTURAL_SCHEMA_MATCH_DEFAULT = "false"
+
   def getProxyConfig(conf: Configuration): Option[ProxyConfig] = {
     val proxyHost = conf.get(PROXY_HOST, null)
     val proxyPortAsString = conf.get(PROXY_PORT, null)
@@ -286,6 +289,9 @@ object ConfUtils {
     maxDur
   }

+  def structuralSchemaMatchingEnabled(conf: SQLConf): Boolean =
+    conf.getConfString(STRUCTURAL_SCHEMA_MATCH_CONF, STRUCTURAL_SCHEMA_MATCH_DEFAULT).toBoolean
+
   private def toTimeInSeconds(timeStr: String, conf: String): Int = {
     val timeInSeconds = JavaUtils.timeStringAs(timeStr, TimeUnit.SECONDS)
     validateNonNeg(timeInSeconds, conf)
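To see why exact schemaString comparison is stricter than needed, consider two schemas that differ as JSON strings yet can serve the same reads. A small illustration of the mismatch (the column names are invented for the example):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Schema the DataFrame was originally created with.
val currentSchema = StructType(Seq(StructField("id", IntegerType)))

// Schema after the provider appends a column: every column the reader
// expects is still present with its original type.
val newSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("note", StringType)))

// The strict check compares JSON strings, so this pair is rejected...
assert(currentSchema.json != newSchema.json)
// ...while the gated structural check can accept it, as the RemoteDeltaLog
// change and the tests below demonstrate.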
21 changes: 17 additions & 4 deletions client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala
@@ -51,6 +51,7 @@ import io.delta.sharing.client.model.{
 }
 import io.delta.sharing.client.util.ConfUtils
 import io.delta.sharing.spark.perf.DeltaSharingLimitPushDown
+import io.delta.sharing.spark.util.SchemaUtils

 /**
  * Used to query the current state of the transaction logs of a remote shared Delta table.
@@ -227,11 +228,23 @@ class RemoteSnapshot(
   }

   private def checkSchemaNotChange(newMetadata: Metadata): Unit = {
-    if (newMetadata.schemaString != metadata.schemaString ||
+    val schemaChangedException = new SparkException(
+      s"""The schema or partition columns of your Delta table has changed since your
+         |DataFrame was created. Please redefine your DataFrame""".stripMargin)
+
+    if (ConfUtils.structuralSchemaMatchingEnabled(spark.sessionState.conf)) {
+      val newSchema = DataType.fromJson(newMetadata.schemaString).asInstanceOf[StructType]
+      val currentSchema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]
+
+      if (
+        metadata.partitionColumns != newMetadata.partitionColumns ||
+        !SchemaUtils.isReadCompatible(currentSchema, newSchema)
+      ) {
+        throw schemaChangedException
+      }
+    } else if (newMetadata.schemaString != metadata.schemaString ||
       newMetadata.partitionColumns != metadata.partitionColumns) {
-      throw new SparkException(
-        s"""The schema or partition columns of your Delta table has changed since your
-           |DataFrame was created. Please redefine your DataFrame""")
+      throw schemaChangedException
     }
   }

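SchemaUtils.isReadCompatible itself is not shown in this diff, so the following is only a sketch of the kind of check the tests below imply, not the actual implementation: every field of the current (reader-side) schema must still exist in the new schema with the same name and data type, extra provider-side fields are tolerated, and a field must not become nullable when the reader assumes it is not.

import org.apache.spark.sql.types.StructType

// Sketch only: assumed semantics, not the real SchemaUtils.isReadCompatible.
def isReadCompatibleSketch(current: StructType, newSchema: StructType): Boolean = {
  // Index the new schema's fields by name for lookup.
  val newFields = newSchema.fields.map(f => f.name -> f).toMap
  current.fields.forall { existing =>
    newFields.get(existing.name).exists { candidate =>
      // Same data type, and a field the reader relies on as non-nullable
      // must stay non-nullable; a nullable one may tighten.
      candidate.dataType == existing.dataType &&
        (existing.nullable || !candidate.nullable)
    }
  }
}

Under these assumed semantics, dropping a column the reader uses or changing its type (the FloatType case in the test suite) makes the check fail and surfaces the schema-changed error, while provider-side additions pass.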
104 changes: 101 additions & 3 deletions spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala
@@ -21,14 +21,14 @@ import java.nio.file.Files

 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference => SqlAttributeReference, EqualTo => SqlEqualTo, Literal => SqlLiteral}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{FloatType, IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, FloatType, IntegerType, LongType, StringType, StructField, StructType}

-import io.delta.sharing.client.model.Table
+import io.delta.sharing.client.model.{DeltaTableMetadata, Table}

 class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession {

@@ -531,4 +531,102 @@ class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession {
     client.clear()
     checkGetMetadataCalledOnce(Some(1L), false)
   }
+
+  def testMismatch(expectError: Boolean)(
+      getInitialSchema: StructType => StructType
+  ): Unit = {
+    val client = new TestDeltaSharingClient()
+    client.clear()
+    val table = Table("fe", "fi", "fo")
+    val metadata = client.getMetadata(table, versionAsOf = None, timestampAsOf = None)
+    val schema =
+      DataType
+        .fromJson(metadata.metadata.schemaString)
+        .asInstanceOf[StructType]
+    val initialSchema = getInitialSchema(schema)
+    val snapshot = new RemoteSnapshot(
+      tablePath = new Path("test"),
+      client = client,
+      table = table,
+      initDeltaTableMetadata = Some(
+        metadata.copy(
+          metadata = metadata.metadata.copy(
+            schemaString = initialSchema.json
+          )
+        )
+      )
+    )
+    val fileIndex = {
+      val params = RemoteDeltaFileIndexParams(spark, snapshot, client.getProfileProvider)
+      RemoteDeltaSnapshotFileIndex(params, Some(2L))
+    }
+    if (expectError) {
+      val e = intercept[SparkException] {
+        snapshot.filesForScan(Nil, Some(2L), Some("jsonPredicate1"), fileIndex)
+      }
+      assert(
+        e.getMessage.contains(
+          s"""The schema or partition columns of your Delta table has changed since your
+             |DataFrame was created. Please redefine your DataFrame""".stripMargin
+        )
+      )
+    } else {
+      snapshot.filesForScan(Nil, Some(2L), Some("jsonPredicate1"), fileIndex)
+    }
+  }
+
+  test("RemoteDeltaLog should error when the new metadata is a subset of current metadata") {
+    spark.sessionState.conf.setConfString(
+      "spark.delta.sharing.client.useStructuralSchemaMatch",
+      "true"
+    )
+    testMismatch(expectError = true) { schema =>
+      // the initial schema has an extra field, so the new metadata is a subset of it
+      StructType(
+        schema.fields ++ Seq(
+          StructField(
+            name = "extra_field",
+            dataType = StringType
+          )
+        )
+      )
+    }
+  }
+
+  test(
+    "RemoteDeltaLog should not error when new metadata includes extra columns not in current metadata"
+  ) {
+    spark.sessionState.conf.setConfString(
+      "spark.delta.sharing.client.useStructuralSchemaMatch",
+      "true"
+    )
+    testMismatch(expectError = false) { schema =>
+      // the initial schema keeps only one field, so the new metadata includes extra fields
+      StructType(
+        Seq(schema.fields.head)
+      )
+    }
+  }
+
+  test("RemoteDeltaLog errors when new metadata data type does not match") {
+    spark.sessionState.conf.setConfString(
+      "spark.delta.sharing.client.useStructuralSchemaMatch",
+      "true"
+    )
+    testMismatch(expectError = true) { schema =>
+      // change the first field's data type so it no longer matches the new metadata
+      StructType(
+        schema.fields.zipWithIndex.map {
+          case (field, i) =>
+            if (i == 0) {
+              field.copy(
+                dataType = FloatType
+              )
+            } else {
+              field
+            }
+        }
+      )
+    }
+  }
 }
