[Spark] Reject unsupported type changes with Uniform #3947
private def shouldWidenType(
    deltaTable: DeltaTableV2,
    writeOptions: Map[String, String]): (AtomicType, AtomicType) => Boolean = {
  val options = new DeltaOptions(deltaTable.options ++ writeOptions, conf)
This fixes a bug present in the initial allowTypeWidening method: the mergeSchema option value wasn't correctly taken into account
TypeWidening.isTypeChangeSupportedForSchemaEvolution(current, update) => update
// If type widening is enabled and the type can be widened, it takes precedence over
// keepExistingType.
case (current: AtomicType, update: AtomicType) if shouldWidenType(current, update) => update
A lot of the code changes in this PR revolve around this line: whether to widen types isn't an on/off decision anymore but depends on other factors, e.g. whether Uniform is enabled.
Rather than a one-off solution of passing an extra uniformEnabled parameter, I let the caller instruct which type changes should be eligible for widening. That prevents pushing more complexity into mergeDataTypes
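To illustrate the idea of letting the caller decide which type changes are eligible, here is a minimal sketch. It is not the code from this PR: `SimpleType`, `mergeType`, `widenAll` and `icebergOnly` are hypothetical stand-ins, and the two rule sets are toy examples rather than the real Delta or Iceberg rules.

```scala
object MergeSketch {
  // Hypothetical stand-ins for Spark's AtomicType, for illustration only.
  sealed trait SimpleType
  case object IntT extends SimpleType
  case object LongT extends SimpleType
  case object DoubleT extends SimpleType

  // The merge logic stays generic: the caller passes the predicate that
  // decides whether a (current, update) pair may be widened.
  def mergeType(
      current: SimpleType,
      update: SimpleType,
      shouldWidenType: (SimpleType, SimpleType) => Boolean): SimpleType =
    (current, update) match {
      case (c, u) if c == u => c
      case (c, u) if shouldWidenType(c, u) => u
      // Assumption for this sketch: otherwise the existing type is kept.
      case (c, _) => c
    }

  // Toy predicate: every widening this sketch knows about is allowed.
  val widenAll: (SimpleType, SimpleType) => Boolean = {
    case (IntT, LongT) | (IntT, DoubleT) => true
    case _ => false
  }

  // Toy predicate: a stricter subset, as a Uniform-enabled caller might pass.
  val icebergOnly: (SimpleType, SimpleType) => Boolean = {
    case (IntT, LongT) => true
    case _ => false
  }
}
```

With this shape, `MergeSketch.mergeType(IntT, DoubleT, widenAll)` widens the type while the same call with `icebergOnly` keeps `IntT`, and `mergeType` itself never needs to know about Uniform.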
spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala
spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningUniformTests.scala
version.toString,
version.toString,
double version?
That got me at first too: version is used twice in the error message: once in the main error DELTA_ICEBERG_COMPAT_VIOLATION and once in the sub error class UNSUPPORTED_TYPE_WIDENING
Very nice, thank you. Couple more small things and it's good to go.
* Returns a mapping of (fromType, toType) to Boolean indicating whether `fromType` is eligible to
* be automatically widened to `toType` when ingesting data. If it is, the table schema is updated
* to `toType` before ingestion and values are written using their origin `toType` type.
* Otherwise, the table type `fromType` is retained and values are downcasted on write.
Need to update this comment now that it actually returns a mode instance, not a function anymore.
Done
* - TypeEvolution(uniformIcebergEnabled = false): Type changes that are eligible to be applied
*   automatically during schema evolution are allowed, even if they are not supported by Iceberg.
*/
trait TypeWideningMode {
- trait TypeWideningMode {
+ sealed trait TypeWideningMode {
I think it makes sense to enforce exhaustiveness here. There shouldn't be other places that need to specify custom rules, I think.
👍
/**
 * No type change allowed. Typically because type widening and/or schema evolution isn't enabled.
 */
object NoTypeWidening extends TypeWideningMode {
- object NoTypeWidening extends TypeWideningMode {
+ case object NoTypeWidening extends TypeWideningMode {
Just so it looks like a nice enum-like thingy ;)
👍
* Type changes that are eligible to be applied automatically during schema evolution are allowed.
* Can be restricted to only type changes supported by Iceberg.
*/
case class TypeEvolution(uniformIcebergEnabled: Boolean) extends TypeWideningMode {
Why not call this uniformIcebergCompatibleOnly instead? That way we can also override it later in case people want this anyway, without having a weird mental mismatch where uniformIcebergEnabled=false on tables where it actually is enabled.
Agree, that fits better
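For readers following the thread, the review suggestions above (sealed trait, case object, renamed flag) combine roughly as in the sketch below. This is an illustration under assumptions, not the merged code: `SimpleType`, the `shouldWidenType` method signature, `describe`, and both rule tables are hypothetical and deliberately simplified.

```scala
object TypeWideningModeSketch {
  // Hypothetical stand-ins for Spark's AtomicType, for illustration only.
  sealed trait SimpleType
  case object IntT extends SimpleType
  case object LongT extends SimpleType
  case object DoubleT extends SimpleType

  // Toy rule tables; assumptions for this sketch, not the actual rule sets.
  private val deltaWidening: Set[(SimpleType, SimpleType)] =
    Set((IntT, LongT), (IntT, DoubleT))
  private val icebergWidening: Set[(SimpleType, SimpleType)] =
    Set((IntT, LongT))

  // Sealed: every mode is declared in this file, so matches can be checked
  // for exhaustiveness by the compiler.
  sealed trait TypeWideningMode {
    def shouldWidenType(from: SimpleType, to: SimpleType): Boolean
  }

  // No type change allowed.
  case object NoTypeWidening extends TypeWideningMode {
    override def shouldWidenType(from: SimpleType, to: SimpleType): Boolean = false
  }

  // Eligible type changes are allowed, optionally restricted to the subset
  // that Iceberg also supports.
  final case class TypeEvolution(uniformIcebergCompatibleOnly: Boolean)
      extends TypeWideningMode {
    override def shouldWidenType(from: SimpleType, to: SimpleType): Boolean =
      deltaWidening.contains((from, to)) &&
        (!uniformIcebergCompatibleOnly || icebergWidening.contains((from, to)))
  }

  // Omitting a case here produces a compiler warning because the trait is sealed.
  def describe(mode: TypeWideningMode): String = mode match {
    case NoTypeWidening => "no widening"
    case TypeEvolution(true) => "Iceberg-compatible widening only"
    case TypeEvolution(false) => "all eligible widening"
  }
}
```

Naming the flag after the restriction it applies, rather than after the table property, means the mode describes its own behavior and can be constructed independently of whether Uniform is enabled on the table.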
LGTM
Description
Iceberg only supports a subset of the type changes that Delta's type widening feature covers.
See https://iceberg.apache.org/spec/#schema-evolution.
Unsupported type changes are:
This change makes using Uniform with Iceberg compatibility mutually exclusive with using one of these unsupported type changes, e.g.:
How was this patch tested?
TypeWideningUniformSuite, covering all cases listed above. E.g. for schema evolution, it covers MERGE and a sample of INSERTs, including streaming writes.
Does this PR introduce any user-facing changes?
Note: all the unsupported Iceberg type changes covered here will only be supported in Delta starting with Delta 4.0, so this doesn't impact any released Delta version except for 4.0 preview.
Applying a type change not supported by Iceberg when Uniform with Iceberg compatibility is enabled now fails with:
Enabling Uniform with Iceberg compatibility on a table that had an unsupported type change applied in the past now fails with:
Type changes not supported by Iceberg are no longer eligible for automatic type widening with schema evolution in MERGE/INSERT operations when Uniform with Iceberg compatibility is enabled.