Skip to content

Commit

Permalink
TEZ-2388. Send dag identifier as part of the fetcher request string. …
Browse files Browse the repository at this point in the history
…(sseth)
  • Loading branch information
sidseth committed Aug 22, 2015
1 parent 96e48c5 commit 8ba7e44
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 14 deletions.
1 change: 1 addition & 0 deletions TEZ-2003-CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ ALL CHANGES:
TEZ-2347. Expose additional information in TaskCommunicatorContext.
TEZ-2361. Propagate dag completion to TaskCommunicator.
TEZ-2381. Fixes after rebase 04/28.
TEZ-2388. Send dag identifier as part of the fetcher request string.

INCOMPATIBLE CHANGES:
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
private final int dagIdentifier;

private final String logIdentifier;

Expand Down Expand Up @@ -130,7 +131,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final boolean isDebugEnabled = LOG.isDebugEnabled();

private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId,
FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator,
Expand All @@ -144,6 +145,7 @@ private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
this.appId = appId;
this.dagIdentifier = dagIdentifier;
this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
Expand Down Expand Up @@ -413,7 +415,7 @@ protected HostFetchResult doHttpFetch() {
private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
port, partition, appId.toString(), httpConnectionParams.isSslShuffle());
port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle());
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
httpConnectionParams.isKeepAlive());

Expand Down Expand Up @@ -926,22 +928,22 @@ public static class FetcherBuilder {

public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
boolean asyncHttp) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
false, localHostname, shufflePort, asyncHttp);
}

public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
String localHostname, int shufflePort, boolean asyncHttp) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,21 @@ public static void ioCleanup(Closeable... closeables) {

// TODO NEWTEZ handle ssl shuffle
public static StringBuilder constructBaseURIForShuffleHandler(String host,
int port, int partition, String appId, boolean sslShuffle) {
int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
partition, appId, sslShuffle);
partition, appId, dagIdentifier, sslShuffle);
}

public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
int partition, String appId, boolean sslShuffle) {
int partition, String appId, int dagIdentifier, boolean sslShuffle) {
final String http_protocol = (sslShuffle) ? "https://" : "http://";
StringBuilder sb = new StringBuilder(http_protocol);
sb.append(hostIdentifier);
sb.append("/");
sb.append("mapOutput?job=");
sb.append(appId.replace("application", "job"));
sb.append("&dag=");
sb.append(String.valueOf(dagIdentifier));
sb.append("&reduce=");
sb.append(String.valueOf(partition));
sb.append("&map=");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf)
}

FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
httpConnectionParams, inputManager, inputContext.getApplicationId(),
httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
localhostName, shufflePort, asyncHttp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private void processTaskFailedEvent(InputFailedEvent ifEvent) {
@VisibleForTesting
URI getBaseURI(String host, int port, int partitionId) {
StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
partitionId, inputContext.getApplicationId().toString(), sslShuffle);
partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle);
URI u = URI.create(sb.toString());
return u;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testLocalFetchModeSetting() throws Exception {
final boolean DISABLE_LOCAL_FETCH = false;

Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
PORT, false);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testLocalFetchModeSetting() throws Exception {
// When disabled use http fetch
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
PORT, false);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testSetupLocalDiskFetch() throws Exception {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT, false);
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());

Expand Down

0 comments on commit 8ba7e44

Please sign in to comment.