[SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs
This makes it possible to create tables and insert into them using the DSL and SQL for the Scala and Java APIs.

Author: Michael Armbrust <[email protected]>

Closes apache#354 from marmbrus/insertIntoTable and squashes the following commits:

6c6f227 [Michael Armbrust] Create random temporary files in python parquet unit tests.
f5e6d5c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into insertIntoTable
765c506 [Michael Armbrust] Add to JavaAPI.
77b512c [Michael Armbrust] typos.
5c3ef95 [Michael Armbrust] use names for boolean args.
882afdf [Michael Armbrust] Change createTableAs to saveAsTable.  Clean up api annotations.
d07d94b [Michael Armbrust] Add tests, support for creating parquet files and hive tables.
fa3fe81 [Michael Armbrust] Make insertInto available on JavaSchemaRDD as well.  Add createTableAs function.
marmbrus authored and rxin committed Apr 16, 2014
1 parent 63ca581 commit 273c2fd
Showing 16 changed files with 535 additions and 160 deletions.
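
To make the commit description concrete, here is a minimal usage sketch of the Scala API touched by this change. It is illustrative only: the `sc` SparkContext, the `Person` case class, and the parquet path are assumptions, not part of the commit; the calls themselves (`createParquetFile`, `registerAsTable`, `sql`, `insertInto`) are the ones added or exposed in the diffs below.

{{{
// Hypothetical sketch; `sc`, Person, and the path are illustrative assumptions.
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sc = new SparkContext("local", "insertInto-sketch")
val sqlContext = new SQLContext(sc)
import sqlContext._

// Create an empty parquet-backed table and register it in the catalog.
createParquetFile[Person]("path/to/people.parquet").registerAsTable("people")

// Insert rows through SQL ...
sql("INSERT INTO people SELECT 'michael', 29")

// ... or through the Scala API, by appending an RDD of case classes
// (implicitly converted to a SchemaRDD) to the registered table.
sc.parallelize(Person("andy", 30) :: Nil).insertInto("people")
}}}
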
14 changes: 10 additions & 4 deletions python/pyspark/sql.py
@@ -106,9 +106,12 @@ def parquetFile(self, path):
"""
Loads a Parquet file, returning the result as a L{SchemaRDD}.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile("/tmp/tmp.parquet")
>>> srdd2 = sqlCtx.parquetFile("/tmp/tmp.parquet")
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> srdd.collect() == srdd2.collect()
True
"""
@@ -278,9 +281,12 @@ def saveAsParquetFile(self, path):
that are written out using this method can be read back in as a SchemaRDD using the
L{SQLContext.parquetFile} method.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile("/tmp/test.parquet")
>>> srdd2 = sqlCtx.parquetFile("/tmp/test.parquet")
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> srdd2.collect() == srdd.collect()
True
"""
57 changes: 50 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,18 +20,26 @@ package org.apache.spark.sql
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD

import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl
import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies

import org.apache.spark.sql.parquet.ParquetRelation

/**
* :: AlphaComponent ::
@@ -65,12 +73,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
new this.QueryExecution { val logical = plan }

/**
* :: Experimental ::
* :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
* interface is considered internal, and thus not guranteed to be stable. As a result, using
* them directly is not reccomended.
* interface is considered internal, and thus not guaranteed to be stable. As a result, using
* them directly is not recommended.
*/
@Experimental
@DeveloperApi
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)

/**
@@ -89,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
* This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* val sqlContext = new SQLContext(...)
* import sqlContext._
*
* case class Person(name: String, age: Int)
* createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")
* sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
* @tparam A A case class type that describes the desired schema of the parquet file to be
* created.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
* output format.
*
* @group userf
*/
@Experimental
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf))
}

/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
@@ -208,9 +249,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}

/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
@DeveloperApi
protected abstract class QueryExecution {
def logical: LogicalPlan

@@ -231,7 +274,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
override def toString: String =
s"""== Logical Plan ==
|${stringOrError(analyzed)}
|== Optimized Logical Plan
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
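
As a rough illustration of the debugging use described in the QueryExecution doc above (a sketch assuming a SchemaRDD named `people` built elsewhere), printing a SchemaRDD's `queryExecution` handle, which later in this diff becomes public for developers, renders each phase through the `toString` shown above:

{{{
// Illustrative only: `people` stands for some existing SchemaRDD.
// Its queryExecution workflow prints the logical, optimized logical, and
// physical plans via the toString defined above.
println(people.queryExecution)
}}}
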
28 changes: 5 additions & 23 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import net.razorvine.pickle.Pickler

import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.annotation.{AlphaComponent, Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -83,8 +81,6 @@ import java.util.{Map => JMap}
* rdd.where('key === 1).orderBy('value.asc).select('key).collect()
* }}}
*
* @todo There is currently no support for creating SchemaRDDs from either Java or Python RDDs.
*
* @groupname Query Language Integrated Queries
* @groupdesc Query Functions that create new queries from SchemaRDDs. The
* result of all query functions is also a SchemaRDD, allowing multiple operations to be
@@ -276,8 +274,8 @@ class SchemaRDD(
* an `OUTER JOIN` in SQL. When no output rows are produced by the generator for a
* given row, a single row will be output, with `NULL` values for each of the
* generated columns.
* @param alias an optional alias that can be used as qualif for the attributes that are produced
* by this generate operation.
* @param alias an optional alias that can be used as qualifier for the attributes that are
* produced by this generate operation.
*
* @group Query
*/
Expand All @@ -290,29 +288,13 @@ class SchemaRDD(
new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))

/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table. Note in a standard [[SQLContext]] there is
* no notion of persistent tables, and thus queries that contain this operator will fail to
* optimize. When working with an extension of a SQLContext that has a persistent catalog, such
* as a `HiveContext`, this operation will result in insertions to the table specified.
* Returns this RDD as a SchemaRDD. Intended primarily to force the invocation of the implicit
* conversion from a standard RDD to a SchemaRDD.
*
* @group schema
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean = false) =
new SchemaRDD(
sqlContext,
InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))

/**
* Returns this RDD as a SchemaRDD.
* @group schema
*/
def toSchemaRDD = this

/** FOR INTERNAL USE ONLY */
def analyze = sqlContext.analyzer(logicalPlan)

private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
this.mapPartitions { iter =>
59 changes: 54 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._

/**
@@ -29,14 +31,24 @@ trait SchemaRDDLike {
private[sql] def baseSchemaRDD: SchemaRDD

/**
* :: DeveloperApi ::
* A lazily computed query execution workflow. All other RDD operations are passed
* through to the RDD that is produced by this workflow.
* through to the RDD that is produced by this workflow. This workflow is produced lazily because
* invoking the whole query optimization pipeline can be expensive.
*
* We want this to be lazy because invoking the whole query optimization pipeline can be
* expensive.
* The query execution is considered a Developer API as phases may be added or removed in future
* releases. This execution is only exposed to provide an interface for inspecting the various
* phases for debugging purposes. Applications should not depend on particular phases existing
* or producing any specific output, even for exactly the same query.
*
* Additionally, the RDD exposed by this execution is not designed for consumption by end users.
* In particular, it does not contain any schema information, and it reuses Row objects
* internally. This object reuse improves performance, but can make programming against the RDD
* more difficult. Instead end users should perform RDD operations on a SchemaRDD directly.
*/
@transient
protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
@DeveloperApi
lazy val queryExecution = sqlContext.executePlan(logicalPlan)

override def toString =
s"""${super.toString}
@@ -45,7 +57,8 @@ trait SchemaRDDLike {

/**
* Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that
* are written out using this method can be read back in as a SchemaRDD using the ``function
* are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
* function.
*
* @group schema
*/
@@ -62,4 +75,40 @@ trait SchemaRDDLike {
def registerAsTable(tableName: String): Unit = {
sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
}

/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
*
* @group schema
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit =
sqlContext.executePlan(
InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd

/**
* :: Experimental ::
* Appends the rows from this RDD to the specified table.
*
* @group schema
*/
@Experimental
def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)

/**
* :: Experimental ::
* Creates a table from the contents of this SchemaRDD. This will fail if the table already
* exists.
*
* Note that this currently only works with SchemaRDDs that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* @group schema
*/
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
}
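
A rough sketch of how the two new operations above might be used together (assumptions: the spark-hive module with a usable metastore, an illustrative `Person` case class, and a local SparkContext; none of this is taken from the diff). `saveAsTable` materializes the RDD as a new table, and `insertInto` then appends to, or overwrites, that table:

{{{
// Hedged sketch; assumes spark-hive on the classpath and a working metastore.
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

case class Person(name: String, age: Int)

val sc = new SparkContext("local", "saveAsTable-sketch")
val hiveContext = new HiveContext(sc)
import hiveContext._

val people = sc.parallelize(Seq(Person("michael", 29), Person("andy", 30)))

// Create the table from this RDD's contents; fails if `people` already exists.
people.saveAsTable("people")

// Append further rows to the existing table ...
sc.parallelize(Seq(Person("justin", 19))).insertInto("people")

// ... or replace its contents entirely.
people.insertInto("people", overwrite = true)
}}}
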
sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -17,8 +17,11 @@

package org.apache.spark.sql.api.java

import java.beans.{Introspector, PropertyDescriptor}
import java.beans.Introspector

import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -45,29 +48,42 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
result
}

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
* a table. This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* JavaSQLContext sqlCtx = new JavaSQLContext(...)
*
* sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people")
* sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
* @param beanClass A Java bean class object that will be used to determine the schema of the
* parquet file.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
* output format.
*/
@Experimental
def createParquetFile(
beanClass: Class[_],
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): JavaSchemaRDD = {
new JavaSchemaRDD(
sqlContext,
ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf))
}

/**
* Applies a schema to an RDD of Java Beans.
*/
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
// TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
val beanInfo = Introspector.getBeanInfo(beanClass)

val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
val schema = fields.map { property =>
val dataType = property.getPropertyType match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == java.lang.Short.TYPE => ShortType
case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
case c: Class[_] if c == java.lang.Long.TYPE => LongType
case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
case c: Class[_] if c == java.lang.Float.TYPE => FloatType
case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
}

AttributeReference(property.getName, dataType, true)()
}

val schema = getSchema(beanClass)
val className = beanClass.getCanonicalName
val rowRdd = rdd.rdd.mapPartitions { iter =>
// BeanInfo is not serializable so we must rediscover it remotely for each partition.
@@ -97,4 +113,26 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
}

/** Returns a Catalyst Schema for the given java bean class. */
protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
// TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
val beanInfo = Introspector.getBeanInfo(beanClass)

val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
fields.map { property =>
val dataType = property.getPropertyType match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == java.lang.Short.TYPE => ShortType
case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
case c: Class[_] if c == java.lang.Long.TYPE => LongType
case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
case c: Class[_] if c == java.lang.Float.TYPE => FloatType
case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
}
// TODO: Nullability could be stricter.
AttributeReference(property.getName, dataType, nullable = true)()
}
}
}