Skip to content

Commit

Permalink
PARQUET-1684: Do not store default protobuf values as null for proto3 (
Browse files Browse the repository at this point in the history
…apache#702)

Co-authored-by: Priyank Bagrecha <[email protected]>
  • Loading branch information
bagipriyank and Priyank Bagrecha authored Jun 2, 2020
1 parent 1c63c93 commit 958ed6f
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 33 deletions.
7 changes: 7 additions & 0 deletions parquet-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<properties>
<elephant-bird.version>4.4</elephant-bird.version>
<protobuf.version>3.5.1</protobuf.version>
<truth-proto-extension.version>1.0</truth-proto-extension.version>
</properties>


Expand All @@ -45,6 +46,12 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth.extensions</groupId>
<artifactId>truth-proto-extension</artifactId>
<version>${truth-proto-extension.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,13 @@
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.*;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;

import static java.util.Optional.ofNullable;

Expand Down Expand Up @@ -158,9 +154,13 @@ void writeRawValue(Object value) {

/** Used for writing nonrepeated (optional, required) fields*/
void writeField(Object value) {
recordConsumer.startField(fieldName, index);
if (!(this instanceof ProtoWriteSupport.MapWriter)) {
recordConsumer.startField(fieldName, index);
}
writeRawValue(value);
recordConsumer.endField(fieldName, index);
if (!(this instanceof ProtoWriteSupport.MapWriter)) {
recordConsumer.endField(fieldName, index);
}
}
}

Expand Down Expand Up @@ -279,20 +279,36 @@ final void writeField(Object value) {
}

private void writeAllFields(MessageOrBuilder pb) {
//returns changed fields with values. Map is ordered by id.
Map<FieldDescriptor, Object> changedPbFields = pb.getAllFields();
Descriptor messageDescriptor = pb.getDescriptorForType();
Descriptors.FileDescriptor.Syntax syntax = messageDescriptor.getFile().getSyntax();

for (Map.Entry<FieldDescriptor, Object> entry : changedPbFields.entrySet()) {
FieldDescriptor fieldDescriptor = entry.getKey();
if (Descriptors.FileDescriptor.Syntax.PROTO2.equals(syntax)) {
//returns changed fields with values. Map is ordered by id.
Map<FieldDescriptor, Object> changedPbFields = pb.getAllFields();

if(fieldDescriptor.isExtension()) {
// Field index of an extension field might overlap with a base field.
throw new UnsupportedOperationException(
"Cannot convert Protobuf message with extension field(s)");
}
for (Map.Entry<FieldDescriptor, Object> entry : changedPbFields.entrySet()) {
FieldDescriptor fieldDescriptor = entry.getKey();

if(fieldDescriptor.isExtension()) {
// Field index of an extension field might overlap with a base field.
throw new UnsupportedOperationException(
"Cannot convert Protobuf message with extension field(s)");
}

int fieldIndex = fieldDescriptor.getIndex();
fieldWriters[fieldIndex].writeField(entry.getValue());
int fieldIndex = fieldDescriptor.getIndex();
fieldWriters[fieldIndex].writeField(entry.getValue());
}
} else if (Descriptors.FileDescriptor.Syntax.PROTO3.equals(syntax)) {
List<FieldDescriptor> fieldDescriptors = messageDescriptor.getFields();
for (FieldDescriptor fieldDescriptor : fieldDescriptors) {
FieldDescriptor.Type type = fieldDescriptor.getType();
if (!fieldDescriptor.isRepeated() && FieldDescriptor.Type.MESSAGE.equals(type) && !pb.hasField(fieldDescriptor)) {
continue;
}
int fieldIndex = fieldDescriptor.getIndex();
FieldWriter fieldWriter = fieldWriters[fieldIndex];
fieldWriter.writeField(pb.getField(fieldDescriptor));
}
}
}
}
Expand All @@ -311,9 +327,13 @@ final void writeRawValue(Object value) {

@Override
final void writeField(Object value) {
List<?> list = (List<?>) value;
if (list.isEmpty()) {
return;
}

recordConsumer.startField(fieldName, index);
recordConsumer.startGroup();
List<?> list = (List<?>) value;

recordConsumer.startField("list", 0); // This is the wrapper group for the array field
for (Object listEntry: list) {
Expand Down Expand Up @@ -350,8 +370,12 @@ final void writeRawValue(Object value) {

@Override
final void writeField(Object value) {
recordConsumer.startField(fieldName, index);
List<?> list = (List<?>) value;
if (list.isEmpty()) {
return;
}

recordConsumer.startField(fieldName, index);

for (Object listEntry: list) {
fieldWriter.writeRawValue(listEntry);
Expand Down Expand Up @@ -413,10 +437,15 @@ public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {

@Override
final void writeRawValue(Object value) {
Collection<Message> collection = (Collection<Message>) value;
if (collection.isEmpty()) {
return;
}
recordConsumer.startField(fieldName, index);
recordConsumer.startGroup();

recordConsumer.startField("key_value", 0); // This is the wrapper group for the map field
for (Message msg : (Collection<Message>) value) {
for (Message msg : collection) {
recordConsumer.startGroup();

final Descriptor descriptorForType = msg.getDescriptorForType();
Expand All @@ -432,6 +461,7 @@ final void writeRawValue(Object value) {
recordConsumer.endField("key_value", 0);

recordConsumer.endGroup();
recordConsumer.endField(fieldName, index);
}
}

Expand Down
Loading

0 comments on commit 958ed6f

Please sign in to comment.