Skip to content

Commit

Permalink
TEZ-3276. Tez Example MRRSleep job fails when tez.staging-dir fs is n…
Browse files Browse the repository at this point in the history
…ot same as default FS. (Harish Jaiprakash via hitesh)
  • Loading branch information
hiteshs committed Jun 1, 2016
1 parent 18da493 commit 6adfb5d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES

ALL CHANGES:

TEZ-3276. Tez Example MRRSleep job fails when tez.staging-dir fs is not same as default FS.
TEZ-3280. LOG MRInputHelpers split generation message as INFO
TEZ-909. Provide support for application tags
TEZ-3257. Fix flaky test TestUnorderedPartitionedKVWriter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public static void main(String[] args) throws Exception {

private Credentials credentials = new Credentials();

public DAG createDAG(FileSystem remoteFs, Configuration conf, Path remoteStagingDir,
public DAG createDAG(Configuration conf, Path stagingDir,
int numMapper, int numReducer, int iReduceStagesCount,
int numIReducer, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount,
Expand Down Expand Up @@ -488,7 +488,7 @@ public DAG createDAG(FileSystem remoteFs, Configuration conf, Path remoteStaging

LOG.info("Writing splits to DFS");
dataSource = MRInputHelpers
.configureMRInputWithLegacySplitGeneration(mapStageConf, remoteStagingDir, true);
.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true);
} else {
dataSource = MRInputLegacy.createConfigBuilder(mapStageConf, SleepInputFormat.class)
.generateSplitsInAM(generateSplitsInAM).build();
Expand All @@ -500,11 +500,12 @@ public DAG createDAG(FileSystem remoteFs, Configuration conf, Path remoteStaging
throw new TezUncheckedException("Could not find any jar containing"
+ " MRRSleepJob.class in the classpath");
}
Path remoteJarPath = remoteFs.makeQualified(
new Path(remoteStagingDir, "dag_job.jar"));
remoteFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);


FileSystem stagingFs = stagingDir.getFileSystem(conf);
Path remoteJarPath = new Path(stagingDir, "dag_job.jar");
stagingFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
FileStatus jarFileStatus = stagingFs.getFileStatus(remoteJarPath);

TokenCache.obtainTokensForNamenodes(this.credentials, new Path[] { remoteJarPath },
mapStageConf);

Expand Down Expand Up @@ -729,21 +730,18 @@ else if (args[i].equals("-writeSplitsToDfs")) {
iReduceSleepCount = (int)Math.ceil(iReduceSleepTime / ((double)recSleepTime));

TezConfiguration conf = new TezConfiguration(getConf());
FileSystem remoteFs = FileSystem.get(conf);

conf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
conf.get(
TezConfiguration.TEZ_AM_STAGING_DIR,
TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT));

Path remoteStagingDir =
remoteFs.makeQualified(new Path(conf.get(
TezConfiguration.TEZ_AM_STAGING_DIR,
TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
Long.toString(System.currentTimeMillis())));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);

DAG dag = createDAG(remoteFs, conf, remoteStagingDir,
String stagingBaseDir = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT);
Path stagingDir = new Path(stagingBaseDir, Long.toString(System.currentTimeMillis()));
TezClientUtils.ensureStagingDirExists(conf, stagingDir);

DAG dag = createDAG(conf, stagingDir,
numMapper, numReducer, iReduceStagesCount, numIReducer,
mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
Expand Down

0 comments on commit 6adfb5d

Please sign in to comment.