Skip to content

Commit

Permalink
[SPARK-32614][SQL] Don't apply comment processing if 'comment' unset …
Browse files Browse the repository at this point in the history
…for CSV

### What changes were proposed in this pull request?

Spark's CSV source can optionally ignore lines starting with a comment char. Some code paths check to see if it's set before applying comment logic (i.e. not set to default of `\0`), but many do not, including the one that passes the option to Univocity. This means that rows beginning with a null char were being treated as comments even when 'disabled'.

### Why are the changes needed?

To avoid dropping rows that start with a null char when this is not requested or intended. See JIRA for an example.

### Does this PR introduce _any_ user-facing change?

Nothing beyond the effect of the bug fix.

### How was this patch tested?

Existing tests plus new test case.

Closes apache#29516 from srowen/SPARK-32614.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
srowen authored and HyukjinKwon committed Aug 25, 2020
1 parent 2feab4e commit a9d4e60
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 14 deletions.
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-1.2
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ stream/2.9.6//stream-2.9.6.jar
stringtemplate/3.2.1//stringtemplate-3.2.1.jar
super-csv/2.2.0//super-csv-2.2.0.jar
threeten-extra/1.5.0//threeten-extra-1.5.0.jar
univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
xercesImpl/2.12.0//xercesImpl-2.12.0.jar
xml-apis/1.4.01//xml-apis-1.4.01.jar
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ stream/2.9.6//stream-2.9.6.jar
super-csv/2.2.0//super-csv-2.2.0.jar
threeten-extra/1.5.0//threeten-extra-1.5.0.jar
transaction-api/1.1//transaction-api-1.1.jar
univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
velocity/1.5//velocity-1.5.jar
xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
xercesImpl/2.12.0//xercesImpl-2.12.0.jar
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3.2-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ super-csv/2.2.0//super-csv-2.2.0.jar
threeten-extra/1.5.0//threeten-extra-1.5.0.jar
token-provider/1.0.1//token-provider-1.0.1.jar
transaction-api/1.1//transaction-api-1.1.jar
univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
velocity/1.5//velocity-1.5.jar
woodstox-core/5.0.3//woodstox-core-5.0.3.jar
xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2348,7 +2348,7 @@
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.8.3</version>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ object CSVExprUtils {
* This is currently being used in CSV reading path and CSV schema inference.
*/
def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
iter.filter { line =>
line.trim.nonEmpty && !line.startsWith(options.comment.toString)
if (options.isCommentSet) {
val commentPrefix = options.comment.toString
iter.filter { line =>
line.trim.nonEmpty && !line.startsWith(commentPrefix)
}
} else {
iter.filter(_.trim.nonEmpty)
}
}

def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
if (options.isCommentSet) {
val commentPrefix = options.comment.toString
iter.dropWhile { line =>
line.trim.isEmpty || line.trim.startsWith(commentPrefix)
line.trim.isEmpty || line.startsWith(commentPrefix)
}
} else {
iter.dropWhile(_.trim.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ class CSVOptions(
format.setQuote(quote)
format.setQuoteEscape(escape)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)
if (isCommentSet) {
format.setComment(comment)
}
lineSeparatorInWrite.foreach(format.setLineSeparator)

writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
Expand All @@ -242,7 +244,11 @@ class CSVOptions(
format.setQuoteEscape(escape)
lineSeparator.foreach(format.setLineSeparator)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)
if (isCommentSet) {
format.setComment(comment)
} else {
settings.setCommentProcessingEnabled(false)
}

settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1902,25 +1902,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa

test("SPARK-25387: bad input should not cause NPE") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))

checkAnswer(spark.read.schema(schema).csv(input), Row(null))
checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
assert(spark.read.csv(input).collect().toSet == Set(Row()))
assert(spark.read.schema(schema).csv(input).collect().toSet == Set(Row(null)))
}

test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
val schema = StructType(
StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))

checkAnswer(
spark.read
.option("columnNameOfCorruptRecord", "_corrupt_record")
.schema(schema)
.csv(input),
Row(null, null))
assert(spark.read.csv(input).collect().toSet == Set(Row()))
Row(null, "\u0001\u0000\u0001234"))
assert(spark.read.schema(schema).csv(input).collect().toSet ==
Set(Row(null, "\u0001\u0000\u0001234")))
}

test("field names of inferred schema shouldn't compare to the first row") {
Expand Down Expand Up @@ -2366,6 +2367,17 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
}
}

test("SPARK-32614: don't treat rows starting with null char as comment") {
withTempPath { path =>
Seq("\u0000foo", "bar", "baz").toDS.write.text(path.getCanonicalPath)
val df = spark.read.format("csv")
.option("header", "false")
.option("inferSchema", "true")
.load(path.getCanonicalPath)
assert(df.count() == 3)
}
}

test("case sensitivity of filters references") {
Seq(true, false).foreach { filterPushdown =>
withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) {
Expand Down

0 comments on commit a9d4e60

Please sign in to comment.