Skip to content

Commit

Permalink
[SPARK-6123] [SPARK-6775] [SPARK-6776] [SQL] Refactors Parquet read p…
Browse files Browse the repository at this point in the history
…ath for interoperability and backwards-compatibility

This PR is a follow-up of apache#6617 and is part of [SPARK-6774] [2], which aims to ensure interoperability and backwards-compatibility for Spark SQL Parquet support.  And this one fixes the read path.  Now Spark SQL is expected to be able to read legacy Parquet data files generated by most (if not all) common libraries/tools like parquet-thrift, parquet-avro, and parquet-hive. However, we still need to refactor the write path to write standard Parquet LISTs and MAPs ([SPARK-8848] [4]).

### Major changes

1. `CatalystConverter` class hierarchy refactoring

   - Replaces `CatalystConverter` trait with a much simpler `ParentContainerUpdater`.

     Now instead of extending the original `CatalystConverter` trait, every converter class accepts an updater which is responsible for propagating the converted value to some parent container. For example, appending array elements to a parent array buffer, appending a key-value pairs to a parent mutable map, or setting a converted value to some specific field of a parent row. Root converter doesn't have a parent and thus uses a `NoopUpdater`.

     This simplifies the design since converters don't need to care about details of their parent converters anymore.

   - Unifies `CatalystRootConverter`, `CatalystGroupConverter` and `CatalystPrimitiveRowConverter` into `CatalystRowConverter`

     Specifically, now all row objects are represented by `SpecificMutableRow` during conversion.

   - Refactors `CatalystArrayConverter`, and removes `CatalystArrayContainsNullConverter` and `CatalystNativeArrayConverter`

     `CatalystNativeArrayConverter` was probably designed with the intention of avoiding boxing costs. However, the way it uses Scala generics actually doesn't achieve this goal.

     The new `CatalystArrayConverter` handles both nullable and non-nullable array elements in a consistent way.

   - Implements backwards-compatibility rules in `CatalystArrayConverter`

     When Parquet records are being converted, schema of Parquet files should have already been verified. So we only need to care about the structure rather than field names in the Parquet schema. Since all map objects represented in legacy systems have the same structure as the standard one (see [backwards-compatibility rules for MAP] [1]), we only need to deal with LIST (namely array) in `CatalystArrayConverter`.

2. Requested columns handling

   When specifying requested columns in `RowReadSupport`, we used to use a Parquet `MessageType` converted from a Catalyst `StructType` which contains all requested columns.  This is not preferable when taking compatibility and interoperability into consideration.  Because the actual Parquet file may have different physical structure from the converted schema.

   In this PR, the schema for requested columns is constructed using the following method:

   - For a column that exists in the target Parquet file, we extract the column type by name from the full file schema, and construct a single-field `MessageType` for that column.
   - For a column that doesn't exist in the target Parquet file, we create a single-field `StructType` and convert it to a `MessageType` using `CatalystSchemaConverter`.
   - Unions all single-field `MessageType`s into a full schema containing all requested fields

   With this change, we also fix [SPARK-6123] [3] by validating the global schema against each individual Parquet part-files.

### Testing

This PR also adds compatibility tests for parquet-avro, parquet-thrift, and parquet-hive. Please refer to `README.md` under `sql/core/src/test` for more information about these tests. To avoid build time code generation and adding extra complexity to the build system, Java code generated from testing Thrift schema and Avro IDL is also checked in.

[1]: https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
[2]: https://issues.apache.org/jira/browse/SPARK-6774
[3]: https://issues.apache.org/jira/browse/SPARK-6123
[4]: https://issues.apache.org/jira/browse/SPARK-8848

Author: Cheng Lian <[email protected]>

Closes apache#7231 from liancheng/spark-6776 and squashes the following commits:

