Skip to content

Commit

Permalink
Fix skip values with Parquet lazy reads
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenxiao authored and dain committed Feb 16, 2016
1 parent ec1c516 commit 9fae331
Show file tree
Hide file tree
Showing 16 changed files with 479 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public Page getNextPage()
}
else {
ColumnDescriptor columnDescriptor = this.requestedSchema.getColumns().get(fieldId);
blocks[fieldId] = new LazyBlock(batchSize, new ParquetBlockLoader(columnDescriptor, batchSize, type));
blocks[fieldId] = new LazyBlock(batchSize, new ParquetBlockLoader(columnDescriptor, type));
}
}
Page page = new Page(batchSize, blocks);
Expand Down Expand Up @@ -287,14 +287,12 @@ private final class ParquetBlockLoader
implements LazyBlockLoader<LazyBlock>
{
private final int expectedBatchId = batchId;
private final int batchSize;
private final ColumnDescriptor columnDescriptor;
private final Type type;
private boolean loaded;

public ParquetBlockLoader(ColumnDescriptor columnDescriptor, int batchSize, Type type)
public ParquetBlockLoader(ColumnDescriptor columnDescriptor, Type type)
{
this.batchSize = batchSize;
this.columnDescriptor = columnDescriptor;
this.type = requireNonNull(type, "type is null");
}
Expand All @@ -309,7 +307,7 @@ public final void load(LazyBlock lazyBlock)
checkState(batchId == expectedBatchId);

try {
Block block = parquetReader.readBlock(columnDescriptor, batchSize, type);
Block block = parquetReader.readBlock(columnDescriptor, type);
lazyBlock.setBlock(block);
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.parquet.reader;

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import io.airlift.slice.Slices;
import parquet.column.ColumnDescriptor;
import parquet.io.api.Binary;

import static com.facebook.presto.spi.type.VarcharType.VARCHAR;

/**
 * Reads a Parquet BINARY column chunk into Presto VARCHAR blocks.
 * <p>
 * Values at the column's max definition level are present; anything below it
 * is a null at this nesting depth and is appended as a null position.
 */
public class ParquetBinaryColumnReader
        extends ParquetColumnReader
{
    public ParquetBinaryColumnReader(ColumnDescriptor descriptor)
    {
        super(descriptor);
    }

    @Override
    public BlockBuilder createBlockBuilder()
    {
        // Sized with nextBatchSize so the builder rarely needs to grow mid-batch
        return VARCHAR.createBlockBuilder(new BlockBuilderStatus(), this.nextBatchSize);
    }

    /**
     * Appends {@code valueNumber} values from the current page to {@code blockBuilder},
     * writing nulls for positions below the max definition level.
     */
    @Override
    public void readValues(BlockBuilder blockBuilder, int valueNumber)
    {
        for (int i = 0; i < valueNumber; i++) {
            if (this.definitionReader.readLevel() == this.columnDescriptor.getMaxDefinitionLevel()) {
                Binary binary = valuesReader.readBytes();
                if (binary.length() == 0) {
                    // Empty binary is a real (non-null) empty string
                    VARCHAR.writeSlice(blockBuilder, Slices.EMPTY_SLICE);
                }
                else {
                    VARCHAR.writeSlice(blockBuilder, Slices.wrappedBuffer(binary.getBytes()));
                }
            }
            else {
                blockBuilder.appendNull();
            }
        }
    }

    /**
     * Skips {@code offsetNumber} values: definition levels are always consumed,
     * but the data stream is only advanced for non-null positions.
     */
    @Override
    public void skipValues(int offsetNumber)
    {
        for (int i = 0; i < offsetNumber; i++) {
            if (this.definitionReader.readLevel() == this.columnDescriptor.getMaxDefinitionLevel()) {
                this.valuesReader.readBytes();
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.parquet.reader;

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import parquet.column.ColumnDescriptor;

import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;

/**
 * Reads a Parquet BOOLEAN column chunk into Presto BOOLEAN blocks.
 * <p>
 * Values at the column's max definition level are present; anything below it
 * is a null at this nesting depth and is appended as a null position.
 */
public class ParquetBooleanColumnReader
        extends ParquetColumnReader
{
    public ParquetBooleanColumnReader(ColumnDescriptor descriptor)
    {
        super(descriptor);
    }

    @Override
    public BlockBuilder createBlockBuilder()
    {
        // Sized with nextBatchSize so the builder rarely needs to grow mid-batch
        return BOOLEAN.createBlockBuilder(new BlockBuilderStatus(), this.nextBatchSize);
    }

    /**
     * Appends {@code valueNumber} values from the current page to {@code blockBuilder},
     * writing nulls for positions below the max definition level.
     */
    @Override
    public void readValues(BlockBuilder blockBuilder, int valueNumber)
    {
        for (int i = 0; i < valueNumber; i++) {
            if (this.definitionReader.readLevel() == this.columnDescriptor.getMaxDefinitionLevel()) {
                BOOLEAN.writeBoolean(blockBuilder, this.valuesReader.readBoolean());
            }
            else {
                blockBuilder.appendNull();
            }
        }
    }

    /**
     * Skips {@code offsetNumber} values: definition levels are always consumed,
     * but the data stream is only advanced for non-null positions.
     */
    @Override
    public void skipValues(int offsetNumber)
    {
        for (int i = 0; i < offsetNumber; i++) {
            if (this.definitionReader.readLevel() == this.columnDescriptor.getMaxDefinitionLevel()) {
                this.valuesReader.readBoolean();
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
*/
package com.facebook.presto.hive.parquet.reader;

import com.facebook.presto.hive.parquet.reader.block.ParquetBlockBuilder;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type;
import parquet.bytes.BytesInput;
import parquet.bytes.BytesUtils;
Expand All @@ -35,29 +36,67 @@
import java.io.IOException;

import static com.facebook.presto.hive.parquet.ParquetValidationUtils.validateParquet;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class ParquetColumnReader
public abstract class ParquetColumnReader
{
private final ColumnDescriptor columnDescriptor;
private final long totalValueCount;
private final PageReader pageReader;
private final Dictionary dictionary;
protected final ColumnDescriptor columnDescriptor;

private ParquetLevelReader repetitionReader;
private ParquetLevelReader definitionReader;
protected ParquetLevelReader repetitionReader;
protected ParquetLevelReader definitionReader;
protected ValuesReader valuesReader;
protected int nextBatchSize;

private long totalValueCount;
private PageReader pageReader;
private Dictionary dictionary;
private int repetitionLevel;
private int definitionLevel;
private int currentValueCount;
private int pageValueCount;
private DataPage page;
private ValuesReader valuesReader;
private int remainingValueCountInPage;
private int readOffset;

public abstract BlockBuilder createBlockBuilder();
public abstract void readValues(BlockBuilder blockBuilder, int valueNumber);
public abstract void skipValues(int offsetNumber);

public ParquetColumnReader(ColumnDescriptor columnDescriptor, PageReader pageReader)
public static ParquetColumnReader createReader(ColumnDescriptor descriptor)
{
switch (descriptor.getType()) {
case BOOLEAN:
return new ParquetBooleanColumnReader(descriptor);
case INT32:
return new ParquetIntColumnReader(descriptor);
case INT64:
return new ParquetLongColumnReader(descriptor);
case FLOAT:
return new ParquetFloatColumnReader(descriptor);
case DOUBLE:
return new ParquetDoubleColumnReader(descriptor);
case BINARY:
return new ParquetBinaryColumnReader(descriptor);
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported parquet type: " + descriptor.getType());
}
}

public ParquetColumnReader(ColumnDescriptor columnDescriptor)
{
this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor");
pageReader = null;
}

public PageReader getPageReader()
{
return pageReader;
}

public void setPageReader(PageReader pageReader)
{
this.pageReader = requireNonNull(pageReader, "pageReader");
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();

Expand All @@ -76,6 +115,12 @@ public ParquetColumnReader(ColumnDescriptor columnDescriptor, PageReader pageRea
totalValueCount = pageReader.getTotalValueCount();
}

public void prepareNextRead(int batchSize)
{
readOffset = readOffset + nextBatchSize;
nextBatchSize = batchSize;
}

public int getCurrentRepetitionLevel()
{
return repetitionLevel;
Expand All @@ -96,39 +141,68 @@ public long getTotalValueCount()
return totalValueCount;
}

public Block readBlock(int vectorSize, Type type)
public Block readBlock(Type type)
throws IOException
{
checkArgument(currentValueCount <= totalValueCount, "Already read all values in this column chunk");
checkArgument(currentValueCount <= totalValueCount, "Already read all values in column chunk");
// Parquet does not have api to skip in datastream, have to skip values
// TODO skip in datastream
if (readOffset != 0) {
int valuePosition = 0;
while (valuePosition < readOffset) {
if (page == null) {
readNextPage();
}
int offsetNumber = Math.min(remainingValueCountInPage, readOffset - valuePosition);
skipValues(offsetNumber);
valuePosition = valuePosition + offsetNumber;
updatePosition(offsetNumber);
}
checkArgument(valuePosition == readOffset, "valuePosition " + valuePosition + " not equals to readOffset " + readOffset);
}

BlockBuilder blockBuilder = createBlockBuilder();
int valueCount = 0;
ParquetBlockBuilder blockBuilder = ParquetBlockBuilder.createBlockBuilder(vectorSize, columnDescriptor);
while (valueCount < vectorSize) {
while (valueCount < nextBatchSize) {
if (page == null) {
page = pageReader.readPage();
validateParquet(page != null, "Parquet file does not have enough values for column chunk");
readNextPage();
}
int valueNumber = Math.min(remainingValueCountInPage, nextBatchSize - valueCount);
readValues(blockBuilder, valueNumber);
valueCount = valueCount + valueNumber;
updatePosition(valueNumber);
}
checkArgument(valueCount == nextBatchSize, "valueCount " + valueCount + " not equals to batchSize " + nextBatchSize);

remainingValueCountInPage = page.getValueCount();
readOffset = 0;
nextBatchSize = 0;
return blockBuilder.build();
}

if (page instanceof DataPageV1) {
valuesReader = readPageV1((DataPageV1) page);
}
else {
valuesReader = readPageV2((DataPageV2) page);
}
}
private void readNextPage()
throws IOException
{
page = pageReader.readPage();
validateParquet(page != null, "Not enough values to read in column chunk");
pageValueCount = page.getValueCount();
remainingValueCountInPage = page.getValueCount();

int valueNumber = Math.min(remainingValueCountInPage, vectorSize - valueCount);
blockBuilder.readValues(valuesReader, valueNumber, definitionReader);
valueCount = valueCount + valueNumber;
if (page instanceof DataPageV1) {
valuesReader = readPageV1((DataPageV1) page);
}
else {
valuesReader = readPageV2((DataPageV2) page);
}
}

if (valueNumber == remainingValueCountInPage) {
page = null;
valuesReader = null;
}
remainingValueCountInPage = remainingValueCountInPage - valueNumber;
currentValueCount += valueNumber;
private void updatePosition(int valueNumber)
{
if (valueNumber == remainingValueCountInPage) {
page = null;
valuesReader = null;
}
return blockBuilder.buildBlock();
remainingValueCountInPage = remainingValueCountInPage - valueNumber;
currentValueCount += valueNumber;
}

private ValuesReader readPageV1(DataPageV1 page)
Expand Down Expand Up @@ -177,7 +251,6 @@ private ParquetLevelReader buildLevelRLEReader(int maxLevel, BytesInput bytes)

private ValuesReader initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount)
{
pageValueCount = valueCount;
ValuesReader valuesReader;
if (dataEncoding.usesDictionary()) {
if (dictionary == null) {
Expand Down
Loading

0 comments on commit 9fae331

Please sign in to comment.