Skip to content

Commit

Permalink
PARQUET-346: Minor fixes for PARQUET-350, PARQUET-348, PARQUET-346, P…
Browse files Browse the repository at this point in the history
…ARQUET-345

PARQUET-346:
ThriftSchemaConverter throws for unknown struct or union type
This is triggered when passing a StructType that comes from old file metadata

PARQUET-350:
ThriftRecordConverter throws NPE for unrecognized enum values
This is just some better error reporting.

PARQUET-348:
shouldIgnoreStatistics too noisy
This is just a case of way over-logging something, to the point that it makes the logs unreadable

PARQUET-345:
ThriftMetaData toString() should not try to load class reflectively
This is a case where the error reporting itself crashes, which results in the real error message getting lost

Author: Alex Levenson <[email protected]>

Closes apache#252 from isnotinvain/alexlevenson/various-fixes and squashes the following commits:

9b5cb0e [Alex Levenson] Add comments, cleanup some minor use of ThriftSchemaConverter
376343e [Alex Levenson] Fix test
d9d5dad [Alex Levenson] add license headers
e26dc0c [Alex Levenson] Add tests
8d9dde0 [Alex Levenson] Fixes for PARQUET-350, PARQUET-348, PARQUET-346, PARQUET-345
  • Loading branch information
isnotinvain committed Jul 31, 2015
1 parent 454fc36 commit b86f68e
Show file tree
Hide file tree
Showing 16 changed files with 303 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.parquet;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.parquet.SemanticVersion.SemanticVersionParseException;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.VersionParser.VersionParseException;
Expand All @@ -31,6 +33,8 @@
* and thus its statistics should be ignored / not trusted.
*/
public class CorruptStatistics {
private static final AtomicBoolean alreadyLogged = new AtomicBoolean(false);

private static final Log LOG = Log.getLog(CorruptStatistics.class);

// the version in which the bug described by jira: PARQUET-251 was fixed
Expand All @@ -52,7 +56,7 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName
if (Strings.isNullOrEmpty(createdBy)) {
// created_by is not populated, which could have been caused by
// parquet-mr during the same time as PARQUET-251, see PARQUET-297
LOG.info("Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297");
warnOnce("Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297");
return true;
}

Expand All @@ -65,16 +69,16 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName
}

if (Strings.isNullOrEmpty(version.version)) {
LOG.warn("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
warnOnce("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
return true;
}

SemanticVersion semver = SemanticVersion.parse(version.version);

if (semver.compareTo(PARQUET_251_FIXED_VERSION) < 0) {
LOG.info("Ignoring statistics because this file was created prior to "
warnOnce("Ignoring statistics because this file was created prior to "
+ PARQUET_251_FIXED_VERSION
+ ", see PARQUET-251" );
+ ", see PARQUET-251");
return true;
}

Expand All @@ -83,22 +87,30 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName
} catch (RuntimeException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
warnParseErrorOnce(createdBy, e);
return true;
} catch (SemanticVersionParseException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
warnParseErrorOnce(createdBy, e);
return true;
} catch (VersionParseException e) {
// couldn't parse the created_by field, log what went wrong, don't trust the stats,
// but don't make this fatal.
warnParseError(createdBy, e);
warnParseErrorOnce(createdBy, e);
return true;
}
}

