PARQUET-341 improve write performance for wide schema sparse data
In the write path, when the data is very sparse, most of the time is spent writing nulls.
Currently, writing nulls takes the same code path as writing values, which recursively traverses all the leaves whenever a group is null.
Because all the leaves beneath a null group are written with the same repetition level and definition level, the recursive calls to reach those leaves can be eliminated.

This PR caches the leaf writers for each group node, so when a group node is null, its leaves can be flushed with null values directly.

We tested it with a very wide schema on one of our production datasets; it improves write performance by ~20%.
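
As a rough illustration of the technique, here is a minimal, self-contained sketch (hypothetical names, not the actual Parquet classes): each group caches the writers of every leaf in its subtree once, so writing a null group becomes a flat loop over leaf writers instead of a recursive descent. The PR builds the same cache bottom-up, registering each leaf writer with all of its ancestor groups.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NullFlushSketch {

  // A leaf column writer; writeNull records a null at the given levels.
  interface LeafWriter {
    void writeNull(int repetitionLevel, int definitionLevel);
  }

  // A node in the schema tree: groups have children, leaves have a writer.
  static class Node {
    final List<Node> children = new ArrayList<Node>();
    LeafWriter writer; // set only on leaf nodes
  }

  private final Map<Node, List<LeafWriter>> groupToLeaves =
      new HashMap<Node, List<LeafWriter>>();

  // One-time setup: cache the leaf writers under every group node.
  void cacheLeaves(Node group) {
    List<LeafWriter> leaves = new ArrayList<LeafWriter>();
    collect(group, leaves);
    groupToLeaves.put(group, leaves);
    for (Node child : group.children) {
      if (child.writer == null) { // child is itself a group
        cacheLeaves(child);
      }
    }
  }

  private void collect(Node node, List<LeafWriter> out) {
    if (node.writer != null) {
      out.add(node.writer);
    } else {
      for (Node child : node.children) {
        collect(child, out);
      }
    }
  }

  // Hot path: a null group flushes nulls to all its leaves with no recursion.
  void writeNullToLeaves(Node group, int r, int d) {
    for (LeafWriter leaf : groupToLeaves.get(group)) {
      leaf.writeNull(r, d);
    }
  }
}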

Author: Tianshuo Deng <[email protected]>

Closes apache#247 from tsdeng/flush_null_directly and squashes the following commits:

253f2e3 [Tianshuo Deng] address comments
8676cd7 [Tianshuo Deng] flush null directly to leaves
tsdeng committed Aug 5, 2015
1 parent b86f68e commit 2f956f4
Showing 2 changed files with 36 additions and 6 deletions.
parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -18,9 +18,12 @@
  */
 package org.apache.parquet.io;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnWriteStore;
@@ -172,16 +175,39 @@ public boolean isWritten(int i) {
     private final FieldsMarker[] fieldsWritten;
     private final int[] r;
     private final ColumnWriter[] columnWriter;
+    /** Maintains a map from each group node to all the leaf writers underneath it. It is used to optimize
+     * writing nulls for a group node: all its leaf writers can be invoked directly without traversing the group's subtree. */
+    private Map<GroupColumnIO, List<ColumnWriter>> groupToLeafWriter = new HashMap<GroupColumnIO, List<ColumnWriter>>();
     private final ColumnWriteStore columns;
     private boolean emptyField = true;
 
+    private void buildGroupToLeafWriterMap(PrimitiveColumnIO primitive, ColumnWriter writer) {
+      GroupColumnIO parent = primitive.getParent();
+      do {
+        getLeafWriters(parent).add(writer);
+        parent = parent.getParent();
+      } while (parent != null);
+    }
+
+    private List<ColumnWriter> getLeafWriters(GroupColumnIO group) {
+      List<ColumnWriter> writers = groupToLeafWriter.get(group);
+      if (writers == null) {
+        writers = new ArrayList<ColumnWriter>();
+        groupToLeafWriter.put(group, writers);
+      }
+      return writers;
+    }
+
     public MessageColumnIORecordConsumer(ColumnWriteStore columns) {
       this.columns = columns;
       int maxDepth = 0;
       this.columnWriter = new ColumnWriter[MessageColumnIO.this.getLeaves().size()];
+
       for (PrimitiveColumnIO primitiveColumnIO : MessageColumnIO.this.getLeaves()) {
+        ColumnWriter w = columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
         maxDepth = Math.max(maxDepth, primitiveColumnIO.getFieldPath().length);
-        columnWriter[primitiveColumnIO.getId()] = columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
+        columnWriter[primitiveColumnIO.getId()] = w;
+        buildGroupToLeafWriterMap(primitiveColumnIO, w);
       }
 
       fieldsWritten = new FieldsMarker[maxDepth];
@@ -271,10 +297,13 @@ private void writeNull(ColumnIO undefinedField, int r, int d) {
         columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, d);
       } else {
         GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField;
-        int childrenCount = groupColumnIO.getChildrenCount();
-        for (int i = 0; i < childrenCount; i++) {
-          writeNull(groupColumnIO.getChild(i), r, d);
-        }
+        writeNullToLeaves(groupColumnIO, r, d);
       }
     }
+
+    private void writeNullToLeaves(GroupColumnIO group, int r, int d) {
+      for (ColumnWriter leafWriter : groupToLeafWriter.get(group)) {
+        leafWriter.writeNull(r, d);
+      }
+    }

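Note: in the old code, writing a null group meant recursing through every intermediate node of its subtree on every record; with the cached map, writeNullToLeaves touches only the leaves in a flat loop. The map is built bottom-up in buildGroupToLeafWriterMap, each leaf registering its writer with every ancestor group, so each group ends up holding exactly the leaves of its subtree.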
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -62,6 +62,7 @@ class InternalParquetRecordReader<T> {
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
+  private MessageColumnIO columnIO;
   private int columnCount;
   private final ReadSupport<T> readSupport;
 
@@ -136,7 +137,6 @@ private void checkRead() throws IOException {
       BenchmarkCounter.incrementTime(timeSpentReading);
       if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
-      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
       recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
@@ -174,6 +174,7 @@ public void initialize(MessageType fileSchema,
     this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
     this.requestedSchema = readContext.getRequestedSchema();
     this.fileSchema = fileSchema;
+    this.columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
     this.file = file;
     this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(
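Note: the reader-side change hoists the MessageColumnIO construction out of checkRead(), which runs once per row group, into initialize(), which runs once per file, so the column IO tree is built only once per file instead of once per row group.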
