[SPARK-32743][SQL] Add distinct info at UnresolvedFunction toString
### What changes were proposed in this pull request?

Add distinct info to `UnresolvedFunction.toString`.

### Why are the changes needed?

Make the `UnresolvedFunction` string representation complete: a distinct aggregate currently prints the same as its non-distinct counterpart, so the parsed plan loses information.

```
create table test (c1 int, c2 int);
explain extended select sum(distinct c1) from test;

-- before this pr
== Parsed Logical Plan ==
'Project [unresolvedalias('sum('c1), None)]
+- 'UnresolvedRelation [test]

-- after this pr
== Parsed Logical Plan ==
'Project [unresolvedalias('sum(distinct 'c1), None)]
+- 'UnresolvedRelation [test]
```
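
To show the rendering rule in isolation, here is a minimal standalone Scala sketch of the same pattern; `Func` is a hypothetical stand-in for the real Catalyst expression, not part of the patch:

```scala
// Hypothetical stand-in for UnresolvedFunction, reduced to the
// toString pattern this PR introduces.
case class Func(name: String, args: Seq[String], isDistinct: Boolean) {
  // Prepend "distinct " to the rendered argument list when the flag is set.
  override def toString: String = {
    val distinct = if (isDistinct) "distinct " else ""
    s"'$name($distinct${args.mkString(", ")})"
  }
}

object FuncDemo extends App {
  println(Func("sum", Seq("'c1"), isDistinct = true))  // prints: 'sum(distinct 'c1)
  println(Func("sum", Seq("'c1"), isDistinct = false)) // prints: 'sum('c1)
}
```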

### Does this PR introduce _any_ user-facing change?

Yes. The parsed logical plan printed by `EXPLAIN EXTENDED` now shows the `distinct` keyword for distinct aggregate calls.

### How was this patch tested?

Manual test, plus the updated `explain.sql`/`explain-aqe.sql` golden files below.

Closes apache#29586 from ulysses-you/SPARK-32743.

Authored-by: ulysses <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
ulysses-you authored and maropu committed Oct 9, 2020
1 parent c5f6af9 commit a907729
Showing 5 changed files with 77 additions and 1 deletion.
5 changes: 4 additions & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

@@ -268,7 +268,10 @@ case class UnresolvedFunction(
   override lazy val resolved = false

   override def prettyName: String = name.unquotedString
-  override def toString: String = s"'$name(${children.mkString(", ")})"
+  override def toString: String = {
+    val distinct = if (isDistinct) "distinct " else ""
+    s"'$name($distinct${children.mkString(", ")})"
+  }
 }

 object UnresolvedFunction {
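
As a quick sanity check outside the test suite, the new rendering can be observed from spark-shell. This is a sketch only, assuming the ambient `spark` SparkSession and the `test` table from the description:

```scala
// Run in spark-shell; `spark` is the session it provides.
spark.sql("CREATE TABLE test (c1 int, c2 int)")

// queryExecution.logical is the parsed plan, before analysis, so the
// aggregate is still an UnresolvedFunction and prints via its toString.
println(spark.sql("SELECT sum(distinct c1) FROM test").queryExecution.logical)
// Expected to resemble:
// 'Project [unresolvedalias('sum(distinct 'c1), None)]
// +- 'UnresolvedRelation [test]
```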
1 change: 1 addition & 0 deletions sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql

@@ -1,3 +1,4 @@
 --IMPORT explain.sql

 --SET spark.sql.adaptive.enabled=true
+--SET spark.sql.maxMetadataStringLength = 500
6 changes: 6 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/explain.sql

@@ -1,5 +1,6 @@
 --SET spark.sql.codegen.wholeStage = true
 --SET spark.sql.adaptive.enabled = false
+--SET spark.sql.maxMetadataStringLength = 500

 -- Test tables
 CREATE table explain_temp1 (key int, val int) USING PARQUET;
@@ -9,6 +10,11 @@ CREATE table explain_temp4 (key int, val string) USING PARQUET;

 SET spark.sql.codegen.wholeStage = true;

+-- distinct func
+EXPLAIN EXTENDED
+SELECT sum(distinct val)
+FROM explain_temp1;
+
 -- single table
 EXPLAIN FORMATTED
 SELECT key, max(val)
33 changes: 33 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out

@@ -42,6 +42,39 @@ struct<key:string,value:string>
 spark.sql.codegen.wholeStage true


+-- !query
+EXPLAIN EXTENDED
+SELECT sum(distinct val)
+FROM explain_temp1
+-- !query schema
+struct<plan:string>
+-- !query output
+== Parsed Logical Plan ==
+'Project [unresolvedalias('sum(distinct 'val), None)]
++- 'UnresolvedRelation [explain_temp1], [], false
+
+== Analyzed Logical Plan ==
+sum(DISTINCT val): bigint
+Aggregate [sum(distinct cast(val#x as bigint)) AS sum(DISTINCT val)#xL]
++- SubqueryAlias spark_catalog.default.explain_temp1
+   +- Relation[key#x,val#x] parquet
+
+== Optimized Logical Plan ==
+Aggregate [sum(distinct cast(val#x as bigint)) AS sum(DISTINCT val)#xL]
++- Project [val#x]
+   +- Relation[key#x,val#x] parquet
+
+== Physical Plan ==
+AdaptiveSparkPlan isFinalPlan=false
++- HashAggregate(keys=[], functions=[sum(distinct cast(val#x as bigint)#xL)], output=[sum(DISTINCT val)#xL])
+   +- Exchange SinglePartition, true, [id=#x]
+      +- HashAggregate(keys=[], functions=[partial_sum(distinct cast(val#x as bigint)#xL)], output=[sum#xL])
+         +- HashAggregate(keys=[cast(val#x as bigint)#xL], functions=[], output=[cast(val#x as bigint)#xL])
+            +- Exchange hashpartitioning(cast(val#x as bigint)#xL, 4), true, [id=#x]
+               +- HashAggregate(keys=[cast(val#x as bigint) AS cast(val#x as bigint)#xL], functions=[], output=[cast(val#x as bigint)#xL])
+                  +- FileScan parquet default.explain_temp1[val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<val:int>
+
+
 -- !query
 EXPLAIN FORMATTED
 SELECT key, max(val)
33 changes: 33 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/explain.sql.out

@@ -42,6 +42,39 @@ struct<key:string,value:string>
 spark.sql.codegen.wholeStage true


+-- !query
+EXPLAIN EXTENDED
+SELECT sum(distinct val)
+FROM explain_temp1
+-- !query schema
+struct<plan:string>
+-- !query output
+== Parsed Logical Plan ==
+'Project [unresolvedalias('sum(distinct 'val), None)]
++- 'UnresolvedRelation [explain_temp1], [], false
+
+== Analyzed Logical Plan ==
+sum(DISTINCT val): bigint
+Aggregate [sum(distinct cast(val#x as bigint)) AS sum(DISTINCT val)#xL]
++- SubqueryAlias spark_catalog.default.explain_temp1
+   +- Relation[key#x,val#x] parquet
+
+== Optimized Logical Plan ==
+Aggregate [sum(distinct cast(val#x as bigint)) AS sum(DISTINCT val)#xL]
++- Project [val#x]
+   +- Relation[key#x,val#x] parquet
+
+== Physical Plan ==
+*HashAggregate(keys=[], functions=[sum(distinct cast(val#x as bigint)#xL)], output=[sum(DISTINCT val)#xL])
++- Exchange SinglePartition, true, [id=#x]
+   +- *HashAggregate(keys=[], functions=[partial_sum(distinct cast(val#x as bigint)#xL)], output=[sum#xL])
+      +- *HashAggregate(keys=[cast(val#x as bigint)#xL], functions=[], output=[cast(val#x as bigint)#xL])
+         +- Exchange hashpartitioning(cast(val#x as bigint)#xL, 4), true, [id=#x]
+            +- *HashAggregate(keys=[cast(val#x as bigint) AS cast(val#x as bigint)#xL], functions=[], output=[cast(val#x as bigint)#xL])
+               +- *ColumnarToRow
+                  +- FileScan parquet default.explain_temp1[val#x] Batched: true, DataFilters: [], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<val:int>
+
+
 -- !query
 EXPLAIN FORMATTED
 SELECT key, max(val)
