Skip to content

Commit

Permalink
[SPARK-32062][SQL] Reset listenerRegistered in SparkSession
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Reset listenerRegistered when application end.

### Why are the changes needed?

Within a jvm, stop and create `SparkContext` multi times will cause the bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add UT.

Closes apache#28899 from ulysses-you/SPARK-32062.

Authored-by: ulysses <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
ulysses-you authored and cloud-fan committed Jun 24, 2020
1 parent 045106e commit 9f540fa
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,7 @@ object SparkSession extends Logging {
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
defaultSession.set(null)
listenerRegistered.set(false)
}
})
listenerRegistered.set(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,21 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2")
assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
}

test("SPARK-32062: reset listenerRegistered in SparkSession") {
(1 to 2).foreach { i =>
val conf = new SparkConf()
.setMaster("local")
.setAppName(s"test-SPARK-32062-$i")
val context = new SparkContext(conf)
val beforeListenerSize = context.listenerBus.listeners.size()
SparkSession
.builder()
.sparkContext(context)
.getOrCreate()
val afterListenerSize = context.listenerBus.listeners.size()
assert(beforeListenerSize + 1 == afterListenerSize)
context.stop()
}
}
}

0 comments on commit 9f540fa

Please sign in to comment.