Skip to content

Commit

Permalink
Refactor and clean up json avro schema converter (airbytehq#9363)
Browse files Browse the repository at this point in the history
* Default isRootNode to true and rename parameters

* Default appendAirbyteFields to true

* Rename methods and add javadoc

* Simplify namespace generation and delete obtainPaths method

* Simplify test case

* Add namespace for objects in array

* Merge object definitions in type union (need improvement)

* Fix merging of record fields

* Switch test case to minimize changes

* Mark assembler and builder as final variables

* Adjust test case

* Remove unused code

* Keep field doc

* Update comment

* Merge record schemas recursively

* Add nested test case

* Add documentation

* Add test case for issue airbytehq#5564

* Format code
  • Loading branch information
tuliren authored Jan 9, 2022
1 parent c071d73 commit 22ef236
Show file tree
Hide file tree
Showing 7 changed files with 719 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public S3Writer create(final GcsDestinationConfig config,
return new GcsAvroWriter(config, s3Client, configuredStream, uploadTimestamp, AvroConstants.JSON_CONVERTER, stream.getJsonSchema());
} else {
final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true, true);
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace());

LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false));
return new GcsParquetWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, AvroConstants.JSON_CONVERTER);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,11 @@
package io.airbyte.integrations.destination.s3.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Helper methods for unit tests. This is needed by multiple modules, so it is in the src directory.
Expand All @@ -23,7 +18,7 @@ public class AvroRecordHelper {

/**
 * Builds a {@link JsonFieldNameUpdater} for a stream by running the stream's JSON schema through a
 * fresh {@link JsonToAvroSchemaConverter} and handing the converter's standardized names to the
 * updater. The Avro schema returned by the conversion itself is discarded; only the names the
 * converter exposes afterwards via {@code getStandardizedNames()} are used.
 *
 * @param streamName name of the stream, passed to the converter
 * @param namespace namespace of the stream, passed to the converter
 * @param streamSchema JSON schema of the stream to convert
 * @return an updater constructed from the converter's standardized names
 */
public static JsonFieldNameUpdater getFieldNameUpdater(final String streamName, final String namespace, final JsonNode streamSchema) {
final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
// NOTE(review): the two consecutive getAvroSchema calls below look like the removed (five-argument)
// and added (three-argument) versions of the same line in a diff view -- confirm that only one of
// them is meant to be present in the real file.
schemaConverter.getAvroSchema(streamSchema, streamName, namespace, true, true);
schemaConverter.getAvroSchema(streamSchema, streamName, namespace);
return new JsonFieldNameUpdater(schemaConverter.getStandardizedNames());
}

Expand Down Expand Up @@ -52,32 +47,4 @@ public static JsonNode pruneAirbyteJson(final JsonNode input) {
return output;
}

/**
 * Recursively walks a JSON tree and records a dotted ancestor path for every object node.
 *
 * <p>For an object node, {@code currentPath} is split on {@code "/"}; the segments {@code "items"}
 * and {@code "properties"}, as well as every segment equal to the last segment, are dropped, and the
 * remainder is joined with {@code "."}. If the result is non-empty it is stored in
 * {@code jsonNodePathMap} under the node itself. Each field is then visited with the field name
 * appended to the path. For an array node, each element is visited with its index appended to the
 * path. Any other node kind is ignored.
 *
 * @param currentPath slash-separated path of {@code jsonNode}; empty for the root
 * @param jsonNode node to inspect
 * @param jsonNodePathMap output map from object node to its dotted ancestor path; mutated in place
 */
public static void obtainPaths(String currentPath, JsonNode jsonNode, Map<JsonNode, String> jsonNodePathMap) {
  if (!jsonNode.isObject()) {
    if (jsonNode.isArray()) {
      final ArrayNode elements = (ArrayNode) jsonNode;
      int index = 0;
      while (index < elements.size()) {
        obtainPaths(currentPath + "/" + index, elements.get(index), jsonNodePathMap);
        index++;
      }
    }
    return;
  }

  final ObjectNode node = (ObjectNode) jsonNode;

  final String childPrefix;
  if (currentPath.isEmpty()) {
    childPrefix = "";
  } else {
    childPrefix = currentPath + "/";
  }

  final String[] segments = currentPath.split("/");
  // The last-segment lookup stays inside the predicate so it is only evaluated when the stream
  // actually has elements (split can yield an empty array, e.g. for "/").
  final String ancestors = Arrays.stream(segments)
      .filter(segment -> !(segment.equals("items")
          || segment.equals("properties")
          || segment.equals(segments[segments.length - 1])))
      .collect(Collectors.joining("."));
  if (!ancestors.isEmpty()) {
    jsonNodePathMap.put(jsonNode, ancestors);
  }

  for (final Iterator<Map.Entry<String, JsonNode>> children = node.fields(); children.hasNext();) {
    final Map.Entry<String, JsonNode> child = children.next();
    obtainPaths(childPrefix + child.getKey(), child.getValue(), jsonNodePathMap);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public S3Writer create(final S3DestinationConfig config,
LOGGER.info("Json schema for stream {}: {}", stream.getName(), stream.getJsonSchema());

final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true, true);
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace());

LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
/**
 * Checks the converter's output for a single JSON field schema against an expected Avro field type.
 * The converter result is rendered with {@code toString()} and parsed back with
 * {@code Jsons.deserialize} so that the comparison is made between two {@link JsonNode} values.
 *
 * @param fieldName name of the field under test; also used in the failure message
 * @param jsonFieldSchema JSON schema of the field, passed to the converter
 * @param avroFieldType expected Avro type, as JSON
 */
public void testFieldTypeConversion(final String fieldName, final JsonNode jsonFieldSchema, final JsonNode avroFieldType) {
assertEquals(
avroFieldType,
// NOTE(review): the next two lines look like the removed (getNullableFieldTypes) and added
// (parseJsonField) versions of the same "actual" argument in a diff view -- confirm that only
// one of them is present in the real file.
Jsons.deserialize(SCHEMA_CONVERTER.getNullableFieldTypes(fieldName, jsonFieldSchema, true, true).toString()),
Jsons.deserialize(SCHEMA_CONVERTER.parseJsonField(fieldName, null, jsonFieldSchema, true, true).toString()),
String.format("Test for %s failed", fieldName));
}

Expand Down Expand Up @@ -111,7 +111,7 @@ public void testJsonAvroConversion(final String schemaName,
final JsonNode avroSchema,
final JsonNode avroObject)
throws Exception {
final Schema actualAvroSchema = SCHEMA_CONVERTER.getAvroSchema(jsonSchema, schemaName, namespace, appendAirbyteFields, true);
final Schema actualAvroSchema = SCHEMA_CONVERTER.getAvroSchema(jsonSchema, schemaName, namespace, appendAirbyteFields, true, true, true);
assertEquals(
avroSchema,
Jsons.deserialize(actualAvroSchema.toString()),
Expand Down
Loading

0 comments on commit 22ef236

Please sign in to comment.