[Spark] Reject unsupported type changes with Uniform #3947

Merged: 6 commits, Dec 31, 2024
Changes from 1 commit
Introduce TypeWideningMode + address other comments
johanl-db committed Dec 12, 2024
commit 8cc235ec7223efc89038ca6c0440bb70dacd7771
84 changes: 46 additions & 38 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -873,7 +873,7 @@ class DeltaAnalysis(session: SparkSession)
if (i < targetAttrs.length) {
val targetAttr = targetAttrs(i)
addCastToColumn(attr, targetAttr, deltaTable.name(),
shouldWidenType = shouldWidenType(deltaTable, writeOptions)
typeWideningMode = getTypeWideningMode(deltaTable, writeOptions)
)
} else {
attr
@@ -913,7 +913,7 @@ class DeltaAnalysis(session: SparkSession)
throw DeltaErrors.missingColumn(attr, targetAttrs)
}
addCastToColumn(attr, targetAttr, deltaTable.name(),
shouldWidenType = shouldWidenType(deltaTable, writeOptions)
typeWideningMode = getTypeWideningMode(deltaTable, writeOptions)
)
}
Project(project, query)
@@ -923,24 +923,25 @@ class DeltaAnalysis(session: SparkSession)
attr: NamedExpression,
targetAttr: NamedExpression,
tblName: String,
shouldWidenType: (AtomicType, AtomicType) => Boolean): NamedExpression = {
typeWideningMode: TypeWideningMode): NamedExpression = {
val expr = (attr.dataType, targetAttr.dataType) match {
case (s, t) if s == t =>
attr
case (s: StructType, t: StructType) if s != t =>
addCastsToStructs(tblName, attr, s, t, shouldWidenType)
addCastsToStructs(tblName, attr, s, t, typeWideningMode)
case (ArrayType(s: StructType, sNull: Boolean), ArrayType(t: StructType, tNull: Boolean))
if s != t && sNull == tNull =>
addCastsToArrayStructs(tblName, attr, s, t, sNull, shouldWidenType)
case (s: AtomicType, t: AtomicType) if shouldWidenType(t, s) =>
if s != t && sNull == tNull =>
addCastsToArrayStructs(tblName, attr, s, t, sNull, typeWideningMode)
case (s: AtomicType, t: AtomicType)
if typeWideningMode.shouldWidenType(fromType = t, toType = s) =>
// Keep the type from the query, the target schema will be updated to widen the existing
// type to match it.
attr
case (s: MapType, t: MapType)
if !DataType.equalsStructurally(s, t, ignoreNullability = true) =>
// only trigger addCastsToMaps if exists differences like extra fields, renaming or type
// differences.
addCastsToMaps(tblName, attr, s, t, shouldWidenType)
addCastsToMaps(tblName, attr, s, t, typeWideningMode)
case _ =>
getCastFunction(attr, targetAttr.dataType, targetAttr.name)
}
@@ -953,16 +954,20 @@ class DeltaAnalysis(session: SparkSession)
* to `toType` before ingestion and values are written using their origin `toType` type.
* Otherwise, the table type `fromType` is retained and values are downcasted on write.
*/
private def shouldWidenType(
private def getTypeWideningMode(
deltaTable: DeltaTableV2,
writeOptions: Map[String, String]): (AtomicType, AtomicType) => Boolean = {
writeOptions: Map[String, String]): TypeWideningMode = {
val options = new DeltaOptions(deltaTable.options ++ writeOptions, conf)
@johanl-db (Collaborator, Author) commented on Dec 10, 2024:

This fixes a bug present in the initial allowTypeWidening method: the mergeSchema option value wasn't correctly taken into account.
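As an illustration of the fix (not part of the diff; paths and the DataFrame are hypothetical): because getTypeWideningMode builds DeltaOptions from deltaTable.options ++ writeOptions, a per-write mergeSchema option is now enough to enable schema evolution, and with it eligible type widening, for that write.

// Hypothetical append illustrating the fixed behavior: the per-write mergeSchema option
// now reaches getTypeWideningMode through DeltaOptions(deltaTable.options ++ writeOptions, conf),
// so canMergeSchema is true for this write and eligible source types can widen the target schema.
spark.read.parquet("/tmp/source_with_wider_types")
  .write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true") // previously not picked up by the old allowTypeWidening check
  .save("/tmp/delta/target_table")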

val snapshot = deltaTable.initialSnapshot
val typeWideningEnabled = TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata)
val schemaEvolutionEnabled = options.canMergeSchema

typeWideningEnabled && schemaEvolutionEnabled &&
TypeWidening.isTypeChangeSupportedForSchemaEvolution(_, _, snapshot.metadata)
if (typeWideningEnabled && schemaEvolutionEnabled) {
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(snapshot.metadata))
} else {
TypeWideningMode.NoTypeWidening
}
}

/**
@@ -984,7 +989,7 @@ class DeltaAnalysis(session: SparkSession)
val existingSchemaOutput = output.take(schema.length)
existingSchemaOutput.map(_.name) != schema.map(_.name) ||
!SchemaUtils.isReadCompatible(schema.asNullable, existingSchemaOutput.toStructType,
shouldWidenType = shouldWidenType(deltaTable, writeOptions))
typeWideningMode = getTypeWideningMode(deltaTable, writeOptions))
}

/**
@@ -1043,7 +1048,7 @@ class DeltaAnalysis(session: SparkSession)
!SchemaUtils.isReadCompatible(
specifiedTargetAttrs.toStructType.asNullable,
query.output.toStructType,
shouldWidenType = shouldWidenType(deltaTable, writeOptions)
typeWideningMode = getTypeWideningMode(deltaTable, writeOptions)
)
}

@@ -1074,44 +1079,47 @@ class DeltaAnalysis(session: SparkSession)
parent: NamedExpression,
source: StructType,
target: StructType,
shouldWidenType: (AtomicType, AtomicType) => Boolean): NamedExpression = {
typeWideningMode: TypeWideningMode): NamedExpression = {
if (source.length < target.length) {
throw DeltaErrors.notEnoughColumnsInInsert(
tableName, source.length, target.length, Some(parent.qualifiedName))
}
// Extracts the field at a given index in the target schema. Only matches if the index is valid.
object TargetIndex {
def unapply(index: Int): Option[StructField] = target.lift(index)
}

val fields = source.zipWithIndex.map {
case (StructField(name, nested: StructType, _, metadata), i) if i < target.length =>
target(i).dataType match {
case (StructField(name, nested: StructType, _, metadata), i @ TargetIndex(targetField)) =>
targetField.dataType match {
case t: StructType =>
val subField = Alias(GetStructField(parent, i, Option(name)), target(i).name)(
val subField = Alias(GetStructField(parent, i, Option(name)), targetField.name)(
explicitMetadata = Option(metadata))
addCastsToStructs(tableName, subField, nested, t, shouldWidenType)
addCastsToStructs(tableName, subField, nested, t, typeWideningMode)
case o =>
val field = parent.qualifiedName + "." + name
val targetName = parent.qualifiedName + "." + target(i).name
val targetName = parent.qualifiedName + "." + targetField.name
throw DeltaErrors.cannotInsertIntoColumn(tableName, field, targetName, o.simpleString)
}

case (StructField(name, dt: AtomicType, _, _), i)
if i < target.length && target(i).dataType.isInstanceOf[AtomicType] &&
shouldWidenType(target(i).dataType.asInstanceOf[AtomicType], dt) =>
val targetAttr = target(i)
case (StructField(name, sourceType: AtomicType, _, _),
i @ TargetIndex(StructField(targetName, targetType: AtomicType, _, targetMetadata)))
if typeWideningMode.shouldWidenType(fromType = targetType, toType = sourceType) =>
Alias(
GetStructField(parent, i, Option(name)),
targetAttr.name)(explicitMetadata = Option(targetAttr.metadata))
case (other, i) if i < target.length =>
val targetAttr = target(i)
targetName)(explicitMetadata = Option(targetMetadata))
case (sourceField, i @ TargetIndex(targetField)) =>
Alias(
getCastFunction(GetStructField(parent, i, Option(other.name)),
targetAttr.dataType, targetAttr.name),
targetAttr.name)(explicitMetadata = Option(targetAttr.metadata))
getCastFunction(GetStructField(parent, i, Option(sourceField.name)),
targetField.dataType, targetField.name),
targetField.name)(explicitMetadata = Option(targetField.metadata))

case (other, i) =>
case (sourceField, i) =>
// This is a new column, so leave to schema evolution as is. Do not lose it's name so
// wrap with an alias
Alias(
GetStructField(parent, i, Option(other.name)),
other.name)(explicitMetadata = Option(other.metadata))
GetStructField(parent, i, Option(sourceField.name)),
sourceField.name)(explicitMetadata = Option(sourceField.metadata))
}
Alias(CreateStruct(fields), parent.name)(
parent.exprId, parent.qualifier, Option(parent.metadata))
@@ -1123,10 +1131,10 @@ class DeltaAnalysis(session: SparkSession)
source: StructType,
target: StructType,
sourceNullable: Boolean,
shouldWidenType: (AtomicType, AtomicType) => Boolean): Expression = {
typeWideningMode: TypeWideningMode): Expression = {
val structConverter: (Expression, Expression) => Expression = (_, i) =>
addCastsToStructs(
tableName, Alias(GetArrayItem(parent, i), i.toString)(), source, target, shouldWidenType)
tableName, Alias(GetArrayItem(parent, i), i.toString)(), source, target, typeWideningMode)
val transformLambdaFunc = {
val elementVar = NamedLambdaVariable("elementVar", source, sourceNullable)
val indexVar = NamedLambdaVariable("indexVar", IntegerType, false)
@@ -1151,7 +1159,7 @@ class DeltaAnalysis(session: SparkSession)
parent: NamedExpression,
sourceMapType: MapType,
targetMapType: MapType,
shouldWidenType: (AtomicType, AtomicType) => Boolean): Expression = {
typeWideningMode: TypeWideningMode): Expression = {
val transformedKeys =
if (sourceMapType.keyType != targetMapType.keyType) {
// Create a transformation for the keys
Expand All @@ -1167,7 +1175,7 @@ class DeltaAnalysis(session: SparkSession)
key,
keyAttr,
tableName,
shouldWidenType
typeWideningMode
)
LambdaFunction(castedKey, Seq(key))
})
Expand All @@ -1190,7 +1198,7 @@ class DeltaAnalysis(session: SparkSession)
value,
valueAttr,
tableName,
shouldWidenType
typeWideningMode
)
LambdaFunction(castedValue, Seq(value))
})
@@ -474,9 +474,8 @@ object CheckDeletionVectorDisabled extends IcebergCompatCheck
*/
object CheckTypeWideningSupported extends IcebergCompatCheck {
override def apply(context: IcebergCompatContext): Unit = {
val skipCheck = context.spark.conf
.get(DeltaSQLConf.DELTA_TYPE_WIDENING_ALLOW_UNSUPPORTED_ICEBERG_TYPE_CHANGES.key)
.toBoolean
val skipCheck = context.spark.sessionState.conf
.getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_ALLOW_UNSUPPORTED_ICEBERG_TYPE_CHANGES)

if (skipCheck || !TypeWidening.isSupported(context.newestProtocol)) return

@@ -489,7 +488,7 @@ object CheckTypeWideningSupported extends IcebergCompatCheck
!TypeWidening.isTypeChangeSupportedByIceberg(fromType, toType) =>
throw DeltaErrors.icebergCompatUnsupportedTypeWideningException(
context.version, fieldPath, fromType, toType)
case _ =>
case _ => () // ignore
}
}
}
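For context, a sketch of the escape hatch this check honors. It is session-level and grounded only in the config entry referenced above; enabling it allows type changes that Iceberg readers may not handle.

// Sketch: skipping the Iceberg compatibility check via the SQL conf referenced above.
// The exact key string lives in DeltaSQLConf; it is resolved here through the entry itself.
spark.conf.set(
  DeltaSQLConf.DELTA_TYPE_WIDENING_ALLOW_UNSUPPORTED_ICEBERG_TYPE_CHANGES.key,
  "true")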
@@ -293,12 +293,13 @@ object ResolveDeltaMergeInto

val migrationSchema = filterSchema(source.schema, Seq.empty)

val fileIndexOpt = target.collectFirst { case DeltaTable(fileIndex) => fileIndex }
def shouldWidenType(from: AtomicType, to: AtomicType): Boolean =
fileIndexOpt.exists { index =>
TypeWidening.isEnabled(index.protocol, index.metadata) &&
TypeWidening.isTypeChangeSupportedForSchemaEvolution(from, to, index.metadata)
}
val typeWideningMode =
target.collectFirst {
case DeltaTable(index) if TypeWidening.isEnabled(index.protocol, index.metadata) =>
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(index.metadata))
}.getOrElse(TypeWideningMode.NoTypeWidening)

// The implicit conversions flag allows any type to be merged from source to target if Spark
// SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
// enforces Parquet-level write compatibility, which would mean an INT source can't be merged
@@ -307,7 +308,7 @@ object ResolveDeltaMergeInto
target.schema,
migrationSchema,
allowImplicitConversions = true,
shouldWidenType = shouldWidenType
typeWideningMode = typeWideningMode
)
} else {
target.schema
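To make the MERGE path concrete, here is a hedged sketch of a statement that reaches the typeWideningMode selection above (table names and column types are illustrative): with schema auto-merge enabled and type widening enabled on the target table, an eligible wider source type widens the target column instead of the source values being downcast, subject to the Iceberg restriction when Uniform is enabled.

// Hypothetical MERGE exercising this code path, e.g. source.value is INT while
// target.value is SMALLINT.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")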
21 changes: 8 additions & 13 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -68,7 +68,7 @@ object TypeWidening {
* It is the responsibility of the caller to recurse into structs, maps and arrays.
*/
def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWideningShims.isTypeChangeSupported(fromType, toType)
TypeWideningShims.isTypeChangeSupported(fromType = fromType, toType = toType)

/**
* Returns whether the given type change can be applied during schema evolution. Only a
@@ -78,18 +78,13 @@
fromType: AtomicType,
toType: AtomicType,
uniformIcebergEnabled: Boolean): Boolean =
TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(fromType, toType) &&
(!uniformIcebergEnabled || isTypeChangeSupportedByIceberg(fromType, toType))

/**
* Alias for the above method extracting `uniformIcebergEnabled` value from the table metadata.
*/
def isTypeChangeSupportedForSchemaEvolution(
fromType: AtomicType,
toType: AtomicType,
metadata: Metadata): Boolean =
isTypeChangeSupportedForSchemaEvolution(
fromType, toType, uniformIcebergEnabled = UniversalFormat.icebergEnabled(metadata))
TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType,
toType = toType
) && (
!uniformIcebergEnabled ||
isTypeChangeSupportedByIceberg(fromType = fromType, toType = toType)
)

/**
* Returns whether the given type change is supported by Iceberg, and by extension can be read
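A minimal usage sketch of the reworked signature, assuming a snapshot of the target table is in scope; the types are illustrative only and the authoritative rules live in TypeWideningShims and isTypeChangeSupportedByIceberg.

// Callers now pass uniformIcebergEnabled explicitly; the metadata-based overload is
// removed by this commit.
import org.apache.spark.sql.types.{ByteType, IntegerType}

val widenAllowed = TypeWidening.isTypeChangeSupportedForSchemaEvolution(
  fromType = ByteType,
  toType = IntegerType,
  uniformIcebergEnabled = UniversalFormat.icebergEnabled(snapshot.metadata))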
@@ -0,0 +1,35 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.types.AtomicType

/**
* A type widening mode captures a specific set of type changes that are allowed to be applied.
* Currently:
* - NoTypeWidening: No type change is allowed.
* - TypeEvolution(uniformIcebergEnabled = true): Type changes that are eligible to be applied
* automatically during schema evolution and that are supported by Iceberg are allowed.
* - TypeEvolution(uniformIcebergEnabled = false): Type changes that are eligible to be applied
* automatically during schema evolution are allowed, even if they are not supported by Iceberg.
*/
trait TypeWideningMode {
Contributor commented:

Suggested change:
- trait TypeWideningMode {
+ sealed trait TypeWideningMode {

I think it makes sense to enforce exhaustiveness here. There shouldn't be other places that need to specify custom rules, I think.

johanl-db (Collaborator, Author) replied:

👍
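As a side note on the exhaustiveness argument, a hypothetical consumer (not part of the PR) shows what sealing buys: once the trait is sealed, the compiler warns when a new mode is added and a match like this is not updated.

// Hypothetical consumer of the sealed trait; forgetting a case here would trigger a
// non-exhaustive match warning at compile time.
def describe(mode: TypeWideningMode): String = mode match {
  case TypeWideningMode.NoTypeWidening => "no type changes allowed"
  case TypeWideningMode.TypeEvolution(uniformIcebergEnabled) =>
    s"schema-evolution widening (restricted to Iceberg-compatible changes: $uniformIcebergEnabled)"
}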

def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean
}

object TypeWideningMode {
/**
* No type change allowed. Typically because type widening and/or schema evolution isn't enabled.
*/
object NoTypeWidening extends TypeWideningMode {
Contributor commented:

Suggested change:
- object NoTypeWidening extends TypeWideningMode {
+ case object NoTypeWidening extends TypeWideningMode {

Just so it looks like a nice enum-like thingy ;)

johanl-db (Collaborator, Author) replied:

👍

override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = false
}

/**
* Type changes that are eligible to be applied automatically during schema evolution are allowed.
* Can be restricted to only type changes supported by Iceberg.
*/
case class TypeEvolution(uniformIcebergEnabled: Boolean) extends TypeWideningMode {
Contributor commented:

Why not call this uniformIcebergCompatibleOnly instead? That way we can also override later in case people want this anyway, without having a weird mental mismatch where uniformIcebergEnabled=false on tables where it actually is enabled.

johanl-db (Collaborator, Author) replied:

Agree, that fits better.

override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWidening.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType, toType = toType, uniformIcebergEnabled)
}
}
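A short usage sketch of the two modes. The Boolean outcomes depend on the rules in TypeWidening.isTypeChangeSupportedForSchemaEvolution; the comments state the intent rather than asserted results, and ByteType to IntegerType is only an illustrative change.

// NoTypeWidening rejects everything; TypeEvolution delegates to TypeWidening with the
// optional Iceberg restriction.
import org.apache.spark.sql.types.{ByteType, IntegerType}

val none = TypeWideningMode.NoTypeWidening
val rejected = none.shouldWidenType(fromType = ByteType, toType = IntegerType) // always false

val evolution = TypeWideningMode.TypeEvolution(uniformIcebergEnabled = false)
val allowedIfEligible = evolution.shouldWidenType(fromType = ByteType, toType = IntegerType)

val withUniform = TypeWideningMode.TypeEvolution(uniformIcebergEnabled = true)
val alsoNeedsIcebergSupport = withUniform.shouldWidenType(fromType = ByteType, toType = IntegerType)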
@@ -220,14 +220,17 @@ object ImplicitMetadataOperation {
} else {
checkDependentExpressions(spark, txn.protocol, txn.metadata, dataSchema)

def shouldWidenType(from: AtomicType, to: AtomicType): Boolean =
TypeWidening.isEnabled(txn.protocol, txn.metadata) &&
TypeWidening.isTypeChangeSupportedForSchemaEvolution(from, to, txn.metadata)
val typeWideningMode = if (TypeWidening.isEnabled(txn.protocol, txn.metadata)) {
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(txn.metadata))
} else {
TypeWideningMode.NoTypeWidening
}

SchemaMergingUtils.mergeSchemas(
txn.metadata.schema,
dataSchema,
shouldWidenType = shouldWidenType)
typeWideningMode = typeWideningMode)
}
}
