Skip to content

Commit

Permalink
Fixes issue caused by http in history server config property (linkedi…
Browse files Browse the repository at this point in the history
  • Loading branch information
shkhrgpt authored and akshayrai committed Feb 28, 2017
1 parent 0d668ab commit f6274b1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ class SparkRestClient(sparkConf: SparkConf) {

private val historyServerUri: URI = sparkConf.getOption(HISTORY_SERVER_ADDRESS_KEY) match {
case Some(historyServerAddress) =>
val baseUri = new URI(s"http://${historyServerAddress}")
val baseUri: URI =
// Latest versions of CDH include http in their history server address configuration.
// However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
if (historyServerAddress.contains(s"http://")) {
new URI(historyServerAddress)
} else {
new URI(s"http://${historyServerAddress}")
}
require(baseUri.getPath == "")
baseUri
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,40 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers {
assertion
}
}

it("returns the desired data from the Spark REST API for cluster mode application when http in jobhistory address") {
import ExecutionContext.Implicits.global
val fakeJerseyServer = new FakeJerseyServer() {
override def configure(): Application = super.configure() match {
case resourceConfig: ResourceConfig =>
resourceConfig
.register(classOf[FetchClusterModeDataFixtures.ApiResource])
.register(classOf[FetchClusterModeDataFixtures.ApplicationResource])
.register(classOf[FetchClusterModeDataFixtures.JobsResource])
.register(classOf[FetchClusterModeDataFixtures.StagesResource])
.register(classOf[FetchClusterModeDataFixtures.ExecutorsResource])
case config => config
}
}

fakeJerseyServer.setUp()

val historyServerUri = fakeJerseyServer.target.getUri

val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"http://${historyServerUri.getHost}:${historyServerUri.getPort}")
val sparkRestClient = new SparkRestClient(sparkConf)

sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
restDerivedData.applicationInfo.id should be(FetchClusterModeDataFixtures.APP_ID)
restDerivedData.applicationInfo.name should be(FetchClusterModeDataFixtures.APP_NAME)
restDerivedData.jobDatas should not be(None)
restDerivedData.stageDatas should not be(None)
restDerivedData.executorSummaries should not be(None)
} andThen { case assertion: Try[Assertion] =>
fakeJerseyServer.tearDown()
assertion
}
}
}
}

Expand Down

0 comments on commit f6274b1

Please sign in to comment.