Skip to content

Commit

Permalink
MINOR: Fix common struct JsonConverter and Schema generation (apa…
Browse files Browse the repository at this point in the history
…che#9279)

This patch fixes a couple problems with the use of the `StructRegistry`. First, it fixes registration so that it is consistently based on the typename of the struct. Previously structs were registered under the field name which meant that fields which referred to common structs resulted in multiple entries. Second, the patch fixes `SchemaGenerator` so that common structs are considered first.

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
hachikuji authored Sep 17, 2020
1 parent 1443f24 commit 97d1a32
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.message;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
Expand Down Expand Up @@ -298,8 +299,7 @@ public void testMyStructUnsupportedVersion() {
SimpleExampleMessageData.MyStruct myStruct =
new SimpleExampleMessageData.MyStruct().setStructId(10);
// Check serialization throws exception for unsupported version
testRoundTrip(new SimpleExampleMessageData().setMyStruct(myStruct),
__ -> { }, (short) 1);
testRoundTrip(new SimpleExampleMessageData().setMyStruct(myStruct), (short) 1);
}

/**
Expand All @@ -322,6 +322,18 @@ public void testMyTaggedStruct() {
message -> assertEquals("abc", message.myString()), (short) 2);
}

@Test
public void testCommonStruct() {
SimpleExampleMessageData message = new SimpleExampleMessageData();
message.setMyCommonStruct(new SimpleExampleMessageData.TestCommonStruct()
.setFoo(1)
.setBar(2));
message.setMyOtherCommonStruct(new SimpleExampleMessageData.TestCommonStruct()
.setFoo(3)
.setBar(4));
testRoundTrip(message, (short) 2);
}

private ByteBuffer serialize(SimpleExampleMessageData message, short version) {
ObjectSerializationCache cache = new ObjectSerializationCache();
int size = message.size(cache, version);
Expand Down Expand Up @@ -354,6 +366,10 @@ private SimpleExampleMessageData deserializeThroughStruct(ByteBuffer buf, short
return new SimpleExampleMessageData(struct, version);
}

private void testRoundTrip(SimpleExampleMessageData message, short version) {
testRoundTrip(message, m -> { }, version);
}

private void testRoundTrip(SimpleExampleMessageData message,
Consumer<SimpleExampleMessageData> validator) {
testRoundTrip(message, validator, (short) 1);
Expand All @@ -376,5 +392,13 @@ private void testRoundTrip(SimpleExampleMessageData message,
validator.accept(messageFromStruct);
assertEquals(message, messageFromStruct);
assertEquals(message.hashCode(), messageFromStruct.hashCode());

// Check JSON serialization
JsonNode serializedJson = SimpleExampleMessageDataJsonConverter.write(message, version);
SimpleExampleMessageData messageFromJson = SimpleExampleMessageDataJsonConverter.read(serializedJson, version);
validator.accept(messageFromJson);
assertEquals(message, messageFromJson);
assertEquals(message.hashCode(), messageFromJson.hashCode());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@
"taggedVersions": "2+", "tag": 8,
"fields": [
{ "name": "structId", "type": "string", "versions": "2+", "about": "String field in struct"}
]},
{ "name": "myCommonStruct", "type": "TestCommonStruct", "versions": "0+"},
{ "name": "myOtherCommonStruct", "type": "TestCommonStruct", "versions": "0+"}
],
"commonStructs": [
{ "name": "TestCommonStruct", "versions": "0+", "fields": [
{ "name": "foo", "type": "int32", "default": "123", "versions": "0+" },
{ "name": "bar", "type": "int32", "default": "123", "versions": "0+" }
]}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ public boolean isStruct() {
return true;
}

public String typeName() {
return type;
}

@Override
public String toString() {
return type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ private void generateSubclasses(String className, StructSpec struct,
parentVersions.intersect(struct.versions()));
}
} else if (field.type().isStruct()) {
if (!structRegistry.commonStructNames().contains(field.name())) {
if (!structRegistry.commonStructNames().contains(field.typeString())) {
generateClass(Optional.empty(),
field.type().toString(),
field.typeString(),
structRegistry.findStruct(field),
parentVersions.intersect(struct.versions()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ static class MessageInfo {

void generateSchemas(MessageSpec message) throws Exception {
this.messageFlexibleVersions = message.flexibleVersions();
// Generate schemas for inline structures
generateSchemas(message.dataClassName(), message.struct(),
message.struct().versions());

// Generate schemas for common structures
// First generate schemas for common structures so that they are
// available when we generate the inline structures
for (Iterator<StructSpec> iter = structRegistry.commonStructs(); iter.hasNext(); ) {
StructSpec struct = iter.next();
generateSchemas(struct.name(), struct, struct.versions());
generateSchemas(struct.name(), struct, message.struct().versions());
}

// Generate schemas for inline structures
generateSchemas(message.dataClassName(), message.struct(),
message.struct().versions());
}

void generateSchemas(String className, StructSpec struct,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,34 @@ void register(MessageSpec message) throws Exception {
@SuppressWarnings("unchecked")
private void addStructSpecs(Versions parentVersions, List<FieldSpec> fields) {
for (FieldSpec field : fields) {
String elementName = null;
String typeName = null;
if (field.type().isStructArray()) {
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
elementName = arrayType.elementName();
typeName = arrayType.elementName();
} else if (field.type().isStruct()) {
elementName = field.name();
typeName = field.typeString();
FieldType.StructType structType = (FieldType.StructType) field.type();
typeName = structType.typeName();
}
if (elementName != null) {
if (commonStructNames.contains(elementName)) {
if (typeName != null) {
if (commonStructNames.contains(typeName)) {
// If we're using a common structure, we can't specify its fields.
// The fields should be specified in the commonStructs area.
if (!field.fields().isEmpty()) {
throw new RuntimeException("Can't re-specify the common struct " +
elementName + " as an inline struct.");
typeName + " as an inline struct.");
}
} else if (structs.containsKey(elementName)) {
} else if (structs.containsKey(typeName)) {
// Inline structures should only appear once.
throw new RuntimeException("Struct " + elementName +
throw new RuntimeException("Struct " + typeName +
" was specified twice.");
} else {
// Synthesize a StructSpec object out of the fields.
StructSpec spec = new StructSpec(typeName,
field.versions().toString(),
field.fields());
structs.put(elementName, new StructInfo(spec, parentVersions));
field.versions().toString(),
field.fields());
structs.put(typeName, new StructInfo(spec, parentVersions));
}

addStructSpecs(parentVersions.intersect(field.versions()), field.fields());
}
}
Expand All @@ -130,7 +129,8 @@ StructSpec findStruct(FieldSpec field) {
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
structFieldName = arrayType.elementName();
} else if (field.type().isStruct()) {
structFieldName = field.name();
FieldType.StructType structType = (FieldType.StructType) field.type();
structFieldName = structType.typeName();
} else {
throw new RuntimeException("Field " + field.name() +
" cannot be treated as a structure.");
Expand Down

0 comments on commit 97d1a32

Please sign in to comment.