Skip to content

Commit

Permalink
[SPARK-6331] Load new master URL if present when recovering streaming…
Browse files Browse the repository at this point in the history
… context from checkpoint

In streaming driver recovery, when the SparkConf is reconstructed based on the checkpointed configuration, it recovers the old master URL. This is okay if the cluster on which the streaming application is relaunched is the same cluster as it was running on before. But if that cluster changes, there is no way to inject the master URL of the new cluster. As a result, the restarted app tries to connect to the non-existent old cluster and fails.

The solution is to check whether a master URL is set in the System properties (by Spark submit) before recreating the SparkConf. If a new master URL is set in the properties, then use it, as that is obviously the most relevant one. Otherwise load the old one (to maintain existing behavior).

Author: Tathagata Das <[email protected]>

Closes apache#5024 from tdas/SPARK-6331 and squashes the following commits:

392fd44 [Tathagata Das] Fixed naming issue.
c7c0b99 [Tathagata Das] Addressed comments.
6a0857c [Tathagata Das] Updated testsuites.
222485d [Tathagata Das] Load new master URL if present when recovering streaming context from checkpoint
  • Loading branch information
tdas committed Mar 17, 2015
1 parent e26db9b commit c928796
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll

def sparkConf = {
new SparkConf(false).setAll(sparkConfPairs)
/**
 * Builds a fresh SparkConf from the configuration pairs captured in this checkpoint
 * (`sparkConfPairs`).
 *
 * The "spark.driver.host" and "spark.driver.port" entries are removed from the result.
 * If a "spark.master" value is found in a SparkConf created with `loadDefaults = true`,
 * it replaces the checkpointed master; otherwise the checkpointed master is kept
 * (see SPARK-6331).
 *
 * @return the reconstructed SparkConf
 */
def createSparkConf(): SparkConf = {
// Start from the checkpointed pairs only (no defaults loaded), then drop the driver
// host/port entries that were saved with the checkpoint.
val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
.remove("spark.driver.host")
.remove("spark.driver.port")
// Look up a master URL in a defaults-loaded SparkConf; per the SPARK-6331 description
// this is expected to reflect a master set via system properties (e.g. by spark-submit).
val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master")
// Override the checkpointed master only when such a value is present.
newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) }
newSparkConf
}

def validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class StreamingContext private[streaming] (

private[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
new SparkContext(cp_.sparkConf)
new SparkContext(cp_.createSparkConf())
} else {
sc_
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ class CheckpointSuite extends TestSuiteBase {

// This tests whether spark conf persists through checkpoints, and certain
// configs gets scrubbed
test("persistence of conf through checkpoints") {
test("recovery of conf through checkpoints") {
val key = "spark.mykey"
val value = "myvalue"
System.setProperty(key, value)
ssc = new StreamingContext(master, framework, batchDuration)
val originalConf = ssc.conf

val cp = new Checkpoint(ssc, Time(1000))
val cpConf = cp.sparkConf
val cpConf = cp.createSparkConf()
assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
assert(cpConf.get(key) === value)
Expand All @@ -163,7 +163,8 @@ class CheckpointSuite extends TestSuiteBase {
// Serialize/deserialize to simulate write to storage and reading it back
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))

val newCpConf = newCp.sparkConf
// Verify new SparkConf has all the previous properties
val newCpConf = newCp.createSparkConf()
assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
assert(newCpConf.get(key) === value)
Expand All @@ -174,6 +175,20 @@ class CheckpointSuite extends TestSuiteBase {
ssc = new StreamingContext(null, newCp, null)
val restoredConf = ssc.conf
assert(restoredConf.get(key) === value)
ssc.stop()

// Verify new SparkConf picks up new master url if it is set in the properties. See SPARK-6331.
try {
val newMaster = "local[100]"
System.setProperty("spark.master", newMaster)
val newCpConf = newCp.createSparkConf()
assert(newCpConf.get("spark.master") === newMaster)
assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
ssc = new StreamingContext(null, newCp, null)
assert(ssc.sparkContext.master === newMaster)
} finally {
System.clearProperty("spark.master")
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
ssc = new StreamingContext(null, newCp, null)
assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
Expand Down

0 comments on commit c928796

Please sign in to comment.