Skip to content

Commit e4e5bf0

Browse files
Authored by wombatu-kun (Vova Kolmakov) and Vova Kolmakov
[HUDI-8097] Fix getting Schema evolution setting from hudi-defaults.conf on altering columns (apache#11796)
Co-authored-by: Vova Kolmakov <[email protected]>
1 parent db5c2d9 commit e4e5bf0

File tree

8 files changed

+20
-30
lines changed

8 files changed

+20
-30
lines changed

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala

+2-4
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ import org.apache.hudi.io.storage.HoodieSparkIOFactory
4848
import org.apache.hudi.metadata.HoodieTableMetadata
4949
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
5050
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
51-
5251
import org.apache.avro.Schema
5352
import org.apache.avro.generic.GenericRecord
5453
import org.apache.hadoop.conf.Configuration
@@ -65,7 +64,7 @@ import org.apache.spark.sql.execution.FileRelation
6564
import org.apache.spark.sql.execution.datasources._
6665
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
6766
import org.apache.spark.sql.execution.datasources.parquet.{LegacyHoodieParquetFileFormat, ParquetFileFormat}
68-
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
67+
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
6968
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
7069
import org.apache.spark.sql.types.StructType
7170
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
@@ -877,7 +876,6 @@ object HoodieBaseRelation extends SparkAdapterSupport {
877876
// t/h Spark Session configuration (for ex, for Spark SQL)
878877
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
879878
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
880-
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
881-
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
879+
ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)
882880
}
883881
}

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala

+7-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
2121
import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
2222
import org.apache.hudi.DataSourceWriteOptions._
2323
import org.apache.hudi.HoodieConversionUtils.toProperties
24-
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
24+
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
2525
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, WriteOperationType}
2626
import org.apache.hudi.common.table.HoodieTableConfig
2727
import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -48,7 +48,6 @@ import org.apache.spark.sql.types.StructType
4848
import org.slf4j.LoggerFactory
4949

5050
import java.util.Locale
51-
5251
import scala.collection.JavaConverters._
5352

5453
trait ProvidesHoodieConfig extends Logging {
@@ -573,6 +572,12 @@ object ProvidesHoodieConfig {
573572
}
574573
}
575574

575+
def isSchemaEvolutionEnabled(sparkSession: SparkSession): Boolean =
576+
sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
577+
DFSPropertiesConfiguration.getGlobalProps.getString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
578+
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString)
579+
).toBoolean
580+
576581
private def filterNullValues(opts: Map[String, String]): Map[String, String] =
577582
opts.filter { case (_, v) => v != null }
578583

hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf

+2
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@
2121
# Example:
2222
hoodie.datasource.write.table.type MERGE_ON_READ
2323
hoodie.datasource.write.hive_style_partitioning false
24+
hoodie.schema.on.read.enable true
25+

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
2424
import org.apache.hudi.common.testutils.HoodieTestUtils
2525
import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
2626
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
27-
2827
import org.scalatest.BeforeAndAfter
2928

3029
import java.io.File
@@ -99,6 +98,10 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
9998
spark.sql(s"delete from $tableName where year = $partitionVal")
10099
val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count()
101100
assertResult(0)(cnt)
101+
102+
// check that schema evolution is enabled (from hudi-defaults.conf),
103+
// so no exception is thrown on alter table change column type
104+
spark.sql(s"alter table $tableName change column price price string")
102105
}
103106
}
104107

hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala

+2-4
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ import org.apache.hudi.exception.HoodieException
2828
import org.apache.hudi.hadoop.fs.HadoopFSUtils
2929
import org.apache.hudi.sql.InsertMode
3030
import org.apache.hudi.storage.StoragePath
31-
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, SparkAdapterSupport}
32-
31+
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
3332
import org.apache.hadoop.fs.Path
3433
import org.apache.spark.sql.HoodieSpark3CatalogUtils.MatchBucketTransform
3534
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -135,8 +134,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
135134
catalogTable = Some(catalogTable),
136135
tableIdentifier = Some(ident.toString))
137136

138-
val schemaEvolutionEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
139-
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
137+
val schemaEvolutionEnabled = ProvidesHoodieConfig.isSchemaEvolutionEnabled(spark)
140138

141139
// NOTE: PLEASE READ CAREFULLY
142140
//

hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/hudi/Spark33ResolveHudiAlterTableCommand.scala

+1-7
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package org.apache.spark.sql.hudi
1919

20-
import org.apache.hudi.common.config.HoodieCommonConfig
2120
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
22-
2321
import org.apache.spark.sql.SparkSession
2422
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
2523
import org.apache.spark.sql.catalyst.plans.logical._
@@ -34,7 +32,7 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
3432
class Spark33ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
3533

3634
def apply(plan: LogicalPlan): LogicalPlan = {
37-
if (schemaEvolutionEnabled) {
35+
if (ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)) {
3836
plan.resolveOperatorsUp {
3937
case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved =>
4038
HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
@@ -56,10 +54,6 @@ class Spark33ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Ru
5654
}
5755
}
5856

59-
private def schemaEvolutionEnabled: Boolean =
60-
sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
61-
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
62-
6357
object ResolvedHoodieV2TablePlan {
6458
def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
6559
plan match {

hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/hudi/Spark34ResolveHudiAlterTableCommand.scala

+1-6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.sql.hudi
1919

20-
import org.apache.hudi.common.config.HoodieCommonConfig
2120
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
2221
import org.apache.spark.sql.SparkSession
2322
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
3332
class Spark34ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
3433

3534
def apply(plan: LogicalPlan): LogicalPlan = {
36-
if (schemaEvolutionEnabled) {
35+
if (ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)) {
3736
plan.resolveOperatorsUp {
3837
case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved =>
3938
HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
@@ -55,10 +54,6 @@ class Spark34ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Ru
5554
}
5655
}
5756

58-
private def schemaEvolutionEnabled: Boolean =
59-
sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
60-
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
61-
6257
object ResolvedHoodieV2TablePlan {
6358
def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
6459
plan match {

hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/Spark35ResolveHudiAlterTableCommand.scala

+1-6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.sql.hudi
1919

20-
import org.apache.hudi.common.config.HoodieCommonConfig
2120
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
2221
import org.apache.spark.sql.SparkSession
2322
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
3332
class Spark35ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Rule[LogicalPlan] {
3433

3534
def apply(plan: LogicalPlan): LogicalPlan = {
36-
if (schemaEvolutionEnabled) {
35+
if (ProvidesHoodieConfig.isSchemaEvolutionEnabled(sparkSession)) {
3736
plan.resolveOperatorsUp {
3837
case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved =>
3938
HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
@@ -55,10 +54,6 @@ class Spark35ResolveHudiAlterTableCommand(sparkSession: SparkSession) extends Ru
5554
}
5655
}
5756

58-
private def schemaEvolutionEnabled: Boolean =
59-
sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
60-
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
61-
6257
object ResolvedHoodieV2TablePlan {
6358
def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
6459
plan match {

0 commit comments

Comments (0)