Skip to content

Commit 8a538c9

Browse files
seyferxin
authored andcommitted
[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset
## What changes were proposed in this pull request? Likewise [DataSet.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156) KeyValueGroupedDataset should mark the queryExecution as transient. As mentioned in the Jira ticket, without transient we saw serialization issues like ``` Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution Serialization stack: - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: == ``` ## How was this patch tested? Run the query which is specified in the Jira ticket before and after: ``` val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)] val grouped = a.groupByKey( {x:(Int,Int)=>x._1} ) val mappedGroups = grouped.mapGroups((k,x)=> {(k,1)} ) val yyy = sc.broadcast(1) val last = mappedGroups.rdd.map(xx=> { val simpley = yyy.value 1 } ) ``` Author: Ergin Seyfe <[email protected]> Closes apache#15706 from seyfe/keyvaluegrouped_serialization.
1 parent 8cdf143 commit 8a538c9

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala

+17
Original file line numberDiff line numberDiff line change
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
473473
assertDoesNotContain("AssertionError", output)
474474
assertDoesNotContain("Exception", output)
475475
}
476+
477+
test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
478+
val resultValue = 12345
479+
val output = runInterpreter("local",
480+
s"""
481+
|val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
482+
|val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
483+
|val broadcasted = sc.broadcast($resultValue)
484+
|
485+
|// Using broadcast triggers serialization issue in KeyValueGroupedDataset
486+
|val dataset = mapGroups.map(_ => broadcasted.value)
487+
|dataset.collect()
488+
""".stripMargin)
489+
assertDoesNotContain("error:", output)
490+
assertDoesNotContain("Exception", output)
491+
assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
492+
}
476493
}

sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
4040
class KeyValueGroupedDataset[K, V] private[sql](
4141
kEncoder: Encoder[K],
4242
vEncoder: Encoder[V],
43-
val queryExecution: QueryExecution,
43+
@transient val queryExecution: QueryExecution,
4444
private val dataAttributes: Seq[Attribute],
4545
private val groupingAttributes: Seq[Attribute]) extends Serializable {
4646

0 commit comments

Comments
 (0)