private static void warnParseError(String createdBy, Throwable e) {
LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
/**
 * Warns (with the offending created_by value and the parse failure as cause) that
 * statistics are being ignored because created_by could not be parsed.
 * Emitted at most once per JVM via the shared {@code alreadyLogged} flag,
 * to avoid flooding the logs (PARQUET-348).
 */
private static void warnParseErrorOnce(String createdBy, Throwable e) {
  // compareAndSet(false, true) succeeds only for the very first caller
  boolean firstTime = alreadyLogged.compareAndSet(false, true);
  if (firstTime) {
    LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
  }
}

/**
 * Logs the given message at WARN level, but only the first time any of the
 * once-only warning helpers in this class fires (shared {@code alreadyLogged}
 * flag); subsequent calls are silently dropped to keep logs readable (PARQUET-348).
 */
private static void warnOnce(String message) {
  // Atomically claim the "first logger" slot; losers skip logging entirely.
  boolean firstTime = alreadyLogged.compareAndSet(false, true);
  if (firstTime) {
    LOG.warn(message);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ public class ScroogeStructConverterTest {
*/
private void shouldConvertConsistentlyWithThriftStructConverter(Class scroogeClass) throws ClassNotFoundException {
Class<? extends TBase<?, ?>> thriftClass = (Class<? extends TBase<?, ?>>)Class.forName(scroogeClass.getName().replaceFirst("org.apache.parquet.scrooge.test", "org.apache.parquet.thrift.test"));
ThriftType.StructType structFromThriftSchemaConverter = new ThriftSchemaConverter().toStructType(thriftClass);
ThriftType.StructType structFromThriftSchemaConverter = ThriftSchemaConverter.toStructType(thriftClass);
ThriftType.StructType structFromScroogeSchemaConverter = new ScroogeStructConverter().convert(scroogeClass);

assertEquals(toParquetSchema(structFromThriftSchemaConverter), toParquetSchema(structFromScroogeSchemaConverter));
}

private MessageType toParquetSchema(ThriftType.StructType struct) {
ThriftSchemaConverter sc = new ThriftSchemaConverter();
return sc.convert(struct);
return ThriftSchemaConverter.convertWithoutProjection(struct);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ protected void init(Class<T> thriftClass) {
this.thriftClass = thriftClass;
this.thriftStruct = getThriftStruct();

ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
this.schema = thriftSchemaConverter.convert(thriftStruct);
this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);

final Map<String, String> extraMetaData = new ThriftMetaData(thriftClass.getName(), thriftStruct).toExtraMetaData();
// adding the Pig schema as it would have been mapped from thrift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public TBaseWriteSupport(Class<T> thriftClass) {

@Override
protected StructType getThriftStruct() {
ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
return thriftSchemaConverter.toStructType((Class<TBase<?, ?>>)thriftClass);
return ThriftSchemaConverter.toStructType(thriftClass);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ public WriteContext init(Configuration configuration) {
} else {
thriftClass = TBaseWriteSupport.getThriftClass(configuration);
}
ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
this.thriftStruct = thriftSchemaConverter.toStructType(thriftClass);
this.schema = thriftSchemaConverter.convert(thriftStruct);
this.thriftStruct = ThriftSchemaConverter.toStructType(thriftClass);
this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);
if (buffered) {
readToWrite = new BufferedProtocolReadToWrite(thriftStruct, errorHandler);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public static Set<String> getThriftClassNames(Map<String, Set<String>> fileMetad

@Override
public String toString() {
return "ThriftMetaData" + toExtraMetaData();
return String.format("ThriftMetaData(thriftClassName: %s, descriptor: %s)", thriftClassName, descriptor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import org.apache.parquet.io.ParquetDecodingException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TList;
Expand Down Expand Up @@ -62,7 +63,7 @@
*/
public class ThriftRecordConverter<T> extends RecordMaterializer<T> {

final ParquetProtocol readFieldEnd = new ParquetProtocol("readFieldEnd()") {
final static ParquetProtocol readFieldEnd = new ParquetProtocol("readFieldEnd()") {
@Override
public void readFieldEnd() throws TException {
}
Expand All @@ -75,7 +76,7 @@ public void readFieldEnd() throws TException {
* @author Julien Le Dem
*
*/
class PrimitiveFieldHandler extends PrimitiveConverter {
static class PrimitiveFieldHandler extends PrimitiveConverter {

private final PrimitiveConverter delegate;
private final List<TProtocol> events;
Expand Down Expand Up @@ -154,7 +155,7 @@ public void addLong(long value) {
* @author Julien Le Dem
*
*/
class GroupFieldhandler extends GroupConverter {
static class GroupFieldhandler extends GroupConverter {

private final GroupConverter delegate;
private final List<TProtocol> events;
Expand Down Expand Up @@ -203,7 +204,7 @@ interface Counter {
* @author Julien Le Dem
*
*/
class GroupCounter extends GroupConverter implements Counter {
static class GroupCounter extends GroupConverter implements Counter {

private final GroupConverter delegate;
private int count;
Expand Down Expand Up @@ -246,7 +247,7 @@ public int getCount() {
* @author Julien Le Dem
*
*/
class PrimitiveCounter extends PrimitiveConverter implements Counter {
static class PrimitiveCounter extends PrimitiveConverter implements Counter {

private final PrimitiveConverter delegate;
private int count;
Expand Down Expand Up @@ -309,7 +310,7 @@ public int getCount() {
* @author Julien Le Dem
*
*/
class FieldPrimitiveConverter extends PrimitiveConverter {
static class FieldPrimitiveConverter extends PrimitiveConverter {

private final List<TProtocol> events;
private ThriftTypeID type;
Expand Down Expand Up @@ -400,7 +401,7 @@ public long readI64() throws TException {
* @author Julien Le Dem
*
*/
class FieldStringConverter extends PrimitiveConverter {
static class FieldStringConverter extends PrimitiveConverter {

private final List<TProtocol> events;

Expand Down Expand Up @@ -429,14 +430,15 @@ public ByteBuffer readBinary() throws TException {
* @author Julien Le Dem
*
*/
class FieldEnumConverter extends PrimitiveConverter {
static class FieldEnumConverter extends PrimitiveConverter {

private final List<TProtocol> events;

private Map<Binary, Integer> enumLookup = new HashMap<Binary, Integer>();
private final Map<Binary, Integer> enumLookup = new HashMap<Binary, Integer>();
private final ThriftField field;

public FieldEnumConverter(List<TProtocol> events, ThriftField field) {
this.events = events;
this.field = field;
final Iterable<EnumValue> values = ((EnumType)field.getType()).getValues();
for (EnumValue enumValue : values) {
enumLookup.put(Binary.fromString(enumValue.getName()), enumValue.getId());
Expand All @@ -445,7 +447,16 @@ public FieldEnumConverter(List<TProtocol> events, ThriftField field) {

@Override
public void addBinary(final Binary value) {
final int id = enumLookup.get(value);
final Integer id = enumLookup.get(value);

if (id == null) {
throw new ParquetDecodingException("Unrecognized enum value: "
+ value.toStringUsingUTF8()
+ " known values: "
+ enumLookup
+ " in " + this.field);
}

events.add(new ParquetProtocol("readI32() enum") {
@Override
public int readI32() throws TException {
Expand All @@ -461,7 +472,7 @@ public int readI32() throws TException {
* @author Julien Le Dem
*
*/
class MapConverter extends GroupConverter {
static class MapConverter extends GroupConverter {

private final GroupCounter child;
private final List<TProtocol> mapEvents = new ArrayList<TProtocol>();
Expand Down Expand Up @@ -523,7 +534,7 @@ public TMap readMapBegin() throws TException {
* @author Julien Le Dem
*
*/
class MapKeyValueConverter extends GroupConverter {
static class MapKeyValueConverter extends GroupConverter {

private Converter keyConverter;
private Converter valueConverter;
Expand Down Expand Up @@ -561,7 +572,7 @@ public void end() {
* @author Julien Le Dem
*
*/
class SetConverter extends CollectionConverter {
static class SetConverter extends CollectionConverter {

final ParquetProtocol readSetEnd = new ParquetProtocol("readSetEnd()") {
@Override
Expand Down Expand Up @@ -598,7 +609,7 @@ void collectionEnd() {
* @author Julien Le Dem
*
*/
class ListConverter extends CollectionConverter {
static class ListConverter extends CollectionConverter {

final ParquetProtocol readListEnd = new ParquetProtocol("readListEnd()") {
@Override
Expand Down Expand Up @@ -635,7 +646,7 @@ void collectionEnd() {
* @author Julien Le Dem
*
*/
abstract class CollectionConverter extends GroupConverter {
static abstract class CollectionConverter extends GroupConverter {

private final Converter child;
private final Counter childCounter;
Expand Down Expand Up @@ -696,7 +707,7 @@ public void end() {
* @author Julien Le Dem
*
*/
class StructConverter extends GroupConverter {
static class StructConverter extends GroupConverter {

private final int schemaSize;

Expand Down Expand Up @@ -794,7 +805,7 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
this.thriftReader = thriftReader;
this.protocol = new ParquetReadProtocol();
this.thriftType = thriftType;
MessageType fullSchema = new ThriftSchemaConverter().convert(thriftType);
MessageType fullSchema = ThriftSchemaConverter.convertWithoutProjection(thriftType);
missingRequiredFieldsInProjection = hasMissingRequiredFieldInGroupType(requestedParquetSchema, fullSchema);
this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
}
Expand Down Expand Up @@ -863,7 +874,7 @@ public GroupConverter getRootConverter() {
return structConverter;
}

private Converter newConverter(List<TProtocol> events, Type type, ThriftField field) {
private static Converter newConverter(List<TProtocol> events, Type type, ThriftField field) {
switch (field.getType().getType()) {
case LIST:
return new ListConverter(events, type.asGroupType(), field);
Expand Down
Loading

0 comments on commit b86f68e

Please sign in to comment.