Skip to content

Commit d964895

Browse files
[HUDI-7713] Enforce ordering of fields during schema reconciliation (apache#11154)
* Enforce ordering of fields during schema reconciliation * update ordering of expected columns * update tests to match new expectations * update test field ordering * set input batch schema * reorder row fields as well * only reorder fields if reconcile is false * fix schema evolution test expectations around field ordering * minor updates * allow for different ordering modes * use position instead of existing field ID * pr feedback, add test, fix visitor * cleanup test * fix assertion * pr feedback
1 parent eb63e1f commit d964895

File tree

11 files changed

+331
-60
lines changed

11 files changed

+331
-60
lines changed

hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java

+17
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
* used to support schema evolution.
3838
*/
3939
public class InternalSchema implements Serializable {
40+
public static final String ARRAY_ELEMENT = "element";
41+
public static final String MAP_KEY = "key";
42+
public static final String MAP_VALUE = "value";
4043

4144
private static final InternalSchema EMPTY_SCHEMA = new InternalSchema(-1L, RecordType.get());
4245

@@ -50,6 +53,7 @@ public class InternalSchema implements Serializable {
5053
private transient Map<Integer, Field> idToField = null;
5154
private transient Map<String, Integer> nameToId = null;
5255
private transient Map<Integer, String> idToName = null;
56+
private transient Map<String, Integer> nameToPosition = null;
5357

5458
public static InternalSchema getEmptyInternalSchema() {
5559
return EMPTY_SCHEMA;
@@ -268,6 +272,19 @@ public int findIdByName(String name) {
268272
return buildNameToId().getOrDefault(name, -1);
269273
}
270274

275+
/**
276+
* Returns the full name of the field and its position in the schema.
277+
* This differs from its ID in cases where new fields are not appended to the end of schemas.
278+
* The output is used when reconciling the order of fields while ingesting.
279+
* @return a mapping from full field name to a position
280+
*/
281+
public Map<String, Integer> getNameToPosition() {
282+
if (nameToPosition == null) {
283+
nameToPosition = InternalSchemaBuilder.getBuilder().buildNameToPosition(record);
284+
}
285+
return nameToPosition;
286+
}
287+
271288
@Override
272289
public String toString() {
273290
return String.format("table {\n%s\n}",

hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchemaBuilder.java

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hudi.internal.schema.visitor.InternalSchemaVisitor;
2222
import org.apache.hudi.internal.schema.visitor.NameToIDVisitor;
23+
import org.apache.hudi.internal.schema.visitor.NameToPositionVisitor;
2324

2425
import java.io.Serializable;
2526
import java.util.ArrayList;
@@ -67,6 +68,10 @@ public Map<String, Integer> buildNameToId(Type type) {
6768
return visit(type, new NameToIDVisitor());
6869
}
6970

71+
Map<String, Integer> buildNameToPosition(Type type) {
72+
return visit(type, new NameToPositionVisitor());
73+
}
74+
7075
/**
7176
* Use to traverse all types in internalSchema with visitor.
7277
*

hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java

+33-21
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
import org.apache.avro.Schema;
3232

3333
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.Comparator;
3436
import java.util.Deque;
3537
import java.util.HashMap;
3638
import java.util.Iterator;
3739
import java.util.LinkedList;
3840
import java.util.List;
3941
import java.util.Map;
4042
import java.util.concurrent.atomic.AtomicInteger;
43+
import java.util.stream.Collectors;
4144

4245
import static org.apache.avro.Schema.Type.UNION;
4346

@@ -117,10 +120,18 @@ public static Schema convert(Type type, String name) {
117120

118121
/** Convert an avro schema into internal type. */
119122
public static Type convertToField(Schema schema) {
120-
return buildTypeFromAvroSchema(schema);
123+
return buildTypeFromAvroSchema(schema, Collections.emptyMap());
124+
}
125+
126+
private static Type convertToField(Schema schema, Map<String, Integer> existingFieldNameToPositionMapping) {
127+
return buildTypeFromAvroSchema(schema, existingFieldNameToPositionMapping);
121128
}
122129

123130
/** Convert an avro schema into internalSchema. */
131+
public static InternalSchema convert(Schema schema, Map<String, Integer> existingFieldNameToPositionMapping) {
132+
return new InternalSchema((Types.RecordType) convertToField(schema, existingFieldNameToPositionMapping));
133+
}
134+
124135
public static InternalSchema convert(Schema schema) {
125136
return new InternalSchema((Types.RecordType) convertToField(schema));
126137
}
@@ -151,11 +162,11 @@ public static Schema nullableSchema(Schema schema) {
151162
* @param schema an avro schema.
152163
* @return a hudi type.
153164
*/
154-
public static Type buildTypeFromAvroSchema(Schema schema) {
165+
public static Type buildTypeFromAvroSchema(Schema schema, Map<String, Integer> existingNameToPositions) {
155166
// set flag to check this has not been visited.
156-
Deque<String> visited = new LinkedList();
157-
AtomicInteger nextId = new AtomicInteger(1);
158-
return visitAvroSchemaToBuildType(schema, visited, true, nextId);
167+
Deque<String> visited = new LinkedList<>();
168+
AtomicInteger nextId = new AtomicInteger(0);
169+
return visitAvroSchemaToBuildType(schema, visited, "", nextId, existingNameToPositions);
159170
}
160171

161172
private static void checkNullType(Type fieldType, String fieldName, Deque<String> visited) {
@@ -172,11 +183,11 @@ private static void checkNullType(Type fieldType, String fieldName, Deque<String
172183
throw new HoodieNullSchemaTypeException(sb.toString());
173184
} else if (fieldType.typeId() == Type.TypeID.ARRAY) {
174185
visited.push(fieldName);
175-
checkNullType(((Types.ArrayType) fieldType).elementType(), "element", visited);
186+
checkNullType(((Types.ArrayType) fieldType).elementType(), InternalSchema.ARRAY_ELEMENT, visited);
176187
visited.pop();
177188
} else if (fieldType.typeId() == Type.TypeID.MAP) {
178189
visited.push(fieldName);
179-
checkNullType(((Types.MapType) fieldType).valueType(), "value", visited);
190+
checkNullType(((Types.MapType) fieldType).valueType(), InternalSchema.MAP_VALUE, visited);
180191
visited.pop();
181192
}
182193
}
@@ -186,28 +197,27 @@ private static void checkNullType(Type fieldType, String fieldName, Deque<String
186197
*
187198
* @param schema an avro schema.
188199
* @param visited tracks the visited nodes while traversing the avro schema; used to check if the name of the avro record schema is correct.
189-
* @param firstVisitRoot track whether the current visited schema node is a root node.
200+
* @param currentFieldPath the dot-separated path to the current field; empty at the root and always ends in a '.' otherwise for ease of concatenation.
190201
* @param nextId an initial id which is used to create ids for all fields.
191202
* @return a hudi type matching the avro schema.
192203
*/
193-
private static Type visitAvroSchemaToBuildType(Schema schema, Deque<String> visited, Boolean firstVisitRoot, AtomicInteger nextId) {
204+
private static Type visitAvroSchemaToBuildType(Schema schema, Deque<String> visited, String currentFieldPath, AtomicInteger nextId, Map<String, Integer> existingNameToPosition) {
194205
switch (schema.getType()) {
195206
case RECORD:
196207
String name = schema.getFullName();
197208
if (visited.contains(name)) {
198209
throw new HoodieSchemaException(String.format("cannot convert recursive avro record %s", name));
199210
}
200211
visited.push(name);
201-
List<Schema.Field> fields = schema.getFields();
212+
List<Schema.Field> fields = existingNameToPosition.isEmpty() ? schema.getFields() :
213+
schema.getFields().stream()
214+
.sorted(Comparator.comparing(field -> existingNameToPosition.getOrDefault(currentFieldPath + field.name(), Integer.MAX_VALUE)))
215+
.collect(Collectors.toList());
202216
List<Type> fieldTypes = new ArrayList<>(fields.size());
203217
int nextAssignId = nextId.get();
204-
// when first visit root record, set nextAssignId = 0;
205-
if (firstVisitRoot) {
206-
nextAssignId = 0;
207-
}
208218
nextId.set(nextAssignId + fields.size());
209-
fields.stream().forEach(field -> {
210-
Type fieldType = visitAvroSchemaToBuildType(field.schema(), visited, false, nextId);
219+
fields.forEach(field -> {
220+
Type fieldType = visitAvroSchemaToBuildType(field.schema(), visited, currentFieldPath + field.name() + ".", nextId, existingNameToPosition);
211221
checkNullType(fieldType, field.name(), visited);
212222
fieldTypes.add(fieldType);
213223
});
@@ -226,22 +236,24 @@ private static Type visitAvroSchemaToBuildType(Schema schema, Deque<String> visi
226236
// them up into namespace/struct-name pair)
227237
return Types.RecordType.get(internalFields, schema.getFullName());
228238
case UNION:
229-
List<Type> fTypes = new ArrayList<>();
230-
schema.getTypes().stream().forEach(t -> {
231-
fTypes.add(visitAvroSchemaToBuildType(t, visited, false, nextId));
239+
List<Type> fTypes = new ArrayList<>(2);
240+
schema.getTypes().forEach(t -> {
241+
fTypes.add(visitAvroSchemaToBuildType(t, visited, currentFieldPath, nextId, existingNameToPosition));
232242
});
233243
return fTypes.get(0) == null ? fTypes.get(1) : fTypes.get(0);
234244
case ARRAY:
245+
String elementPath = currentFieldPath + InternalSchema.ARRAY_ELEMENT + ".";
235246
Schema elementSchema = schema.getElementType();
236247
int elementId = nextId.get();
237248
nextId.set(elementId + 1);
238-
Type elementType = visitAvroSchemaToBuildType(elementSchema, visited, false, nextId);
249+
Type elementType = visitAvroSchemaToBuildType(elementSchema, visited, elementPath, nextId, existingNameToPosition);
239250
return Types.ArrayType.get(elementId, AvroInternalSchemaConverter.isOptional(schema.getElementType()), elementType);
240251
case MAP:
241252
int keyId = nextId.get();
242253
int valueId = keyId + 1;
243254
nextId.set(valueId + 1);
244-
Type valueType = visitAvroSchemaToBuildType(schema.getValueType(), visited, false, nextId);
255+
String valuePath = currentFieldPath + InternalSchema.MAP_VALUE + ".";
256+
Type valueType = visitAvroSchemaToBuildType(schema.getValueType(), visited, valuePath, nextId, existingNameToPosition);
245257
return Types.MapType.get(keyId, valueId, Types.StringType.get(), valueType, AvroInternalSchemaConverter.isOptional(schema.getValueType()));
246258
default:
247259
return visitAvroPrimitiveToBuildInternalType(schema);

hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.avro.Schema;
2525

2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.TreeMap;
2930
import java.util.stream.Collectors;
@@ -58,7 +59,7 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche
5859
if (incomingSchema.getType() == Schema.Type.NULL) {
5960
return oldTableSchema;
6061
}
61-
InternalSchema inComingInternalSchema = convert(incomingSchema);
62+
InternalSchema inComingInternalSchema = convert(incomingSchema, oldTableSchema.getNameToPosition());
6263
// check column add/missing
6364
List<String> colNamesFromIncoming = inComingInternalSchema.getAllColsFullName();
6465
List<String> colNamesFromOldSchema = oldTableSchema.getAllColsFullName();
@@ -136,7 +137,7 @@ public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchem
136137
* @param targetSchema target schema that source schema will be reconciled against
137138
* @return schema (based off {@code source} one) that has nullability constraints and datatypes reconciled
138139
*/
139-
public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema) {
140+
public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema, boolean shouldReorderColumns) {
140141
if (targetSchema.getType() == Schema.Type.NULL || targetSchema.getFields().isEmpty()) {
141142
return sourceSchema;
142143
}
@@ -145,8 +146,9 @@ public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema tar
145146
return targetSchema;
146147
}
147148

148-
InternalSchema sourceInternalSchema = convert(sourceSchema);
149149
InternalSchema targetInternalSchema = convert(targetSchema);
150+
// Use the target schema's field positions (not field IDs) for consistent field ordering between commits when shouldReorderColumns is true
151+
InternalSchema sourceInternalSchema = convert(sourceSchema, shouldReorderColumns ? targetInternalSchema.getNameToPosition() : Collections.emptyMap());
150152

151153
List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
152154
List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();

hudi-common/src/main/java/org/apache/hudi/internal/schema/visitor/NameToIDVisitor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,14 @@ public Map<String, Integer> field(Types.Field field, Map<String, Integer> fieldR
9595

9696
@Override
9797
public Map<String, Integer> array(Types.ArrayType array, Map<String, Integer> elementResult) {
98-
nameToId.put(createFullName("element", fieldNames), array.elementId());
98+
nameToId.put(createFullName(InternalSchema.ARRAY_ELEMENT, fieldNames), array.elementId());
9999
return nameToId;
100100
}
101101

102102
@Override
103103
public Map<String, Integer> map(Types.MapType map, Map<String, Integer> keyResult, Map<String, Integer> valueResult) {
104-
nameToId.put(createFullName("key", fieldNames), map.keyId());
105-
nameToId.put(createFullName("value", fieldNames), map.valueId());
104+
nameToId.put(createFullName(InternalSchema.MAP_KEY, fieldNames), map.keyId());
105+
nameToId.put(createFullName(InternalSchema.MAP_VALUE, fieldNames), map.valueId());
106106
return nameToId;
107107
}
108108

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.internal.schema.visitor;
20+
21+
import org.apache.hudi.internal.schema.InternalSchema;
22+
import org.apache.hudi.internal.schema.Type;
23+
import org.apache.hudi.internal.schema.Types;
24+
25+
import java.util.Deque;
26+
import java.util.HashMap;
27+
import java.util.LinkedList;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
32+
import static org.apache.hudi.internal.schema.utils.InternalSchemaUtils.createFullName;
33+
34+
/**
35+
* Schema visitor to produce name -> position map for internalSchema. Positions are assigned in a depth-first manner.
36+
*/
37+
public class NameToPositionVisitor extends InternalSchemaVisitor<Map<String, Integer>> {
38+
private final Deque<String> fieldNames = new LinkedList<>();
39+
private final Map<String, Integer> nameToId = new HashMap<>();
40+
private final AtomicInteger position = new AtomicInteger(0);
41+
42+
@Override
43+
public void beforeField(Types.Field field) {
44+
nameToId.put(createFullName(field.name(), fieldNames), position.getAndIncrement());
45+
fieldNames.push(field.name());
46+
}
47+
48+
@Override
49+
public void afterField(Types.Field field) {
50+
fieldNames.pop();
51+
}
52+
53+
@Override
54+
public void beforeArrayElement(Types.Field elementField) {
55+
nameToId.put(createFullName(InternalSchema.ARRAY_ELEMENT, fieldNames), position.getAndIncrement());
56+
fieldNames.push(elementField.name());
57+
}
58+
59+
@Override
60+
public void afterArrayElement(Types.Field elementField) {
61+
fieldNames.pop();
62+
}
63+
64+
@Override
65+
public void beforeMapKey(Types.Field keyField) {
66+
nameToId.put(createFullName(InternalSchema.MAP_KEY, fieldNames), position.getAndIncrement());
67+
fieldNames.push(keyField.name());
68+
}
69+
70+
@Override
71+
public void afterMapKey(Types.Field keyField) {
72+
fieldNames.pop();
73+
}
74+
75+
@Override
76+
public void beforeMapValue(Types.Field valueField) {
77+
nameToId.put(createFullName(InternalSchema.MAP_VALUE, fieldNames), position.getAndIncrement());
78+
fieldNames.push(valueField.name());
79+
}
80+
81+
@Override
82+
public void afterMapValue(Types.Field valueField) {
83+
fieldNames.pop();
84+
}
85+
86+
@Override
87+
public Map<String, Integer> schema(InternalSchema schema, Map<String, Integer> recordResult) {
88+
return nameToId;
89+
}
90+
91+
@Override
92+
public Map<String, Integer> record(Types.RecordType record, List<Map<String, Integer>> fieldResults) {
93+
return nameToId;
94+
}
95+
96+
@Override
97+
public Map<String, Integer> field(Types.Field field, Map<String, Integer> fieldResult) {
98+
return nameToId;
99+
}
100+
101+
@Override
102+
public Map<String, Integer> array(Types.ArrayType array, Map<String, Integer> elementResult) {
103+
return nameToId;
104+
}
105+
106+
@Override
107+
public Map<String, Integer> map(Types.MapType map, Map<String, Integer> keyResult, Map<String, Integer> valueResult) {
108+
return nameToId;
109+
}
110+
111+
@Override
112+
public Map<String, Integer> primitive(Type.PrimitiveType primitive) {
113+
return nameToId;
114+
}
115+
}

hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ private Schema create(String name, Schema.Field... fields) {
152152

153153
@Test
154154
public void testArrayType() {
155-
Type arrayNestRecordType = Types.ArrayType.get(1, false,
156-
Types.RecordType.get(Arrays.asList(Types.Field.get(2, false, "a", Types.FloatType.get()),
157-
Types.Field.get(3, false, "b", Types.FloatType.get()))));
155+
Type arrayNestRecordType = Types.ArrayType.get(0, false,
156+
Types.RecordType.get(Arrays.asList(Types.Field.get(1, false, "a", Types.FloatType.get()),
157+
Types.Field.get(2, false, "b", Types.FloatType.get()))));
158158

159159
Schema schema = SchemaBuilder.array().items(create("t1",
160160
new Schema.Field("a", Schema.create(Schema.Type.FLOAT), null, null),
@@ -526,15 +526,16 @@ public void testReconcileSchema() {
526526
new Schema.Field("d2", AvroInternalSchemaConverter.nullableSchema(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))), null, JsonProperties.NULL_VALUE));
527527

528528
Schema simpleCheckSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"simple\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
529-
+ "{\"name\":\"b\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"a1\",\"type\":[\"null\",\"long\"],\"default\":null},"
529+
+ "{\"name\":\"b\",\"type\":[\"null\",\"int\"],\"default\":null},"
530530
+ "{\"name\":\"c\",\"type\":[\"null\",\"long\"],\"default\":null},"
531-
+ "{\"name\":\"c1\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"c2\",\"type\":[\"null\",\"long\"],\"default\":null},"
532531
+ "{\"name\":\"d\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
532+
+ "{\"name\":\"a1\",\"type\":[\"null\",\"long\"],\"default\":null},"
533+
+ "{\"name\":\"c1\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"c2\",\"type\":[\"null\",\"long\"],\"default\":null},"
533534
+ "{\"name\":\"d1\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
534535
+ "{\"name\":\"d2\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}");
535536

536537
Schema simpleReconcileSchema = AvroInternalSchemaConverter.convert(AvroSchemaEvolutionUtils
537538
.reconcileSchema(incomingSchema, AvroInternalSchemaConverter.convert(schema)), "schemaNameFallback");
538-
Assertions.assertEquals(simpleReconcileSchema, simpleCheckSchema);
539+
Assertions.assertEquals(simpleCheckSchema, simpleReconcileSchema);
539540
}
540541
}

0 commit comments

Comments
 (0)