From 5bc5fabc68f3972c576cc154ddaa1f15ad61f09d Mon Sep 17 00:00:00 2001
From: Emlyn Corrin
Date: Wed, 6 Jul 2016 15:52:16 -0700
Subject: [PATCH] Use metadata to set column comments and encoding
Fixes #164 and #172.
Author: Emlyn Corrin
Author: Emlyn Corrin
Closes #178 from emlyn/table-comments-from-metadata.
---
README.md | 16 ++++
.../redshift/RedshiftIntegrationSuite.scala | 84 +++++++++++++++++++
.../spark/redshift/Parameters.scala | 5 ++
.../spark/redshift/RedshiftJDBCWrapper.scala | 7 +-
.../spark/redshift/RedshiftWriter.scala | 16 +++-
.../spark/redshift/RedshiftSourceSuite.scala | 35 ++++++++
6 files changed, 161 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index 732be979..3dc182e5 100644
--- a/README.md
+++ b/README.md
@@ -335,6 +335,14 @@ data should the COPY fail.
Redshift cluster and/or don't have requirements to keep the table availability high.
+
+ description |
+ No |
+ No default |
+
+ A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools.
+See also the description metadata to set descriptions on individual columns.
+ |
preactions |
No |
@@ -414,6 +422,14 @@ df.write
.save()
```
+### Configuring column encoding
+
+When creating a table, this library can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings).
+
+### Setting descriptions on columns
+
+Redshift allows columns to have descriptions attached that should show up in most query tools (using the `COMMENT` command). You can set the `description` column metadata field to specify a description for individual columns.
+
## Transactional Guarantees
This section describes the transactional guarantees of the Redshift data source for Spark
diff --git a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
index 9ddfc291..6555a9d2 100644
--- a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
+++ b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala
@@ -386,6 +386,90 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase {
}
}
+ test("configuring compression on columns") {
+ val tableName = s"configuring_compression_on_columns_$randomSuffix"
+ try {
+ val metadata = new MetadataBuilder().putString("encoding", "LZO").build()
+ val schema = StructType(
+ StructField("x", StringType, metadata = metadata) :: Nil)
+ sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("tempdir", tempDir)
+ .mode(SaveMode.ErrorIfExists)
+ .save()
+ assert(DefaultJDBCWrapper.tableExists(conn, tableName))
+ val loadedDf = sqlContext.read
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("tempdir", tempDir)
+ .load()
+ checkAnswer(loadedDf, Seq(Row("a" * 128)))
+ val encodingDF = sqlContext.read
+ .format("jdbc")
+ .option("url", jdbcUrl)
+ .option("dbtable",
+ s"""(SELECT "column", lower(encoding) FROM pg_table_def WHERE tablename='$tableName')""")
+ .load()
+ checkAnswer(encodingDF, Seq(Row("x", "lzo")))
+ } finally {
+ conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
+ conn.commit()
+ }
+ }
+
+ test("configuring comments on columns") {
+ val tableName = s"configuring_comments_on_columns_$randomSuffix"
+ try {
+ val metadata = new MetadataBuilder().putString("description", "Hello Column").build()
+ val schema = StructType(
+ StructField("x", StringType, metadata = metadata) :: Nil)
+ sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("description", "Hello Table")
+ .option("tempdir", tempDir)
+ .mode(SaveMode.ErrorIfExists)
+ .save()
+ assert(DefaultJDBCWrapper.tableExists(conn, tableName))
+ val loadedDf = sqlContext.read
+ .format("com.databricks.spark.redshift")
+ .option("url", jdbcUrl)
+ .option("dbtable", tableName)
+ .option("tempdir", tempDir)
+ .load()
+ checkAnswer(loadedDf, Seq(Row("a" * 128)))
+ val tableDF = sqlContext.read
+ .format("jdbc")
+ .option("url", jdbcUrl)
+ .option("dbtable", s"(SELECT pg_catalog.obj_description('$tableName'::regclass))")
+ .load()
+ checkAnswer(tableDF, Seq(Row("Hello Table")))
+ val commentQuery =
+ s"""
+ |(SELECT c.column_name, pgd.description
+ |FROM pg_catalog.pg_statio_all_tables st
+ |INNER JOIN pg_catalog.pg_description pgd
+ | ON (pgd.objoid=st.relid)
+ |INNER JOIN information_schema.columns c
+ | ON (pgd.objsubid=c.ordinal_position AND c.table_name=st.relname)
+ |WHERE c.table_name='$tableName')
+ """.stripMargin
+ val columnDF = sqlContext.read
+ .format("jdbc")
+ .option("url", jdbcUrl)
+ .option("dbtable", commentQuery)
+ .load()
+ checkAnswer(columnDF, Seq(Row("x", "Hello Column")))
+ } finally {
+ conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
+ conn.commit()
+ }
+ }
+
test("informative error message when saving a table with string that is longer than max length") {
val tableName = s"error_message_when_string_too_long_$randomSuffix"
try {
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index d8d2a64f..8a98a65b 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -205,6 +205,11 @@ private[redshift] object Parameters {
*/
def extraCopyOptions: String = parameters.get("extracopyoptions").getOrElse("")
+ /**
+ * Description of the table, set using the SQL COMMENT command.
+ */
+ def description: Option[String] = parameters.get("description")
+
/**
* List of semi-colon separated SQL statements to run before write operations.
* This can be useful for running DELETE operations to clean up data
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
index cd01c807..7d9bf6b9 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
@@ -262,7 +262,12 @@ private[redshift] class JDBCWrapper {
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
}
val nullable = if (field.nullable) "" else "NOT NULL"
- sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable""".trim)
+ val encoding = if (field.metadata.contains("encoding")) {
+ s"ENCODE ${field.metadata.getString("encoding")}"
+ } else {
+ ""
+ }
+ sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable $encoding""".trim)
}}
if (sb.length < 2) "" else sb.substring(2)
}
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
index 7947568e..044b6be1 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -138,6 +138,18 @@ private[redshift] class RedshiftWriter(
}
}
+ /**
+ * Generate COMMENT SQL statements for the table and columns.
+ */
+ private[redshift] def commentActions(tableComment: Option[String], schema: StructType):
+ List[String] = {
+ tableComment.toList.map(desc => s"COMMENT ON TABLE %s IS '${desc.replace("'", "''")}'") ++
+ schema.fields
+ .withFilter(f => f.metadata.contains("description"))
+ .map(f => s"""COMMENT ON COLUMN %s."${f.name.replace("\"", "\\\"")}""""
+ + s" IS '${f.metadata.getString("description").replace("'", "''")}'")
+ }
+
/**
* Perform the Redshift load, including deletion of existing data in the case of an overwrite,
* and creating the table if it doesn't already exist.
@@ -161,8 +173,10 @@ private[redshift] class RedshiftWriter(
log.info(createStatement)
jdbcWrapper.executeInterruptibly(conn.prepareStatement(createStatement))
+ val preActions = commentActions(params.description, data.schema) ++ params.preActions
+
// Execute preActions
- params.preActions.foreach { action =>
+ preActions.foreach { action =>
val actionSql = if (action.contains("%s")) action.format(params.table.get) else action
log.info("Executing preAction: " + actionSql)
jdbcWrapper.executeInterruptibly(conn.prepareStatement(actionSql))
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index 9329b68f..47a305d5 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -445,6 +445,41 @@ class RedshiftSourceSuite
assert(createTableCommand === expectedCreateTableCommand)
}
+ test("configuring encoding on columns") {
+ val lzoMetadata = new MetadataBuilder().putString("encoding", "LZO").build()
+ val runlengthMetadata = new MetadataBuilder().putString("encoding", "RUNLENGTH").build()
+ val schema = StructType(
+ StructField("lzo_str", StringType, metadata = lzoMetadata) ::
+ StructField("runlength_str", StringType, metadata = runlengthMetadata) ::
+ StructField("default_str", StringType) ::
+ Nil)
+ val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
+ val createTableCommand =
+ DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim
+ val expectedCreateTableCommand =
+ """CREATE TABLE IF NOT EXISTS "PUBLIC"."test_table" ("lzo_str" TEXT ENCODE LZO,""" +
+ """ "runlength_str" TEXT ENCODE RUNLENGTH, "default_str" TEXT)"""
+ assert(createTableCommand === expectedCreateTableCommand)
+ }
+
+ test("configuring descriptions on columns") {
+ val descriptionMetadata1 = new MetadataBuilder().putString("description", "Test1").build()
+ val descriptionMetadata2 = new MetadataBuilder().putString("description", "Test'2").build()
+ val schema = StructType(
+ StructField("first_str", StringType, metadata = descriptionMetadata1) ::
+ StructField("second_str", StringType, metadata = descriptionMetadata2) ::
+ StructField("default_str", StringType) ::
+ Nil)
+ val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema)
+ val commentCommands =
+ DefaultRedshiftWriter.commentActions(Some("Test"), schema)
+ val expectedCommentCommands = List(
+ "COMMENT ON TABLE %s IS 'Test'",
+ "COMMENT ON COLUMN %s.\"first_str\" IS 'Test1'",
+ "COMMENT ON COLUMN %s.\"second_str\" IS 'Test''2'")
+ assert(commentCommands === expectedCommentCommands)
+ }
+
test("Respect SaveMode.ErrorIfExists when table exists") {
val mockRedshift = new MockRedshift(
defaultParams("url"),