Use metadata to set column comments and encoding
Fixes #164 and #172.

Author: Emlyn Corrin <[email protected]>
Author: Emlyn Corrin <[email protected]>

Closes #178 from emlyn/table-comments-from-metadata.
emlyn authored and JoshRosen committed Jul 6, 2016
1 parent 9ac340e commit 5bc5fab
Showing 6 changed files with 161 additions and 2 deletions.
16 changes: 16 additions & 0 deletions README.md
@@ -335,6 +335,14 @@ data should the <tt>COPY</tt> fail.</p>
Redshift cluster and/or don't have requirements to keep the table availability high.</p>
</td>
</tr>
<tr>
<td><tt>description</tt></td>
<td>No</td>
<td>No default</td>
<td>
<p>A description for the table. It is set using the SQL <tt>COMMENT</tt> command and should show up in most query tools.
See also the <tt>description</tt> column metadata field, which sets descriptions on individual columns.</p>
</td>
</tr>
<tr>
<td><tt>preactions</tt></td>
<td>No</td>
@@ -414,6 +422,14 @@ df.write
.save()
```

### Configuring column encoding

When creating a table, this library can apply a specific compression encoding to individual columns. Set the `encoding` column metadata field to choose a compression encoding for each column (see the [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for the available encodings).
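
For example, here is a minimal sketch, adapted from this commit's integration test, that creates a table whose single column uses LZO compression. `jdbcUrl` and `tempDir` are placeholders for your Redshift JDBC URL and S3 temporary directory:

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}

// Attach an "encoding" entry to the column metadata; when this library
// creates the table, the column is declared with ENCODE LZO.
val metadata = new MetadataBuilder().putString("encoding", "LZO").build()
val schema = StructType(StructField("x", StringType, metadata = metadata) :: Nil)

sqlContext.createDataFrame(sc.parallelize(Seq(Row("sample value"))), schema)
  .write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)        // placeholder: Redshift JDBC URL with credentials
  .option("dbtable", "my_table") // placeholder table name
  .option("tempdir", tempDir)    // placeholder: S3 temporary directory
  .mode(SaveMode.ErrorIfExists)
  .save()
```

You can verify the result by querying `pg_table_def` for the table's column encodings, as the integration test in this commit does.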

### Setting descriptions on columns

Redshift allows descriptions to be attached to columns (via the SQL `COMMENT` command), and these show up in most query tools. Set the `description` column metadata field to specify a description for an individual column.
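
For example, a sketch of setting both a table-level and a column-level comment, mirroring this commit's integration test (`jdbcUrl` and `tempDir` are again placeholders):

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}

// A "description" entry in the column metadata becomes a COMMENT ON COLUMN
// statement; the "description" writer option becomes a COMMENT ON TABLE.
val metadata = new MetadataBuilder().putString("description", "Hello Column").build()
val schema = StructType(StructField("x", StringType, metadata = metadata) :: Nil)

sqlContext.createDataFrame(sc.parallelize(Seq(Row("sample value"))), schema)
  .write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)               // placeholder: Redshift JDBC URL with credentials
  .option("dbtable", "my_table")        // placeholder table name
  .option("description", "Hello Table") // sets the table-level comment
  .option("tempdir", tempDir)           // placeholder: S3 temporary directory
  .mode(SaveMode.ErrorIfExists)
  .save()
```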

## Transactional Guarantees

This section describes the transactional guarantees of the Redshift data source for Spark
@@ -386,6 +386,90 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase {
}
}

test("configuring compression on columns") {
val tableName = s"configuring_compression_on_columns_$randomSuffix"
try {
val metadata = new MetadataBuilder().putString("encoding", "LZO").build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.mode(SaveMode.ErrorIfExists)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.load()
checkAnswer(loadedDf, Seq(Row("a" * 128)))
val encodingDF = sqlContext.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable",
s"""(SELECT "column", lower(encoding) FROM pg_table_def WHERE tablename='$tableName')""")
.load()
checkAnswer(encodingDF, Seq(Row("x", "lzo")))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("configuring comments on columns") {
val tableName = s"configuring_comments_on_columns_$randomSuffix"
try {
val metadata = new MetadataBuilder().putString("description", "Hello Column").build()
val schema = StructType(
StructField("x", StringType, metadata = metadata) :: Nil)
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("description", "Hello Table")
.option("tempdir", tempDir)
.mode(SaveMode.ErrorIfExists)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = sqlContext.read
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.load()
checkAnswer(loadedDf, Seq(Row("a" * 128)))
val tableDF = sqlContext.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"(SELECT pg_catalog.obj_description('$tableName'::regclass))")
.load()
checkAnswer(tableDF, Seq(Row("Hello Table")))
val commentQuery =
s"""
|(SELECT c.column_name, pgd.description
|FROM pg_catalog.pg_statio_all_tables st
|INNER JOIN pg_catalog.pg_description pgd
| ON (pgd.objoid=st.relid)
|INNER JOIN information_schema.columns c
| ON (pgd.objsubid=c.ordinal_position AND c.table_name=st.relname)
|WHERE c.table_name='$tableName')
""".stripMargin
val columnDF = sqlContext.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", commentQuery)
.load()
checkAnswer(columnDF, Seq(Row("x", "Hello Column")))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("informative error message when saving a table with string that is longer than max length") {
val tableName = s"error_message_when_string_too_long_$randomSuffix"
try {
5 changes: 5 additions & 0 deletions src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -205,6 +205,11 @@ private[redshift] object Parameters {
*/
def extraCopyOptions: String = parameters.get("extracopyoptions").getOrElse("")

/**
* Description of the table, set using the SQL COMMENT command.
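* For example: .option("description", "My table description") on the DataFrameWriter.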
*/
def description: Option[String] = parameters.get("description")

/**
* List of semi-colon separated SQL statements to run before write operations.
* This can be useful for running DELETE operations to clean up data
@@ -262,7 +262,12 @@ private[redshift] class JDBCWrapper {
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
}
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable""".trim)
val encoding = if (field.metadata.contains("encoding")) {
s"ENCODE ${field.metadata.getString("encoding")}"
} else {
""
}
sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable $encoding""".trim)
}}
if (sb.length < 2) "" else sb.substring(2)
}
@@ -138,6 +138,18 @@ private[redshift] class RedshiftWriter(
}
}

/**
* Generate COMMENT SQL statements for the table and columns.
*/
private[redshift] def commentActions(tableComment: Option[String], schema: StructType):
List[String] = {
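// "%s" is a placeholder for the table name; it is substituted when these
// statements are executed as pre-actions (see the preActions loop below).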
tableComment.toList.map(desc => s"COMMENT ON TABLE %s IS '${desc.replace("'", "''")}'") ++
schema.fields
.withFilter(f => f.metadata.contains("description"))
.map(f => s"""COMMENT ON COLUMN %s."${f.name.replace("\"", "\\\"")}""""
+ s" IS '${f.metadata.getString("description").replace("'", "''")}'")
}

/**
* Perform the Redshift load, including deletion of existing data in the case of an overwrite,
* and creating the table if it doesn't already exist.
@@ -161,8 +173,10 @@
log.info(createStatement)
jdbcWrapper.executeInterruptibly(conn.prepareStatement(createStatement))

+    val preActions = commentActions(params.description, data.schema) ++ params.preActions
+
     // Execute preActions
-    params.preActions.foreach { action =>
+    preActions.foreach { action =>
val actionSql = if (action.contains("%s")) action.format(params.table.get) else action
log.info("Executing preAction: " + actionSql)
jdbcWrapper.executeInterruptibly(conn.prepareStatement(actionSql))
@@ -445,6 +445,41 @@ class RedshiftSourceSuite
assert(createTableCommand === expectedCreateTableCommand)
}

test("configuring encoding on columns") {
val lzoMetadata = new MetadataBuilder().putString("encoding", "LZO").build()
val runlengthMetadata = new MetadataBuilder().putString("encoding", "RUNLENGTH").build()
val schema = StructType(
StructField("lzo_str", StringType, metadata = lzoMetadata) ::
StructField("runlength_str", StringType, metadata = runlengthMetadata) ::
StructField("default_str", StringType) ::
Nil)
val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
val createTableCommand =
DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim
val expectedCreateTableCommand =
"""CREATE TABLE IF NOT EXISTS "PUBLIC"."test_table" ("lzo_str" TEXT ENCODE LZO,""" +
""" "runlength_str" TEXT ENCODE RUNLENGTH, "default_str" TEXT)"""
assert(createTableCommand === expectedCreateTableCommand)
}

test("configuring descriptions on columns") {
val descriptionMetadata1 = new MetadataBuilder().putString("description", "Test1").build()
val descriptionMetadata2 = new MetadataBuilder().putString("description", "Test'2").build()
val schema = StructType(
StructField("first_str", StringType, metadata = descriptionMetadata1) ::
StructField("second_str", StringType, metadata = descriptionMetadata2) ::
StructField("default_str", StringType) ::
Nil)
val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
val commentCommands =
DefaultRedshiftWriter.commentActions(Some("Test"), schema)
val expectedCommentCommands = List(
"COMMENT ON TABLE %s IS 'Test'",
"COMMENT ON COLUMN %s.\"first_str\" IS 'Test1'",
"COMMENT ON COLUMN %s.\"second_str\" IS 'Test''2'")
assert(commentCommands === expectedCommentCommands)
}

test("Respect SaveMode.ErrorIfExists when table exists") {
val mockRedshift = new MockRedshift(
defaultParams("url"),
