Commit d41541c

[MINOR] Handle cases of malformed records when converting to json (apache#10943)
1 parent bac6ea7 commit d41541c

6 files changed, +70 -12 lines changed

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala

+1 -1

@@ -223,7 +223,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
     val transform: GenericRecord => Either[GenericRecord, String] = record => try {
       Left(HoodieAvroUtils.rewriteRecordDeep(record, schema, true))
     } catch {
-      case _: Throwable => Right(HoodieAvroUtils.avroToJsonString(record, false))
+      case _: Throwable => Right(HoodieAvroUtils.safeAvroToJsonString(record))
     }
     recs.map(transform)
   }

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

+14

@@ -197,6 +197,20 @@ public static String avroToJsonString(GenericRecord record, boolean pretty) thro
     return avroToJsonHelper(record, pretty).toString();
   }
 
+  /**
+   * Convert a given avro record to a JSON string. If the record contents are invalid, return the record.toString().
+   * Use this method over {@link HoodieAvroUtils#avroToJsonString} when simply trying to print the record contents without any guarantees around their correctness.
+   * @param record The GenericRecord to convert
+   * @return a JSON string
+   */
+  public static String safeAvroToJsonString(GenericRecord record) {
+    try {
+      return avroToJsonString(record, false);
+    } catch (Exception e) {
+      return record.toString();
+    }
+  }
+
   /**
    * Convert a given avro record to json and return the encoded bytes.
    *
hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java

+23

@@ -629,4 +629,27 @@ public void testAddMetadataFields() {
     assertEquals("custom_schema_property_value", schemaWithMetadata.getProp("custom_schema_property"));
     assertEquals("value", originalFieldsInUpdatedSchema.get(0).getProp("custom_field_property"));
   }
+
+  @Test
+  void testSafeAvroToJsonStringMissingRequiredField() {
+    Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA);
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("non_pii_col", "val1");
+    record.put("pii_col", "val2");
+    record.put("timestamp", 3.5);
+    String jsonString = HoodieAvroUtils.safeAvroToJsonString(record);
+    assertEquals("{\"timestamp\": 3.5, \"_row_key\": null, \"non_pii_col\": \"val1\", \"pii_col\": \"val2\"}", jsonString);
+  }
+
+  @Test
+  void testSafeAvroToJsonStringBadDataType() {
+    Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA);
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("non_pii_col", "val1");
+    record.put("_row_key", "key");
+    record.put("pii_col", "val2");
+    record.put("timestamp", "foo");
+    String jsonString = HoodieAvroUtils.safeAvroToJsonString(record);
+    assertEquals("{\"timestamp\": \"foo\", \"_row_key\": \"key\", \"non_pii_col\": \"val1\", \"pii_col\": \"val2\"}", jsonString);
+  }
 }

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java

+19

@@ -19,6 +19,8 @@
 
 package org.apache.hudi.utilities.streamer;
 
+import java.util.Objects;
+
 /**
  * Error event is an event triggered during write or processing failure of a record.
  */
@@ -40,6 +42,23 @@ public ErrorReason getReason() {
     return reason;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ErrorEvent<?> that = (ErrorEvent<?>) o;
+    return reason == that.reason && Objects.equals(payload, that.payload);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(reason, payload);
+  }
+
   /**
    * The reason behind write or processing failure of a record
   */
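
A short sketch (illustrative only; the ErrorEventEqualitySketch class is hypothetical) of what the new equals()/hashCode() enables: events built from the same payload and reason compare equal, so the updated TestHoodieStreamerUtils can assert on captured ErrorEvents directly.

import org.apache.hudi.utilities.streamer.ErrorEvent;

public class ErrorEventEqualitySketch {
  public static void main(String[] args) {
    // Same payload and reason => equal events with matching hash codes.
    ErrorEvent<String> a = new ErrorEvent<>("{\"_row_key\": \"key1\"}", ErrorEvent.ErrorReason.RECORD_CREATION);
    ErrorEvent<String> b = new ErrorEvent<>("{\"_row_key\": \"key1\"}", ErrorEvent.ErrorReason.RECORD_CREATION);
    System.out.println(a.equals(b));                  // true
    System.out.println(a.hashCode() == b.hashCode()); // true
  }
}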

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java

+1 -1

@@ -167,7 +167,7 @@ private static Either<HoodieRecord, String> generateErrorRecordOrThrowException(
       }
     }
     try {
-      return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false));
+      return Either.right(HoodieAvroUtils.safeAvroToJsonString(genRec));
     } catch (Exception ex) {
       throw new HoodieException("Failed to convert illegal record to json", ex);
     }

hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java

+12 -10

@@ -29,17 +29,18 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.util.Collections;
+import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doNothing;
 
 /**
  * Tests {@link HoodieStreamerUtils}.
@@ -73,12 +74,13 @@ public void testCreateHoodieRecordsWithError(HoodieRecordType recordType) {
     TypedProperties props = new TypedProperties();
     SchemaProvider schemaProvider = new SimpleSchemaProvider(jsc, schema, props);
     BaseErrorTableWriter errorTableWriter = Mockito.mock(BaseErrorTableWriter.class);
-    SparkException exception = assertThrows(
-        SparkException.class,
-        () -> HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
-            schemaProvider, recordType, false, "000", Option.of(errorTableWriter))
-            .get().collect()
-    );
-    assertTrue(exception.getMessage().contains("Failed to convert illegal record to json"));
+    ArgumentCaptor<JavaRDD<?>> errorEventCaptor = ArgumentCaptor.forClass(JavaRDD.class);
+    doNothing().when(errorTableWriter).addErrorEvents(errorEventCaptor.capture());
+    HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
+        schemaProvider, recordType, false, "000", Option.of(errorTableWriter));
+    List<ErrorEvent<String>> actualErrorEvents = (List<ErrorEvent<String>>) errorEventCaptor.getValue().collect();
+    ErrorEvent<String> expectedErrorEvent = new ErrorEvent<>("{\"timestamp\": 1000, \"_row_key\": \"key1\", \"partition_path\": \"path1\", \"rider\": null, \"driver\": \"driver\"}",
+        ErrorEvent.ErrorReason.RECORD_CREATION);
+    assertEquals(Collections.singletonList(expectedErrorEvent), actualErrorEvents);
   }
 }
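
The rewritten test captures the error-event RDD with Mockito's ArgumentCaptor instead of expecting a SparkException. A self-contained sketch of that capture pattern (the ErrorSink interface and CaptorPatternSketch class here are hypothetical stand-ins for BaseErrorTableWriter and the real test):

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;

import org.mockito.ArgumentCaptor;

public class CaptorPatternSketch {
  // Hypothetical stand-in for BaseErrorTableWriter.
  public interface ErrorSink {
    void addErrorEvents(String payload);
  }

  public static void main(String[] args) {
    ErrorSink sink = mock(ErrorSink.class);

    // Capture whatever the code under test passes to the stubbed void method.
    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
    doNothing().when(sink).addErrorEvents(captor.capture());

    // Stand-in for the production path; the real test drives HoodieStreamerUtils.createHoodieRecords.
    sink.addErrorEvents("{\"timestamp\": 1000, \"_row_key\": \"key1\"}");

    // Assertions then run against the captured value, as in the updated test.
    System.out.println(captor.getValue());
  }
}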
