
Commit b420850

[HUDI-7237] Hudi Streamer: Handle edge case with null schema, minor cleanups (apache#10342)
1 parent f931c12 commit b420850

6 files changed: +139 -76 lines changed

hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java

+1 -1
@@ -144,7 +144,7 @@ public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema tar
       return sourceSchema;
     }
 
-    if (sourceSchema.getType() == Schema.Type.NULL || sourceSchema.getFields().isEmpty()) {
+    if (sourceSchema == null || sourceSchema.getType() == Schema.Type.NULL || sourceSchema.getFields().isEmpty()) {
       return targetSchema;
     }
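The added clause guards the dereference: with a null source schema, the existing sourceSchema.getType() call would throw a NullPointerException before the method could fall back to the target schema. A minimal standalone sketch of the same short-circuit ordering (illustrative class and method names, not the Hudi method itself):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Standalone sketch (illustrative names, not the Hudi method): the null check must come
// first so sourceSchema.getType() is never reached on a null reference, and the NULL-type
// check must come before getFields(), which is only valid for record schemas.
public class NullSchemaGuardSketch {

  static Schema pickSchema(Schema sourceSchema, Schema targetSchema) {
    if (sourceSchema == null
        || sourceSchema.getType() == Schema.Type.NULL
        || sourceSchema.getFields().isEmpty()) {
      return targetSchema; // source carries no usable fields, fall back to the target
    }
    return sourceSchema;
  }

  public static void main(String[] args) {
    Schema target = SchemaBuilder.record("rec").fields().requiredString("f").endRecord();
    System.out.println(pickSchema(null, target).getFullName());                            // rec
    System.out.println(pickSchema(Schema.create(Schema.Type.NULL), target).getFullName()); // rec
  }
}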

hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java

+8 -5
@@ -18,9 +18,10 @@
 
 package org.apache.hudi.utilities.schema;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.avro.Schema;
+
 /**
  * A schema provider which applies schema post process hook on schema.
  */
@@ -38,14 +39,16 @@ public SchemaProviderWithPostProcessor(SchemaProvider schemaProvider,
 
   @Override
   public Schema getSourceSchema() {
-    return schemaPostProcessor.map(processor -> processor.processSchema(schemaProvider.getSourceSchema()))
-        .orElse(schemaProvider.getSourceSchema());
+    Schema sourceSchema = schemaProvider.getSourceSchema();
+    return schemaPostProcessor.map(processor -> processor.processSchema(sourceSchema))
+        .orElse(sourceSchema);
   }
 
   @Override
   public Schema getTargetSchema() {
-    return schemaPostProcessor.map(processor -> processor.processSchema(schemaProvider.getTargetSchema()))
-        .orElse(schemaProvider.getTargetSchema());
+    Schema targetSchema = schemaProvider.getTargetSchema();
+    return schemaPostProcessor.map(processor -> processor.processSchema(targetSchema))
+        .orElse(targetSchema);
   }
 
   public SchemaProvider getOriginalSchemaProvider() {
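The point of hoisting schemaProvider.getSourceSchema() and getTargetSchema() into locals is that the argument of orElse(...) is evaluated before the call, so the provider was invoked a second time even when a post processor was present. A standalone sketch with java.util.Optional showing the same effect (the counter and names are illustrative; Hudi's Option is assumed to behave like Optional here, since plain Java argument evaluation applies either way):

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch, not Hudi code: the orElse(...) argument is always evaluated,
// so an expensive fetch runs twice unless its result is cached in a local variable.
public class EagerOrElseSketch {
  static final AtomicInteger CALLS = new AtomicInteger();

  static String expensiveFetch() {        // stand-in for schemaProvider.getSourceSchema()
    CALLS.incrementAndGet();
    return "schema";
  }

  public static void main(String[] args) {
    Optional<String> postProcessor = Optional.of("present");

    // Before: expensiveFetch() runs twice, once in map() and once for the orElse argument.
    postProcessor.map(p -> p + ":" + expensiveFetch()).orElse(expensiveFetch());
    System.out.println("calls without local = " + CALLS.get());   // 2

    // After: fetch once and reuse the local on both paths, as the patch does.
    CALLS.set(0);
    String fetched = expensiveFetch();
    postProcessor.map(p -> p + ":" + fetched).orElse(fetched);
    System.out.println("calls with local = " + CALLS.get());      // 1
  }
}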

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java

+6 -2
@@ -55,12 +55,16 @@ public SchemaProvider getSchemaProvider() {
     if (batch.isPresent() && schemaProvider == null) {
       throw new HoodieException("Please provide a valid schema provider class!");
     }
-    return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider());
+    return Option.ofNullable(schemaProvider).orElseGet(NullSchemaProvider::getInstance);
   }
 
   public static class NullSchemaProvider extends SchemaProvider {
+    private static final NullSchemaProvider INSTANCE = new NullSchemaProvider();
+    public static NullSchemaProvider getInstance() {
+      return INSTANCE;
+    }
 
-    public NullSchemaProvider() {
+    private NullSchemaProvider() {
      this(null, null);
    }
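Two improvements are folded into this hunk: orElse(new NullSchemaProvider()) constructed a fresh fallback on every call even when a real provider was supplied, whereas orElseGet(NullSchemaProvider::getInstance) only runs the supplier when the value is absent; and the private constructor plus static INSTANCE makes that fallback a single shared object. A standalone sketch of the pattern using java.util.Optional (illustrative class names, not Hudi code):

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch (illustrative names, not Hudi classes): orElse(...) always evaluates
// its argument, so "orElse(new Fallback())" allocates a throwaway object on every call;
// orElseGet(...) defers to the supplier, and a private constructor plus a static INSTANCE
// keeps a single shared fallback.
public class LazySingletonFallbackSketch {
  static class Provider {}

  static final class Fallback extends Provider {
    static final AtomicInteger ALLOCATIONS = new AtomicInteger();
    private static final Fallback INSTANCE = new Fallback();
    private Fallback() {
      ALLOCATIONS.incrementAndGet();
    }
    static Fallback getInstance() {
      return INSTANCE;
    }
  }

  static Provider resolveEager(Provider maybeNull) {
    return Optional.ofNullable(maybeNull).orElse(new Fallback());           // old style
  }

  static Provider resolveLazy(Provider maybeNull) {
    return Optional.ofNullable(maybeNull).orElseGet(Fallback::getInstance); // new style
  }

  public static void main(String[] args) {
    Provider real = new Provider();
    resolveEager(real);
    resolveEager(real);
    // The shared INSTANCE plus two throwaway allocations:
    System.out.println("allocations after eager calls = " + Fallback.ALLOCATIONS.get()); // 3
    resolveLazy(real);
    resolveLazy(null);
    System.out.println("allocations after lazy calls  = " + Fallback.ALLOCATIONS.get()); // still 3
  }
}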

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java

+46 -43
@@ -274,18 +274,16 @@ public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaPr
     this.processedSchema = new SchemaSet();
     this.autoGenerateRecordKeys = KeyGenUtils.enableAutoGenerateRecordKeys(props);
     this.keyGenClassName = getKeyGeneratorClassName(new TypedProperties(props));
-    refreshTimeline();
-    // Register User Provided schema first
-    registerAvroSchemas(schemaProvider);
-
-
-    this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, getHoodieClientConfig(this.schemaProvider));
-    this.hoodieMetrics = new HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
     this.conf = conf;
+
+    HoodieWriteConfig hoodieWriteConfig = getHoodieClientConfig();
+    this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, hoodieWriteConfig);
+    this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig);
     if (props.getBoolean(ERROR_TABLE_ENABLED.key(), ERROR_TABLE_ENABLED.defaultValue())) {
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
+    refreshTimeline();
     Source source = UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
     this.formatAdapter = new SourceFormatAdapter(source, this.errorTableWriter, Option.of(props));
291289

@@ -309,7 +307,7 @@ public void refreshTimeline() throws IOException {
     if (fs.exists(new Path(cfg.targetBasePath))) {
       try {
         HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
-            .setConf(new Configuration(fs.getConf()))
+            .setConf(conf)
             .setBasePath(cfg.targetBasePath)
             .setPayloadClassName(cfg.payloadClassName)
             .setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
@@ -337,7 +335,7 @@ public void refreshTimeline() throws IOException {
         LOG.warn("Base path exists, but table is not fully initialized. Re-initializing again");
         initializeEmptyTable();
         // reload the timeline from metaClient and validate that its empty table. If there are any instants found, then we should fail the pipeline, bcoz hoodie.properties got deleted by mistake.
-        HoodieTableMetaClient metaClientToValidate = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).build();
+        HoodieTableMetaClient metaClientToValidate = HoodieTableMetaClient.builder().setConf(conf).setBasePath(cfg.targetBasePath).build();
         if (metaClientToValidate.reloadActiveTimeline().countInstants() > 0) {
           // Deleting the recreated hoodie.properties and throwing exception.
           fs.delete(new Path(String.format("%s%s/%s", basePathWithForwardSlash, HoodieTableMetaClient.METAFOLDER_NAME, HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
@@ -395,7 +393,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
     // Refresh Timeline
     refreshTimeline();
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(new Configuration(fs.getConf()))
+        .setConf(conf)
         .setBasePath(cfg.targetBasePath)
         .setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(), HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
         .setTimeGeneratorConfig(HoodieTimeGeneratorConfig.newBuilder().fromProperties(props).withPath(cfg.targetBasePath).build())
@@ -432,15 +430,15 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
     }
 
     // complete the pending compaction before writing to sink
-    if (cfg.retryLastPendingInlineCompactionJob && getHoodieClientConfig(this.schemaProvider).inlineCompactionEnabled()) {
+    if (cfg.retryLastPendingInlineCompactionJob && writeClient.getConfig().inlineCompactionEnabled()) {
       Option<String> pendingCompactionInstant = getLastPendingCompactionInstant(allCommitsTimelineOpt);
       if (pendingCompactionInstant.isPresent()) {
         HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = writeClient.compact(pendingCompactionInstant.get());
         writeClient.commitCompaction(pendingCompactionInstant.get(), writeMetadata.getCommitMetadata().get(), Option.empty());
         refreshTimeline();
         reInitWriteClient(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema(), null);
       }
-    } else if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
+    } else if (cfg.retryLastPendingInlineClusteringJob && writeClient.getConfig().inlineClusteringEnabled()) {
       // complete the pending clustering before writing to sink
       Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
       if (pendingClusteringInstant.isPresent()) {
@@ -1001,7 +999,7 @@ public void runMetaSync() {
    * this constraint.
    */
   private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
-    if ((null != schemaProvider)) {
+    if (null != schemaProvider) {
       Schema sourceSchema = schemaProvider.getSourceSchema();
       Schema targetSchema = schemaProvider.getTargetSchema();
       reInitWriteClient(sourceSchema, targetSchema, recordsOpt);
@@ -1013,8 +1011,9 @@ private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, Option<
     if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
       targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(props));
     }
-    registerAvroSchemas(sourceSchema, targetSchema);
-    final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
+    final Pair<HoodieWriteConfig, Schema> initialWriteConfigAndSchema = getHoodieClientConfigAndWriterSchema(targetSchema, true);
+    final HoodieWriteConfig initialWriteConfig = initialWriteConfigAndSchema.getLeft();
+    registerAvroSchemas(sourceSchema, initialWriteConfigAndSchema.getRight());
     final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
         .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), recordsOpt, initialWriteConfig)
         .orElse(initialWriteConfig);
@@ -1036,20 +1035,21 @@ private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, Option<
   }
 
   /**
-   * Helper to construct Write Client config.
-   *
-   * @param schemaProvider Schema Provider
+   * Helper to construct Write Client config without a schema.
    */
-  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
-    return getHoodieClientConfig(schemaProvider != null ? schemaProvider.getTargetSchema() : null);
+  private HoodieWriteConfig getHoodieClientConfig() {
+    return getHoodieClientConfigAndWriterSchema(null, false).getLeft();
   }
 
   /**
    * Helper to construct Write Client config.
    *
-   * @param schema Schema
+   * @param schema initial writer schema. If null or Avro Null type, the schema will be fetched from previous commit metadata for the table.
+   * @param requireSchemaInConfig whether the schema should be present in the config. This is an optimization to avoid fetching schema from previous commits if not needed.
+   *
+   * @return Pair of HoodieWriteConfig and writer schema.
    */
-  private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
+  private Pair<HoodieWriteConfig, Schema> getHoodieClientConfigAndWriterSchema(Schema schema, boolean requireSchemaInConfig) {
     final boolean combineBeforeUpsert = true;
     final boolean autoCommit = false;

@@ -1075,8 +1075,13 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
         .withAutoCommit(autoCommit)
         .withProps(props);
 
-    if (schema != null) {
-      builder.withSchema(getSchemaForWriteConfig(schema).toString());
+    // If schema is required in the config, we need to handle the case where the target schema is null and should be fetched from previous commits
+    final Schema returnSchema;
+    if (requireSchemaInConfig) {
+      returnSchema = getSchemaForWriteConfig(schema);
+      builder.withSchema(returnSchema.toString());
+    } else {
+      returnSchema = schema;
     }
 
     HoodieWriteConfig config = builder.build();
@@ -1108,30 +1113,28 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
         String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(), cfg.filterDupes));
     ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
         String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert));
-    return config;
+    return Pair.of(config, returnSchema);
   }
 
   private Schema getSchemaForWriteConfig(Schema targetSchema) {
     Schema newWriteSchema = targetSchema;
     try {
-      if (targetSchema != null) {
-        // check if targetSchema is equal to NULL schema
-        if (SchemaCompatibility.checkReaderWriterCompatibility(targetSchema, InputBatch.NULL_SCHEMA).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
-            && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
-          // target schema is null. fetch schema from commit metadata and use it
-          HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))
-              .setBasePath(cfg.targetBasePath)
-              .setPayloadClassName(cfg.payloadClassName)
-              .build();
-          int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
-          if (totalCompleted > 0) {
-            TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
-            Option<Schema> tableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
-            if (tableSchema.isPresent()) {
-              newWriteSchema = tableSchema.get();
-            } else {
-              LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
-            }
+      // check if targetSchema is equal to NULL schema
+      if (targetSchema == null || (SchemaCompatibility.checkReaderWriterCompatibility(targetSchema, InputBatch.NULL_SCHEMA).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
+          && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)) {
+        // target schema is null. fetch schema from commit metadata and use it
+        HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(conf)
+            .setBasePath(cfg.targetBasePath)
+            .setPayloadClassName(cfg.payloadClassName)
+            .build();
+        int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
+        if (totalCompleted > 0) {
+          TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+          Option<Schema> tableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
+          if (tableSchema.isPresent()) {
+            newWriteSchema = tableSchema.get();
+          } else {
+            LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
           }
         }
       }
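Taken together, the StreamSync changes make writer-schema resolution explicit: getHoodieClientConfigAndWriterSchema only resolves a schema when requireSchemaInConfig is true, and getSchemaForWriteConfig now treats a null target schema the same as the Avro NULL type, falling back to the table schema recorded by the latest completed commit. A standalone sketch of that resolve-or-fallback decision (the type check below simplifies the two-way SchemaCompatibility test against InputBatch.NULL_SCHEMA, and the table lookup is a stubbed Supplier rather than Hudi's TableSchemaResolver):

import java.util.Optional;
import java.util.function.Supplier;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Standalone sketch (not Hudi code): a provided schema that is null or the Avro NULL type
// carries no field information, so the writer schema falls back to whatever the table's
// latest commit recorded; otherwise the provided schema is used and no lookup happens.
public class WriterSchemaFallbackSketch {

  static boolean isNullOrNullType(Schema schema) {
    // Simplified stand-in for the two-way SchemaCompatibility check against NULL_SCHEMA.
    return schema == null || schema.getType() == Schema.Type.NULL;
  }

  static Schema resolveWriterSchema(Schema providedSchema, Supplier<Optional<Schema>> tableSchemaLookup) {
    if (!isNullOrNullType(providedSchema)) {
      return providedSchema; // usable schema supplied; skip the (potentially expensive) lookup
    }
    // No usable schema: use the table schema if one exists, otherwise keep the provided value
    // (mirroring the "could not fetch schema from table" warning path).
    return tableSchemaLookup.get().orElse(providedSchema);
  }

  public static void main(String[] args) {
    Schema tableSchema = SchemaBuilder.record("rec").fields().requiredString("f").endRecord();
    Supplier<Optional<Schema>> lookup = () -> Optional.of(tableSchema);

    System.out.println(resolveWriterSchema(null, lookup).getFullName());                            // rec
    System.out.println(resolveWriterSchema(Schema.create(Schema.Type.NULL), lookup).getFullName()); // rec
    System.out.println(resolveWriterSchema(tableSchema, lookup) == tableSchema);                    // true
  }
}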
