[SPARK-13883][SQL] Parquet Implementation of FileFormat.buildReader
This PR adds an implementation of the new `buildReader` interface for the Parquet `FileFormat`. A simple implementation of `FileScanRDD` is also included.

This code should be covered by the many existing tests for Parquet.
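
As a rough, self-contained illustration of the pattern (plain Scala, no Spark dependencies), the sketch below shows the shape of the new contract: the file format produces a per-file read function, and a `FileScanRDD`-style task iterator lazily chains that function over its files. All names here (`SimplePartitionedFile`, the `buildReader` parameter list, `computeTask`) are illustrative stand-ins, not the actual Spark API.

```scala
// Minimal sketch of the buildReader / FileScanRDD pattern; all types and names
// are stand-ins for illustration, not the real Spark classes.
object BuildReaderSketch {

  // Stand-in for PartitionedFile: a chunk of one file plus its partition values.
  case class SimplePartitionedFile(
      path: String, start: Long, length: Long, partitionValues: Seq[Any])

  type Row = Seq[Any]

  // Stand-in for FileFormat.buildReader: given the pruned set of columns,
  // return a function that reads one file chunk into rows.
  def buildReader(requiredColumns: Seq[String]): SimplePartitionedFile => Iterator[Row] = {
    file =>
      // A real implementation would open a Parquet reader for file.path and
      // append file.partitionValues to every row; here we fabricate two rows.
      Iterator(
        requiredColumns.map(c => s"$c@${file.path}:0") ++ file.partitionValues,
        requiredColumns.map(c => s"$c@${file.path}:1") ++ file.partitionValues)
  }

  // Stand-in for FileScanRDD.compute: lazily advance through the files of one
  // task, invoking the read function for each file in turn.
  def computeTask(
      files: Seq[SimplePartitionedFile],
      readFunction: SimplePartitionedFile => Iterator[Row]): Iterator[Row] =
    files.iterator.flatMap(readFunction)

  def main(args: Array[String]): Unit = {
    val reader = buildReader(Seq("a", "b"))
    val task = Seq(
      SimplePartitionedFile("part-0000.parquet", 0L, 128L, Seq("date=2016-03-22")),
      SimplePartitionedFile("part-0001.parquet", 0L, 128L, Seq("date=2016-03-22")))
    computeTask(task, reader).foreach(println)
  }
}
```

In the actual change, the read function opens a vectorized Parquet reader and appends partition values as constant columns, as the diff below shows.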

Author: Michael Armbrust <[email protected]>
Author: Sameer Agarwal <[email protected]>
Author: Nong Li <[email protected]>

Closes apache#11709 from marmbrus/parquetReader.
marmbrus committed Mar 22, 2016
1 parent 7299961 commit 8014a51
Showing 13 changed files with 366 additions and 46 deletions.
@@ -29,8 +29,10 @@
import org.apache.parquet.schema.Type;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.*;

/**
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -52,7 +54,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private int numBatched = 0;

/**
* For each request column, the reader to read this column.
* For each request column, the reader to read this column. This is NULL if this column
* is missing from the file, in which case we populate the attribute with NULL.
*/
private VectorizedColumnReader[] columnReaders;

@@ -66,6 +69,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private long totalCountLoadedSoFar = 0;

/**
* For each column, true if the column is missing in the file and we'll instead return NULLs.
*/
private boolean[] missingColumns;

/**
* columnBatch object that is used for batch decoding. This is created on first use and triggers
* batched decoding. It is not valid to interleave calls to the batched interface with the row
@@ -163,14 +171,53 @@ public float getProgress() throws IOException, InterruptedException {
* This object is reused. Calling this enables the vectorized reader. This should be called
* before any calls to nextKeyValue/nextBatch.
*/
public ColumnarBatch resultBatch() {
return resultBatch(DEFAULT_MEMORY_MODE);
}

public ColumnarBatch resultBatch(MemoryMode memMode) {
if (columnarBatch == null) {
columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
// Creates a columnar batch that includes the schema from the data files and the additional
// partition columns appended to the end of the batch.
// For example, if the data contains two columns, with 2 partition columns:
// Columns 0,1: data columns
// Column 2: partitionValues[0]
// Column 3: partitionValues[1]
public void initBatch(MemoryMode memMode, StructType partitionColumns,
InternalRow partitionValues) {
StructType batchSchema = new StructType();
for (StructField f: sparkSchema.fields()) {
batchSchema = batchSchema.add(f);
}
if (partitionColumns != null) {
for (StructField f : partitionColumns.fields()) {
batchSchema = batchSchema.add(f);
}
}

columnarBatch = ColumnarBatch.allocate(batchSchema);
if (partitionColumns != null) {
int partitionIdx = sparkSchema.fields().length;
for (int i = 0; i < partitionColumns.fields().length; i++) {
ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
columnarBatch.column(i + partitionIdx).setIsConstant();
}
}

// Initialize missing columns with nulls.
for (int i = 0; i < missingColumns.length; i++) {
if (missingColumns[i]) {
columnarBatch.column(i).putNulls(0, columnarBatch.capacity());
columnarBatch.column(i).setIsConstant();
}
}
}

public void initBatch() {
initBatch(DEFAULT_MEMORY_MODE, null, null);
}

public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
}

public ColumnarBatch resultBatch() {
if (columnarBatch == null) initBatch();
return columnarBatch;
}

@@ -191,6 +238,7 @@ public boolean nextBatch() throws IOException {

int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
if (columnReaders[i] == null) continue;
columnReaders[i].readBatch(num, columnarBatch.column(i));
}
rowsReturned += num;
@@ -205,6 +253,7 @@ private void initializeInternal() throws IOException {
* Check that the requested schema is supported.
*/
OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
missingColumns = new boolean[requestedSchema.getFieldCount()];
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
Type t = requestedSchema.getFields().get(i);
if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
@@ -223,9 +272,19 @@ private void initializeInternal() throws IOException {
if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
throw new IOException("Int96 not supported.");
}
ColumnDescriptor fd = fileSchema.getColumnDescription(requestedSchema.getPaths().get(i));
if (!fd.equals(requestedSchema.getColumns().get(i))) {
throw new IOException("Schema evolution not supported.");
String[] colPath = requestedSchema.getPaths().get(i);
if (fileSchema.containsPath(colPath)) {
ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
if (!fd.equals(requestedSchema.getColumns().get(i))) {
throw new IOException("Schema evolution not supported.");
}
missingColumns[i] = false;
} else {
if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
// Column is missing in data but the required data is non-nullable. This file is invalid.
throw new IOException("Required column is missing in data file. Col: " + colPath);
}
missingColumns[i] = true;
}
}
}
@@ -240,6 +299,7 @@ private void checkEndOfRowGroup() throws IOException {
List<ColumnDescriptor> columns = requestedSchema.getColumns();
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i),
pages.getPageReader(columns.get(i)));
}
@@ -256,6 +256,8 @@ public Object get(int ordinal, DataType dataType) {
* Resets this column for writing. The currently stored values are no longer accessible.
*/
public void reset() {
if (isConstant) return;

if (childColumns != null) {
for (ColumnVector c: childColumns) {
c.reset();
@@ -822,6 +824,11 @@ public final int appendStruct(boolean isNull) {
*/
public final boolean isArray() { return resultArray != null; }

/**
* Marks this column as being constant.
*/
public final void setIsConstant() { isConstant = true; }

/**
* Maximum number of rows that can be stored in this column.
*/
@@ -843,6 +850,12 @@ public final int appendStruct(boolean isNull) {
*/
protected boolean anyNullsSet;

/**
* True if this column's values are fixed. This means the column values never change, even
* across resets.
*/
protected boolean isConstant;

/**
* Default size of each array length value. This grows as necessary.
*/
@@ -33,7 +33,7 @@
/**
* This class is the in memory representation of rows as they are streamed through operators. It
* is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
* each operator allocates one of thee objects, the storage footprint on the task is negligible.
* each operator allocates one of these objects, the storage footprint on the task is negligible.
*
* The layout is a columnar with values encoded in their native format. Each RowBatch contains
* a horizontal partitioning of the data, split into columns.
@@ -233,7 +233,6 @@ case class DataSource(
"It must be specified manually")
}


HadoopFsRelation(
sqlContext,
fileCatalog,
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow

@@ -31,13 +31,17 @@ case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
length: Long)
length: Long) {
override def toString(): String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
}
}


/**
* A collection of files that should be read as a single task possibly from multiple partitioned
* directories.
*
* IMPLEMENT ME: This is just a placeholder for a future implementation.
* TODO: This currently does not take locality information about the files into account.
*/
case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
@@ -48,10 +52,38 @@ class FileScanRDD(
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sqlContext.sparkContext, Nil) {


override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
throw new NotImplementedError("Not Implemented Yet")
val iterator = new Iterator[Object] with AutoCloseable {
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
private[this] var currentIterator: Iterator[Object] = null

def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
def next() = currentIterator.next()

/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
if (files.hasNext) {
val nextFile = files.next()
logInfo(s"Reading File $nextFile")
SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
currentIterator = readFunction(nextFile)
hasNext
} else {
SqlNewHadoopRDDState.unsetInputFileName()
false
}
}

override def close() = {
SqlNewHadoopRDDState.unsetInputFileName()
}
}

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => iterator.close())

iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
}

override protected def getPartitions: Array[Partition] = Array.empty
override protected def getPartitions: Array[Partition] = filePartitions.toArray
}
@@ -57,7 +57,9 @@ import org.apache.spark.sql.types._
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
if files.fileFormat.toString == "TestFileFormat" =>
if (files.fileFormat.toString == "TestFileFormat" ||
files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
files.sqlContext.conf.parquetFileScan =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -67,12 +69,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val filterSet = ExpressionSet(filters)

val partitionColumns =
AttributeSet(
l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

val dataColumns =
l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)

val bucketColumns =
AttributeSet(
files.bucketSpec
@@ -82,7 +87,7 @@
.getOrElse(sys.error(""))))

// Partition keys are not available in the statistics of the files.
val dataFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)

// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters
@@ -92,11 +97,13 @@

val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
val requiredAttributes = AttributeSet(requiredExpressions).map(_.name).toSet
val requiredAttributes = AttributeSet(requiredExpressions)

val prunedDataSchema =
StructType(
files.dataSchema.filter(f => requiredAttributes.contains(f.name)))
val readDataColumns =
dataColumns
.filter(requiredAttributes.contains)
.filterNot(partitionColumns.contains)
val prunedDataSchema = readDataColumns.toStructType
logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")

val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -132,7 +139,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
assert(file.getLen != 0)
assert(file.getLen != 0, file.toString)
(0L to file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
@@ -180,17 +187,20 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {

val scan =
DataSourceScan(
l.output,
readDataColumns ++ partitionColumns,
new FileScanRDD(
files.sqlContext,
readFile,
plannedPartitions),
files,
Map("format" -> files.fileFormat.toString))
Map(
"Format" -> files.fileFormat.toString,
"PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
"ReadSchema" -> prunedDataSchema.simpleString))

val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.Filter(_, scan)).getOrElse(scan)
val withProjections = if (projects.forall(_.isInstanceOf[AttributeReference])) {
val withProjections = if (projects == withFilter.output) {
withFilter
} else {
execution.Project(projects, withFilter)