360fe18 [Cheng Lian] Adds ParquetHiveCompatibilitySuite
c6fbc06 [Cheng Lian] Removes WIP file committed by mistake
b8c1295 [Cheng Lian] Excludes the whole parquet package from MiMa
598c3e8 [Cheng Lian] Adds extra Maven repo for hadoop-lzo, which is a transitive dependency of parquet-thrift
926af87 [Cheng Lian] Simplifies Parquet compatibility test suites
7946ee1 [Cheng Lian] Fixes Scala styling issues
3d7ab36 [Cheng Lian] Fixes .rat-excludes
a8f13bb [Cheng Lian] Using Parquet writer API to do compatibility tests
f2208cd [Cheng Lian] Adds README.md for Thrift/Avro code generation
1d390aa [Cheng Lian] Adds parquet-thrift compatibility test
440f7b3 [Cheng Lian] Adds generated files to .rat-excludes
13b9121 [Cheng Lian] Adds ParquetAvroCompatibilitySuite
06cfe9d [Cheng Lian] Adds comments about TimestampType handling
a099d3e [Cheng Lian] More comments
0cc1b37 [Cheng Lian] Fixes MiMa checks
884d3e6 [Cheng Lian] Fixes styling issue and reverts unnecessary changes
802cbd7 [Cheng Lian] Fixes bugs related to schema merging and empty requested columns
38fe1e7 [Cheng Lian] Adds explicit return type
7fb21f1 [Cheng Lian] Reverts an unnecessary debugging change
1781dff [Cheng Lian] Adds test case for SPARK-8811
6437d4b [Cheng Lian] Assembles requested schema from Parquet file schema
bcac49f [Cheng Lian] Removes the 16-byte restriction of decimals
a74fb2c [Cheng Lian] More comments
0525346 [Cheng Lian] Removes old Parquet record converters
03c3bd9 [Cheng Lian] Refactors Parquet read path to implement backwards-compatibility rules
  • Loading branch information
liancheng committed Jul 8, 2015
1 parent 5687f76 commit 4ffc27c
Show file tree
Hide file tree
Showing 27 changed files with 5,984 additions and 919 deletions.
2 changes: 2 additions & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ help/*
html/*
INDEX
.lintr
gen-java.*
.*avpr
33 changes: 33 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
<snappy.version>1.1.1.7</snappy.version>
<netlib.java.version>1.1.2</netlib.java.version>
<thrift.version>0.9.2</thrift.version>
<!-- For maven shade plugin (see SPARK-8819) -->
<create.dependency.reduced.pom>false</create.dependency.reduced.pom>

Expand All @@ -179,6 +180,8 @@
<hbase.deps.scope>compile</hbase.deps.scope>
<hive.deps.scope>compile</hive.deps.scope>
<parquet.deps.scope>compile</parquet.deps.scope>
<parquet.test.deps.scope>test</parquet.test.deps.scope>
<thrift.test.deps.scope>test</thrift.test.deps.scope>

<!--
Overridable test home. So that you can call individual pom files directly without
Expand Down Expand Up @@ -270,6 +273,18 @@
<enabled>false</enabled>
</snapshots>
</repository>
<!-- For transitive dependencies brougt by parquet-thrift -->
<repository>
<id>twttr-repo</id>
<name>Twttr Repository</name>
<url>http://maven.twttr.com</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<!-- TODO: This can be deleted after Spark 1.4 is posted -->
<repository>
<id>spark-1.4-staging</id>
Expand Down Expand Up @@ -1101,6 +1116,24 @@
<version>${parquet.version}</version>
<scope>${parquet.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.test.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-thrift</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.test.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
<scope>${thrift.test.deps.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
Expand Down
17 changes: 2 additions & 15 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,8 @@ object MimaExcludes {
"org.apache.spark.ml.classification.LogisticCostFun.this"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution"),
// NanoTime and CatalystTimestampConverter is only used inside catalyst,
// not needed anymore
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.timestamp.NanoTime"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.timestamp.NanoTime$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter$"),
// SPARK-6777 Implements backwards compatibility rules in CatalystSchemaConverter
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTypeInfo"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTypeInfo$")
// Parquet support is considered private.
excludePackage("org.apache.spark.sql.parquet")
) ++ Seq(
// SPARK-8479 Add numNonzeros and numActives to Matrix.
ProblemFilters.exclude[MissingMethodProblem](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.spark.sql.types

import scala.util.Try
import scala.util.parsing.combinator.RegexParsers

import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.DeveloperApi
Expand Down Expand Up @@ -82,6 +83,9 @@ abstract class DataType extends AbstractDataType {


object DataType {
private[sql] def fromString(raw: String): DataType = {
Try(DataType.fromJson(raw)).getOrElse(DataType.fromCaseClassString(raw))
}

def fromJson(json: String): DataType = parseDataType(parse(json))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ object StructType extends AbstractDataType {

private[sql] override def simpleString: String = "struct"

private[sql] def fromString(raw: String): StructType = DataType.fromString(raw) match {
case t: StructType => t
case _ => throw new RuntimeException(s"Failed parsing StructType: $raw")
}

def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)

def apply(fields: java.util.List[StructField]): StructType = {
Expand Down
36 changes: 36 additions & 0 deletions sql/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,45 @@
<version>9.3-1102-jdbc41</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-thrift</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
<source>src/test/gen-java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit 4ffc27c

Please sign in to comment.