PARQUET-284: Clean up ParquetMetadataConverter
Makes all methods static, removes the unused thread-unsafe cache, etc.

Turns out the "cache" was only read from *after* rebuilding what needed to be cached... so there was no performance gain there (and no loss from getting rid of it).

However, I don't know if this will fix the issue mentioned in PARQUET-284. I don't think concurrent access to a HashMap will cause a deadlock; it would just cause undefined behavior in reads, or maybe a ConcurrentModificationException.

UPDATE: I'm wrong; it can cause an infinite loop, so this should fix the issue: https://gist.github.com/rednaxelafx/1081908

UPDATE2: Put the cache back in, made it static and thread-safe.
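
As a rough illustration of the pattern the fix relies on, here is a minimal standalone sketch of interning immutable sets through a static ConcurrentHashMap with putIfAbsent. It uses String sets and a hypothetical SetInterner class rather than Parquet's Encoding enum, so the names are not from the actual ParquetMetadataConverter:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the caching pattern: a static ConcurrentHashMap interns
// immutable sets so identical sets share a single instance, and putIfAbsent keeps
// the insert-or-reuse step atomic under concurrent access.
public final class SetInterner {
  private static final ConcurrentHashMap<Set<String>, Set<String>> CACHE =
      new ConcurrentHashMap<Set<String>, Set<String>>();

  public static Set<String> intern(Set<String> values) {
    // Wrap in an unmodifiable view and drop the reference to the mutable copy,
    // so everything stored in the cache is safe for concurrent read-only use.
    Set<String> immutable = Collections.unmodifiableSet(new HashSet<String>(values));
    Set<String> cached = CACHE.putIfAbsent(immutable, immutable);
    // putIfAbsent returns null if our value was inserted; otherwise it returns
    // the previously cached instance, which we reuse instead of the new copy.
    return cached == null ? immutable : cached;
  }
}
```

Unlike the old per-instance HashMap, concurrent callers never mutate an unsynchronized map here, so the resize-time infinite loop described in the gist above cannot occur.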

Author: Alex Levenson <[email protected]>

Closes apache#220 from isnotinvain/alexlevenson/PARQUET-284 and squashes the following commits:

4797b48 [Alex Levenson] Fix merge conflict issue
8ff5775 [Alex Levenson] Merge branch 'master' into alexlevenson/PARQUET-284
ccd4776 [Alex Levenson] add encoding cache back in
9ea5a5f [Alex Levenson] Clean up ParquetMetadataConverter: make all method static, remove unused thread-unsafe cache
isnotinvain committed Jun 24, 2015
1 parent 46448e9 commit 5c2ba72
Showing 5 changed files with 130 additions and 97 deletions.
@@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.parquet.Log;
import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -67,10 +68,25 @@
import org.apache.parquet.schema.TypeVisitor;
import org.apache.parquet.schema.Types;

// TODO: This file has become too long!
// TODO: Lets split it up: https://issues.apache.org/jira/browse/PARQUET-310
public class ParquetMetadataConverter {
private ParquetMetadataConverter() { }

public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();

private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);

public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
// NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
// sets of encodings. It is important that all collections inserted to this cache be
// immutable and have thread-safe read-only access. This can be achieved by wrapping
// an unsynchronized collection in Collections.unmodifiable*(), and making sure to not
// keep any references to the original collection.
private static final ConcurrentHashMap<Set<org.apache.parquet.column.Encoding>, Set<org.apache.parquet.column.Encoding>>
cachedEncodingSets = new ConcurrentHashMap<Set<org.apache.parquet.column.Encoding>, Set<org.apache.parquet.column.Encoding>>();

public static FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
List<BlockMetaData> blocks = parquetMetadata.getBlocks();
List<RowGroup> rowGroups = new ArrayList<RowGroup>();
int numRows = 0;
@@ -93,13 +109,14 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque
return fileMetaData;
}

List<SchemaElement> toParquetSchema(MessageType schema) {
// Visible for testing
static List<SchemaElement> toParquetSchema(MessageType schema) {
List<SchemaElement> result = new ArrayList<SchemaElement>();
addToList(result, schema);
return result;
}

private void addToList(final List<SchemaElement> result, org.apache.parquet.schema.Type field) {
private static void addToList(final List<SchemaElement> result, org.apache.parquet.schema.Type field) {
field.accept(new TypeVisitor() {
@Override
public void visit(PrimitiveType primitiveType) {
@@ -146,7 +163,7 @@ private void visitChildren(final List<SchemaElement> result,
});
}

private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
private static void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
//rowGroup.total_byte_size = ;
List<ColumnChunkMetaData> columns = block.getColumns();
List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
@@ -175,62 +192,43 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou
rowGroups.add(rowGroup);
}

private List<Encoding> toFormatEncodings(Set<org.apache.parquet.column.Encoding> encodings) {
private static List<Encoding> toFormatEncodings(Set<org.apache.parquet.column.Encoding> encodings) {
List<Encoding> converted = new ArrayList<Encoding>(encodings.size());
for (org.apache.parquet.column.Encoding encoding : encodings) {
converted.add(getEncoding(encoding));
}
return converted;
}

private static final class EncodingList {

private final Set<org.apache.parquet.column.Encoding> encodings;

public EncodingList(Set<org.apache.parquet.column.Encoding> encodings) {
this.encodings = encodings;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof EncodingList) {
Set<org.apache.parquet.column.Encoding> other = ((EncodingList)obj).encodings;
return other.size() == encodings.size() && encodings.containsAll(other);
}
return false;
}

@Override
public int hashCode() {
int result = 1;
for (org.apache.parquet.column.Encoding element : encodings)
result = 31 * result + (element == null ? 0 : element.hashCode());
return result;
}
}

private Map<EncodingList, Set<org.apache.parquet.column.Encoding>> encodingLists = new HashMap<EncodingList, Set<org.apache.parquet.column.Encoding>>();

private Set<org.apache.parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
// Visible for testing
static Set<org.apache.parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
Set<org.apache.parquet.column.Encoding> converted = new HashSet<org.apache.parquet.column.Encoding>();

for (Encoding encoding : encodings) {
converted.add(getEncoding(encoding));
}

// make converted unmodifiable, drop reference to modifiable copy
converted = Collections.unmodifiableSet(converted);
EncodingList key = new EncodingList(converted);
Set<org.apache.parquet.column.Encoding> cached = encodingLists.get(key);

// atomically update the cache
Set<org.apache.parquet.column.Encoding> cached = cachedEncodingSets.putIfAbsent(converted, converted);

if (cached == null) {
// cached == null signifies that converted was *not* in the cache previously
// so we can return converted instead of throwing it away, it has now
// been cached
cached = converted;
encodingLists.put(key, cached);
}

return cached;
}

public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) {
public static org.apache.parquet.column.Encoding getEncoding(Encoding encoding) {
return org.apache.parquet.column.Encoding.valueOf(encoding.name());
}

public Encoding getEncoding(org.apache.parquet.column.Encoding encoding) {
public static Encoding getEncoding(org.apache.parquet.column.Encoding encoding) {
return Encoding.valueOf(encoding.name());
}

@@ -259,7 +257,7 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist
return stats;
}

public PrimitiveTypeName getPrimitive(Type type) {
public static PrimitiveTypeName getPrimitive(Type type) {
switch (type) {
case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
return PrimitiveTypeName.BINARY;
@@ -282,7 +280,8 @@ public PrimitiveTypeName getPrimitive(Type type) {
}
}

Type getType(PrimitiveTypeName type) {
// Visible for testing
static Type getType(PrimitiveTypeName type) {
switch (type) {
case INT64:
return Type.INT64;
@@ -305,7 +304,8 @@ Type getType(PrimitiveTypeName type) {
}
}

OriginalType getOriginalType(ConvertedType type) {
// Visible for testing
static OriginalType getOriginalType(ConvertedType type) {
switch (type) {
case UTF8:
return OriginalType.UTF8;
@@ -352,7 +352,8 @@ OriginalType getOriginalType(ConvertedType type) {
}
}

ConvertedType getConvertedType(OriginalType type) {
// Visible for testing
static ConvertedType getConvertedType(OriginalType type) {
switch (type) {
case UTF8:
return ConvertedType.UTF8;
@@ -399,7 +400,7 @@ ConvertedType getConvertedType(OriginalType type) {
}
}

private void addKeyValue(FileMetaData fileMetaData, String key, String value) {
private static void addKeyValue(FileMetaData fileMetaData, String key, String value) {
KeyValue keyValue = new KeyValue(key);
keyValue.value = value;
fileMetaData.addToKey_value_metadata(keyValue);
@@ -415,15 +416,13 @@ public abstract static class MetadataFilter {
private MetadataFilter() {}
abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
}
public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
/**
* [ startOffset, endOffset )
* @param startOffset
* @param endOffset
* @return the filter
*/
public static final MetadataFilter range(long startOffset, long endOffset) {
public static MetadataFilter range(long startOffset, long endOffset) {
return new RangeMetadataFilter(startOffset, endOffset);
}
private static final class NoFilter extends MetadataFilter {
@@ -452,6 +451,7 @@ public String toString() {
* [ startOffset, endOffset )
* @author Julien Le Dem
*/
// Visible for testing
static final class RangeMetadataFilter extends MetadataFilter {
final long startOffset;
final long endOffset;
@@ -474,10 +474,11 @@ public String toString() {
}

@Deprecated
public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
public static ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
return readParquetMetadata(from, NO_FILTER);
}

// Visible for testing
static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
@@ -496,9 +497,11 @@ static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilte
return metaData;
}

// Visible for testing
static long getOffset(RowGroup rowGroup) {
return getOffset(rowGroup.getColumns().get(0));
}
// Visible for testing
static long getOffset(ColumnChunk columnChunk) {
ColumnMetaData md = columnChunk.getMeta_data();
long offset = md.getData_page_offset();
@@ -508,7 +511,7 @@ static long getOffset(ColumnChunk columnChunk) {
return offset;
}

public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
public static ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
@Override
public FileMetaData visit(NoFilter filter) throws IOException {
@@ -529,7 +532,7 @@ public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return parquetMetadata;
}

public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
public static ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
@@ -579,20 +582,21 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
blocks);
}

private ColumnPath getPath(ColumnMetaData metaData) {
private static ColumnPath getPath(ColumnMetaData metaData) {
String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
return ColumnPath.get(path);
}

MessageType fromParquetSchema(List<SchemaElement> schema) {
// Visible for testing
static MessageType fromParquetSchema(List<SchemaElement> schema) {
Iterator<SchemaElement> iterator = schema.iterator();
SchemaElement root = iterator.next();
Types.MessageTypeBuilder builder = Types.buildMessage();
buildChildren(builder, iterator, root.getNum_children());
return builder.named(root.name);
}

private void buildChildren(Types.GroupBuilder builder,
private static void buildChildren(Types.GroupBuilder builder,
Iterator<SchemaElement> schema,
int childrenCount) {
for (int i = 0; i < childrenCount; i++) {
@@ -631,16 +635,18 @@ private void buildChildren(Types.GroupBuilder builder,
}
}

FieldRepetitionType toParquetRepetition(Repetition repetition) {
// Visible for testing
static FieldRepetitionType toParquetRepetition(Repetition repetition) {
return FieldRepetitionType.valueOf(repetition.name());
}

Repetition fromParquetRepetition(FieldRepetitionType repetition) {
// Visible for testing
static Repetition fromParquetRepetition(FieldRepetitionType repetition) {
return Repetition.valueOf(repetition.name());
}

@Deprecated
public void writeDataPageHeader(
public static void writeDataPageHeader(
int uncompressedSize,
int compressedSize,
int valueCount,
@@ -657,7 +663,7 @@ public void writeDataPageHeader(
valuesEncoding), to);
}

public void writeDataPageHeader(
public static void writeDataPageHeader(
int uncompressedSize,
int compressedSize,
int valueCount,
@@ -669,7 +675,7 @@
writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding), to);
}

private PageHeader newDataPageHeader(
private static PageHeader newDataPageHeader(
int uncompressedSize, int compressedSize,
int valueCount,
org.apache.parquet.column.statistics.Statistics statistics,
Expand All @@ -689,7 +695,7 @@ private PageHeader newDataPageHeader(
return pageHeader;
}

public void writeDataPageV2Header(
public static void writeDataPageV2Header(
int uncompressedSize, int compressedSize,
int valueCount, int nullCount, int rowCount,
org.apache.parquet.column.statistics.Statistics statistics,
Expand All @@ -705,7 +711,7 @@ public void writeDataPageV2Header(
rlByteLength, dlByteLength), to);
}

private PageHeader newDataPageV2Header(
private static PageHeader newDataPageV2Header(
int uncompressedSize, int compressedSize,
int valueCount, int nullCount, int rowCount,
org.apache.parquet.column.statistics.Statistics<?> statistics,
Expand All @@ -724,7 +730,7 @@ private PageHeader newDataPageV2Header(
return pageHeader;
}

public void writeDictionaryPageHeader(
public static void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
@@ -46,8 +46,6 @@
class ColumnChunkPageWriteStore implements PageWriteStore {
private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);

private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

private static final class ColumnChunkPageWriter implements PageWriter {

private final ColumnDescriptor path;
@@ -94,7 +92,7 @@ public void writePage(BytesInput bytes,
+ compressedSize);
}
tempOutputStream.reset();
parquetMetadataConverter.writeDataPageHeader(
ParquetMetadataConverter.writeDataPageHeader(
(int)uncompressedSize,
(int)compressedSize,
valueCount,
@@ -133,7 +131,7 @@ public void writePageV2(
compressedData.size() + repetitionLevels.size() + definitionLevels.size()
);
tempOutputStream.reset();
parquetMetadataConverter.writeDataPageV2Header(
ParquetMetadataConverter.writeDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
statistics,