Skip to content

Commit

Permalink
[SPARK-8657] [YARN] Fail to upload resource to viewfs
Browse files Browse the repository at this point in the history
Fail to upload resource to viewfs in spark-1.4
JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657

Author: Tao Li <[email protected]>

Closes apache#7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:

65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs

(cherry picked from commit 26d9b6b)
Signed-off-by: Sean Owen <[email protected]>

# Conflicts:
#	yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
  • Loading branch information
litao-buptsse authored and srowen committed Jul 8, 2015
1 parent de49916 commit e91d87e
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,57 @@ private[spark] class Client(
}
}

/**
* Distribute a file to the cluster.
*
* If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
* to HDFS (if not already there) and added to the application's distributed cache.
*
* @param path URI of the file to distribute.
* @param resType Type of resource being distributed.
* @param destName Name of the file in the distributed cache.
* @param targetDir Subdirectory where to place the file.
* @param appMasterOnly Whether to distribute only to the AM.
* @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
* localized path for non-local paths, or the input `path` for local paths.
* The localized path will be null if the URI has already been added to the cache.
*/
def distribute(
    path: String,
    resType: LocalResourceType = LocalResourceType.FILE,
    destName: Option[String] = None,
    targetDir: Option[String] = None,
    appMasterOnly: Boolean = false): (Boolean, String) = {
  // Trim once; the trimmed form is used both to parse the URI and as the "local:" return value.
  val trimmedPath = path.trim()
  val localURI = new URI(trimmedPath)
  if (localURI.getScheme != LOCAL_SCHEME) {
    if (addDistributedUri(localURI)) {
      val localPath = getQualifiedLocalPath(localURI, hadoopConf)
      // Link name precedence: explicit destName, then the URI fragment, then the file's own name.
      // It is prefixed with "<targetDir>/" when a target directory was requested.
      val linkname = targetDir.map(_ + "/").getOrElse("") +
        destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
      val destPath = copyFileToRemote(dst, localPath, replication)
      // Resolve the file system from the uploaded path itself rather than reusing the enclosing
      // scope's `fs`: on federated / viewfs setups the two can differ, and registering the
      // resource against the wrong file system makes the upload fail (SPARK-8657).
      // NOTE(review): assumes copyFileToRemote returns an org.apache.hadoop.fs.Path -- confirm.
      val destFs = destPath.getFileSystem(hadoopConf)
      distCacheMgr.addResource(
        destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
        appMasterOnly = appMasterOnly)
      (false, linkname)
    } else {
      // addDistributedUri rejected the URI, so there is no localized path to report.
      (false, null)
    }
  } else {
    // "local:" URIs are not uploaded; hand the (trimmed) input back to the caller.
    (true, trimmedPath)
  }
}

// When a keytab was supplied, ship it through distribute() restricted to the AM so that the
// AM is able to log in again later on.
if (loginFromKeytab) {
  logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
    " via the YARN Secure Distributed Cache.")
  val keytabSource = args.keytab
  // The keytab is published in the cache under the name stored in "spark.yarn.keytab".
  val keytabCacheName = Some(sparkConf.get("spark.yarn.keytab"))
  val distributedKeytab = distribute(
    keytabSource,
    destName = keytabCacheName,
    appMasterOnly = true)
  // distribute() yields a null second element when the URI was already added to the cache.
  require(distributedKeytab._2 != null, "Keytab file already distributed.")
}

/**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
Expand Down

0 comments on commit e91d87e

Please sign in to comment.