Skip to content

Commit

Permalink
PARQUET-363: Allow empty schema groups.
Browse files Browse the repository at this point in the history
This removes the check added in PARQUET-278 that rejects schema groups
that have no fields. Selecting 0 columns from a file is allowed and used
by Hive and SparkSQL to implement queries like `select count(1) ...`

Author: Ryan Blue <[email protected]>

Closes apache#263 from rdblue/PARQUET-363-allow-empty-groups and squashes the following commits:

ab370f1 [Ryan Blue] PARQUET-363: Update Type builder tests to allow empty groups.
926932b [Ryan Blue] PARQUET-363: Add write-side schema validation.
365f30d [Ryan Blue] PARQUET-363: Allow empty schema groups.
  • Loading branch information
rdblue committed Sep 11, 2015
1 parent 9962a0f commit f203d80
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ public GroupType(Repetition repetition, String name, OriginalType originalType,
*/
GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields, ID id) {
super(name, repetition, originalType, id);
if (fields.isEmpty()) {
throw new InvalidSchemaException("A group type can not be empty. Parquet does not support empty group without leaves. Empty group: " + name);
}
this.fields = fields;
this.indexByName = new HashMap<String, Integer>();
for (int i = 0; i < fields.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.schema;

/**
 * Utility methods for validating Parquet schema types.
 */
public class TypeUtil {
  /**
   * Verifies that a schema is acceptable for writing: neither the given group
   * (or message) nor any group nested inside it may have zero fields.
   *
   * @param schema the group or message type to validate
   * @throws InvalidSchemaException if the schema, or any group reachable from
   *         it, contains no fields
   */
  public static void checkValidWriteSchema(GroupType schema) {
    final TypeVisitor emptyGroupCheck = new TypeVisitor() {
      @Override
      public void visit(PrimitiveType primitiveType) {
        // primitive leaves need no validation
      }

      @Override
      public void visit(MessageType messageType) {
        // a message is validated exactly like any other group
        visit((GroupType) messageType);
      }

      @Override
      public void visit(GroupType groupType) {
        final boolean hasNoFields = groupType.getFieldCount() < 1;
        if (hasNoFields) {
          throw new InvalidSchemaException(
              "Cannot write a schema with an empty group: " + groupType);
        }
        // descend so that nested groups are checked as well
        for (Type child : groupType.getFields()) {
          child.accept(this);
        }
      }
    };
    schema.accept(emptyGroupCheck);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,6 @@ public THIS addFields(Type... types) {

/**
 * Builds the {@link GroupType} described by this builder.
 *
 * No emptiness check is performed here: a group with no fields is a valid
 * result (it supports selecting 0 columns, e.g. when only counting rows).
 * Rejecting empty groups is left to write-time schema validation.
 *
 * @param name the name of the group to build
 * @return a new group type with this builder's repetition, original type,
 *         fields and id
 */
@Override
protected GroupType build(String name) {
  return new GroupType(repetition, name, originalType, fields, id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,4 @@ public void testIDs() throws Exception {
assertEquals(schema.toString(), schema2.toString());
}

// Checks that constructing a MessageType that contains a group with no
// children fails with InvalidSchemaException, and that the exception message
// names the offending group ("g1").
// NOTE(review): the hunk header for this file (17 lines -> 4) and the commit
// description indicate this test is deleted by the commit, because empty
// groups are now allowed at construction time -- confirm before relying on it.
@Test
public void testEmptyGroup() {
try {
MessageType t5 = new MessageType("root1",
new GroupType(REQUIRED, "g1"),
new GroupType(REQUIRED, "g2",
new PrimitiveType(OPTIONAL, BINARY, "b")));
// only reached when no exception was thrown above
fail("should throw InvalidSchemaException when GroupType contains no child");
} catch (InvalidSchemaException e) {
assertEquals("A group type can not be empty. Parquet does not support empty group without leaves. Empty group: g1", e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

Expand Down Expand Up @@ -155,39 +154,20 @@ public void testFixedTypeConstruction() {

/**
 * Verifies that the group builders accept a group with no fields for every
 * repetition, and that the result equals a directly constructed empty
 * {@link GroupType}.
 */
@Test
public void testEmptyGroup() {
  // empty groups are allowed to support selecting 0 columns (counting rows)
  Assert.assertEquals("Should not complain about an empty required group",
      Types.requiredGroup().named("g"), new GroupType(REQUIRED, "g"));
  Assert.assertEquals("Should not complain about an empty optional group",
      Types.optionalGroup().named("g"), new GroupType(OPTIONAL, "g"));
  Assert.assertEquals("Should not complain about an empty repeated group",
      Types.repeatedGroup().named("g"), new GroupType(REPEATED, "g"));
}

/**
 * Verifies that the message builder accepts a message with no fields and that
 * the result equals a directly constructed empty {@link MessageType}.
 */
@Test
public void testEmptyMessage() {
  // empty messages are allowed to support selecting 0 columns (counting rows)
  Assert.assertEquals("Should not complain about an empty message",
      Types.buildMessage().named("m"), new MessageType("m"));
}

@Test(expected=IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.schema;

import org.junit.Test;

import java.util.concurrent.Callable;

import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;

/**
 * Tests for {@link TypeUtil#checkValidWriteSchema(GroupType)}: schemas whose
 * groups all have fields pass, and any empty message or group is rejected
 * with {@link InvalidSchemaException}.
 */
public class TestTypeUtil {
  /** A message with fields is writable; a message with none is not. */
  @Test
  public void testWriteCheckMessageType() {
    GroupType validSchema = Types.buildMessage()
        .required(INT32).named("a")
        .optional(BINARY).as(UTF8).named("b")
        .named("valid_schema");
    TypeUtil.checkValidWriteSchema(validSchema);

    Callable<Void> checkEmptyMessage = new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        TypeUtil.checkValidWriteSchema(new MessageType("invalid_schema"));
        return null;
      }
    };
    TestTypeBuilders.assertThrows("Should complain about empty MessageType",
        InvalidSchemaException.class,
        checkEmptyMessage);
  }

  /** A standalone group with fields is writable; an empty one is not. */
  @Test
  public void testWriteCheckGroupType() {
    GroupType validGroup = Types.repeatedGroup()
        .required(INT32).named("a")
        .optional(BINARY).as(UTF8).named("b")
        .named("valid_group");
    TypeUtil.checkValidWriteSchema(validGroup);

    Callable<Void> checkEmptyGroup = new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        TypeUtil.checkValidWriteSchema(
            new GroupType(REPEATED, "invalid_group"));
        return null;
      }
    };
    TestTypeBuilders.assertThrows("Should complain about empty GroupType",
        InvalidSchemaException.class,
        checkEmptyGroup);
  }

  /** The check descends into nested groups of a message. */
  @Test
  public void testWriteCheckNestedGroupType() {
    GroupType validMessage = Types.buildMessage()
        .repeatedGroup()
        .required(INT32).named("a")
        .optional(BINARY).as(UTF8).named("b")
        .named("valid_group")
        .named("valid_message");
    TypeUtil.checkValidWriteSchema(validMessage);

    Callable<Void> checkNestedEmptyGroup = new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        // the schema is built inside the callable, as part of the checked action
        TypeUtil.checkValidWriteSchema(Types.buildMessage()
            .addField(new GroupType(REPEATED, "invalid_group"))
            .named("invalid_message"));
        return null;
      }
    };
    TestTypeBuilders.assertThrows("Should complain about empty GroupType",
        InvalidSchemaException.class,
        checkNestedEmptyGroup);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.TypeUtil;

/**
* Internal implementation of the Parquet file writer as a block container
Expand Down Expand Up @@ -205,6 +206,7 @@ public ParquetFileWriter(Configuration configuration, MessageType schema,
Path file, Mode mode, long rowGroupSize,
int maxPaddingSize)
throws IOException {
TypeUtil.checkValidWriteSchema(schema);
this.schema = schema;
FileSystem fs = file.getFileSystem(configuration);
boolean overwriteFlag = (mode == Mode.OVERWRITE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;

/**
* An example file writer class.
* THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE.
*/
public class ExampleParquetWriter extends ParquetWriter<Group> {

/**
* Creates a Builder for configuring ParquetWriter with the example object
* model. THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE.
*
* @param file the output file to create
* @return a {@link Builder} to create a {@link ParquetWriter}
*/
public static Builder builder(Path file) {
return new Builder(file);
}

/**
* Create a new {@link ExampleParquetWriter}. All arguments are forwarded to
* the {@link ParquetWriter} superclass constructor.
*
* @param file The file name to write to.
* @param writeSupport The {@link WriteSupport} implementation used to write {@link Group} records.
* @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
* @param blockSize the block size threshold.
* @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
*        This value is passed to the superclass constructor twice, in consecutive positions
*        (presumably as both the page size and the dictionary page size -- confirm against ParquetWriter).
* @param enableDictionary Whether to use a dictionary to compress columns.
* @param enableValidation Forwarded unchanged to the superclass constructor
*        (presumably toggles validation of written records -- confirm against ParquetWriter).
* @param writerVersion The {@link ParquetProperties.WriterVersion} forwarded unchanged to the superclass constructor.
* @param conf The Configuration to use.
* @throws IOException propagated from the superclass constructor
*/
ExampleParquetWriter(Path file, WriteSupport<Group> writeSupport,
CompressionCodecName compressionCodecName,
int blockSize, int pageSize, boolean enableDictionary,
boolean enableValidation,
ParquetProperties.WriterVersion writerVersion,
Configuration conf)
throws IOException {
// pageSize is intentionally supplied for two adjacent superclass parameters
super(file, writeSupport, compressionCodecName, blockSize, pageSize,
pageSize, enableDictionary, enableValidation, writerVersion, conf);
}

/**
* Builder that supplies a {@link GroupWriteSupport} as the write support,
* optionally configured with an explicit schema via {@link #withType(MessageType)}.
*/
public static class Builder extends ParquetWriter.Builder<Group, Builder> {
// Schema handed to GroupWriteSupport; stays null unless withType is called.
private MessageType type = null;

// Private: instances are obtained through ExampleParquetWriter.builder(Path).
private Builder(Path file) {
super(file);
}

/**
* Sets the schema that will be passed to the {@link GroupWriteSupport}.
*
* @param type the message type to write with
* @return this builder, for method chaining
*/
public Builder withType(MessageType type) {
this.type = type;
return this;
}

@Override
protected Builder self() {
return this;
}

/**
* Creates the write support from the type set on this builder (which may be
* null if {@link #withType(MessageType)} was never called). The conf
* argument is not used here.
*/
@Override
protected WriteSupport<Group> getWriteSupport(Configuration conf) {
return new GroupWriteSupport(type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,22 @@ public static MessageType getSchema(Configuration configuration) {
return parseMessageType(checkNotNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA));
}

// Schema to write with; null until supplied through the constructor or
// resolved from the Configuration in init().
private MessageType schema = null;
private GroupWriter groupWriter;

/**
* Creates a write support with no schema set; init() obtains the schema from
* the Configuration.
*/
public GroupWriteSupport() {
}

/**
* Creates a write support with an explicit schema. Package-private.
*
* @param schema the message type to write with; init() is meant to prefer it
*        over the schema stored in the Configuration
*/
GroupWriteSupport(MessageType schema) {
this.schema = schema;
}

/**
 * Initializes this write support and returns the write context.
 *
 * A schema passed to the constructor takes precedence; the schema stored in
 * the Configuration is read only when none was supplied.
 *
 * @param configuration the job configuration, consulted for the schema only
 *        when no schema was given at construction time
 * @return a write context holding the schema and an empty metadata map
 */
@Override
public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration) {
  // if present, prefer the schema passed to the constructor
  if (schema == null) {
    schema = getSchema(configuration);
  }
  return new WriteContext(schema, new HashMap<String, String>());
}

Expand Down
Loading

0 comments on commit f203d80

Please sign in to comment.