Python docstring update for sql.py.
Mostly related to the following two rules in PEP8 and PEP257 (see the sketch after this list):
- Line length < 72 chars.
- First line should be a concise description of the function/class.
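For illustration, a minimal before/after sketch of the docstring style these two rules ask for; the function name and wording here are hypothetical and not taken from sql.py:

# Before: no concise first line; the description runs past 72 characters.
def load_table(path):
    """
    Loads the table stored at the given path and returns it as a SchemaRDD, inferring the schema from the first row of the underlying data.
    """

# After: the first line is a short summary; body lines wrap under 72 chars.
def load_table(path):
    """Loads the table stored at the given path as a L{SchemaRDD}.

    The schema is inferred from the first row of the underlying data,
    and every line stays under the 72-character limit.
    """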

Author: Reynold Xin <[email protected]>

Closes apache#869 from rxin/docstring-schemardd and squashes the following commits:

7cf0cbc [Reynold Xin] Updated sql.py for pep8 docstring.
0a4aef9 [Reynold Xin] Merge branch 'master' into docstring-schemardd
6678937 [Reynold Xin] Python docstring update for sql.py.
rxin committed May 25, 2014
1 parent d79c2b2 commit 14f0358
Showing 1 changed file with 63 additions and 61 deletions.
124 changes: 63 additions & 61 deletions python/pyspark/sql.py
@@ -23,14 +23,14 @@


class SQLContext:
"""
Main entry point for SparkSQL functionality. A SQLContext can be used create L{SchemaRDD}s,
register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files.
"""Main entry point for SparkSQL functionality.
A SQLContext can be used to create L{SchemaRDD}s, register L{SchemaRDD}s as
tables, execute SQL over tables, cache tables, and read parquet files.
"""

def __init__(self, sparkContext, sqlContext = None):
"""
Create a new SQLContext.
"""Create a new SQLContext.
@param sparkContext: The SparkContext to wrap.
@@ -63,18 +63,20 @@ def __init__(self, sparkContext, sqlContext = None):

@property
def _ssql_ctx(self):
"""
Accessor for the JVM SparkSQL context. Subclasses can override this property to provide
their own JVM Contexts.
"""Accessor for the JVM SparkSQL context.
Subclasses can override this property to provide their own
JVM Contexts.
"""
if not hasattr(self, '_scala_SQLContext'):
self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
return self._scala_SQLContext

def inferSchema(self, rdd):
"""
Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to
determine the fields names and types, and then use that to extract all the dictionaries.
"""Infer and apply a schema to an RDD of L{dict}s.
We peek at the first row of the RDD to determine the field names
and types, and then use that to extract all the dictionaries.
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
@@ -92,9 +94,10 @@ def inferSchema(self, rdd):
return SchemaRDD(srdd, self)

def registerRDDAsTable(self, rdd, tableName):
"""
Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
during the lifetime of this instance of SQLContext.
"""Registers the given RDD as a temporary table in the catalog.
Temporary tables exist only during the lifetime of this instance of
SQLContext.
>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -106,8 +109,7 @@ def registerRDDAsTable(self, rdd, tableName):
raise ValueError("Can only register SchemaRDD as table")

def parquetFile(self, path):
"""
Loads a Parquet file, returning the result as a L{SchemaRDD}.
"""Loads a Parquet file, returning the result as a L{SchemaRDD}.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
@@ -122,8 +124,7 @@ def parquetFile(self, path):
return SchemaRDD(jschema_rdd, self)

def sql(self, sqlQuery):
"""
Executes a SQL query using Spark, returning the result as a L{SchemaRDD}.
"""Return a L{SchemaRDD} representing the result of the given query.
>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -135,8 +136,7 @@ def sql(self, sqlQuery):
return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)

def table(self, tableName):
"""
Returns the specified table as a L{SchemaRDD}.
"""Returns the specified table as a L{SchemaRDD}.
>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -147,23 +147,19 @@ def table(self, tableName):
return SchemaRDD(self._ssql_ctx.table(tableName), self)

def cacheTable(self, tableName):
"""
Caches the specified table in-memory.
"""
"""Caches the specified table in-memory."""
self._ssql_ctx.cacheTable(tableName)

def uncacheTable(self, tableName):
"""
Removes the specified table from the in-memory cache.
"""
"""Removes the specified table from the in-memory cache."""
self._ssql_ctx.uncacheTable(tableName)


class HiveContext(SQLContext):
"""
An instance of the Spark SQL execution engine that integrates with data stored in Hive.
Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL
and HiveQL commands.
"""A variant of Spark SQL that integrates with data stored in Hive.
Configuration for Hive is read from hive-site.xml on the classpath.
It supports running both SQL and HiveQL commands.
"""

@property
@@ -193,9 +189,10 @@ def hql(self, hqlQuery):


class LocalHiveContext(HiveContext):
"""
Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
created with data stored in ./metadata. Warehouse data is stored in in ./warehouse.
"""Starts up an instance of hive where metadata is stored locally.
An in-process metadata data is created with data stored in ./metadata.
Warehouse data is stored in in ./warehouse.
>>> import os
>>> hiveCtx = LocalHiveContext(sc)
@@ -228,8 +225,10 @@ def _get_hive_ctx(self):
# TODO: Investigate if it is more efficient to use a namedtuple. One problem is that named tuples
# are custom classes that must be generated per Schema.
class Row(dict):
"""
An extended L{dict} that takes a L{dict} in its constructor, and exposes those items as fields.
"""A row in L{SchemaRDD}.
An extended L{dict} that takes a L{dict} in its constructor, and
exposes those items as fields.
>>> r = Row({"hello" : "world", "foo" : "bar"})
>>> r.hello
@@ -245,13 +244,16 @@ def __init__(self, d):


class SchemaRDD(RDD):
"""
An RDD of L{Row} objects that has an associated schema. The underlying JVM object is a SchemaRDD,
not a PythonRDD, so we can utilize the relational query api exposed by SparkSQL.
"""An RDD of L{Row} objects that has an associated schema.
For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on
directly, as it's underlying implementation is a RDD composed of Java objects. Instead it is
converted to a PythonRDD in the JVM, on which Python operations can be done.
The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can
utilize the relational query api exposed by SparkSQL.
For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
L{SchemaRDD} is not operated on directly, as its underlying
implementation is an RDD composed of Java objects. Instead it is
converted to a PythonRDD in the JVM, on which Python operations can
be done.
"""

def __init__(self, jschema_rdd, sql_ctx):
@@ -266,8 +268,9 @@ def __init__(self, jschema_rdd, sql_ctx):

@property
def _jrdd(self):
"""
Lazy evaluation of PythonRDD object. Only done when a user calls methods defined by the
"""Lazy evaluation of PythonRDD object.
Only done when a user calls methods defined by the
L{pyspark.rdd.RDD} super class (map, filter, etc.).
"""
if not hasattr(self, '_lazy_jrdd'):
@@ -279,10 +282,10 @@ def _id(self):
return self._jrdd.id()

def saveAsParquetFile(self, path):
"""
Saves the contents of this L{SchemaRDD} as a parquet file, preserving the schema. Files
that are written out using this method can be read back in as a SchemaRDD using the
L{SQLContext.parquetFile} method.
"""Save the contents as a Parquet file, preserving the schema.
Files that are written out using this method can be read back in as
a SchemaRDD using the L{SQLContext.parquetFile} method.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
@@ -296,9 +299,10 @@ def saveAsParquetFile(self, path):
self._jschema_rdd.saveAsParquetFile(path)

def registerAsTable(self, name):
"""
Registers this RDD as a temporary table using the given name. The lifetime of this temporary
table is tied to the L{SQLContext} that was used to create this SchemaRDD.
"""Registers this RDD as a temporary table using the given name.
The lifetime of this temporary table is tied to the L{SQLContext}
that was used to create this SchemaRDD.
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.registerAsTable("test")
@@ -309,24 +313,22 @@ def registerAsTable(self, name):
self._jschema_rdd.registerAsTable(name)

def insertInto(self, tableName, overwrite = False):
"""
Inserts the contents of this SchemaRDD into the specified table,
optionally overwriting any existing data.
"""Inserts the contents of this SchemaRDD into the specified table.
Optionally overwrites any existing data.
"""
self._jschema_rdd.insertInto(tableName, overwrite)

def saveAsTable(self, tableName):
"""
Creates a new table with the contents of this SchemaRDD.
"""
"""Creates a new table with the contents of this SchemaRDD."""
self._jschema_rdd.saveAsTable(tableName)

def count(self):
"""
Return the number of elements in this RDD. Unlike the base RDD
implementation of count, this implementation leverages the query
optimizer to compute the count on the SchemaRDD, which supports
features such as filter pushdown.
"""Return the number of elements in this RDD.
Unlike the base RDD implementation of count, this implementation
leverages the query optimizer to compute the count on the SchemaRDD,
which supports features such as filter pushdown.
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.count()
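For context, a minimal end-to-end sketch of the SQLContext API whose docstrings are updated above; it assumes an already-running SparkContext named sc, as in the doctests shown in the diff:

from pyspark.sql import SQLContext

sqlCtx = SQLContext(sc)  # sc: an existing SparkContext

# Infer a schema from an RDD of dicts and register it as a temporary table.
rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
                      {"field1": 2, "field2": "row2"}])
srdd = sqlCtx.inferSchema(rdd)
sqlCtx.registerRDDAsTable(srdd, "table1")

# Run SQL over the registered table; the result is another SchemaRDD.
results = sqlCtx.sql("SELECT field1 FROM table1 WHERE field2 = 'row1'")
print(results.collect())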

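Similarly, a sketch of the Parquet round trip and the optimizer-backed count() described in the SchemaRDD docstrings, reusing sqlCtx and srdd from the sketch above:

import shutil
import tempfile

# Write the SchemaRDD out as a Parquet file, preserving its schema.
parquet_dir = tempfile.mkdtemp()
shutil.rmtree(parquet_dir)  # saveAsParquetFile expects the path not to exist yet
srdd.saveAsParquetFile(parquet_dir)

# Read the file back; the result is again a SchemaRDD with the same schema.
srdd2 = sqlCtx.parquetFile(parquet_dir)

# count() is computed via the query optimizer (which supports features such
# as filter pushdown) rather than by iterating over a PythonRDD.
assert srdd2.count() == srdd.count()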
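Finally, a sketch of the HiveContext entry point touched by this diff. It assumes Spark was built with Hive support and that hive-site.xml is on the classpath; the SHOW TABLES statement is only illustrative:

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)

# hql() runs a HiveQL statement and returns the result as a SchemaRDD.
tables = hiveCtx.hql("SHOW TABLES").collect()
print(tables)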