Skip to content

Commit

Permalink
TEZ-3108. Add support for external services to local mode. (sseth)
Browse files Browse the repository at this point in the history
  • Loading branch information
sidseth committed Mar 30, 2016
1 parent c547dcc commit 9c1d8ce
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.

ALL CHANGES:
TEZ-3108. Add support for external services to local mode.
TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster.
TEZ-2967. Vertex start time should be that of first task start time in UI
TEZ-3175. Add tez client submit host
Expand Down
9 changes: 9 additions & 0 deletions docs/src/site/markdown/localmode.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,12 @@ Potential pitfalls when moving from Local Mode to a real cluster
- When running in Local Mode, the ObjectRegistry works only within a
single task. The behaviour is different on a real cluster, where it
works across tasks that share the same container.
Local Mode with External Services
- When running in local mode, work that would normally run in containers
is executed within the same process; no containers are launched.
- Execution that is configured to run in external services is unaffected:
local mode still attempts to use these external services.
If execution is configured this way, make sure that the external services
are running before using local mode.
43 changes: 38 additions & 5 deletions tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.tez.client;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
Expand All @@ -44,9 +46,12 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClientHandler;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
Expand Down Expand Up @@ -333,16 +338,44 @@ public void run() {

return thread;
}

// this can be overridden by test code to create a mock app
//
// Constructs the DAGAppMaster for this client. Before constructing it, the
// AM plugin descriptor (information about external services) is looked up via
// getPluginDescriptorInfo, using this client's conf and the application id
// taken from applicationAttemptId. The descriptor may be null when none was
// configured.
//
// Throws IOException, declared because getPluginDescriptorInfo declares it.
//
// NOTE(review): the clock parameter is not forwarded; a new SystemClock is
// passed to the DAGAppMaster constructor instead - confirm this is intended.
@VisibleForTesting
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String userDir,
String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) {
ContainerId cId, String currentHost, int nmPort,
int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession,
String userDir,
String[] localDirs, String[] logDirs,
Credentials credentials, String jobUserName) throws
IOException {

// Read in additional information about external services
AMPluginDescriptorProto amPluginDescriptorProto =
getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());


// The plugin descriptor read above is handed to the AM as the final constructor argument.
return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
versionInfo.getVersion(), 1, credentials, jobUserName, null);
versionInfo.getVersion(), 1, credentials, jobUserName, amPluginDescriptorProto);
}

/**
 * Reads the user-specified Tez configuration from the Tez system staging
 * directory of the given application and extracts its AM plugin descriptor.
 *
 * @param conf configuration used to resolve the Tez system staging path
 * @param applicationIdString string form of the application id
 * @return the AM plugin descriptor, or {@code null} if the stored
 *         configuration does not carry one
 * @throws IOException declared for failures while resolving or reading the
 *         stored configuration
 */
private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
    String applicationIdString) throws IOException {
  // Keep only the path component of the staging location, dropping the
  // filesystem qualifier.
  String stagingDir =
      TezCommonUtils.getTezSystemStagingPath(conf, applicationIdString).toUri().getPath();

  DAGProtos.ConfigurationProto userConf =
      TezUtilsInternal.readUserSpecifiedTezConfiguration(stagingDir);

  return userConf.hasAmPluginDescriptor() ? userConf.getAmPluginDescriptor() : null;
}

}
11 changes: 8 additions & 3 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ public synchronized void serviceInit(final Configuration conf) throws Exception
isLocal, defaultPayload);



LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));
Expand Down Expand Up @@ -2638,8 +2637,14 @@ static void processSchedulerDescriptors(List<NamedEntityDescriptor> descriptors,
UserPayload defaultPayload,
BiMap<String, Integer> schedulerPluginMap) {
if (isLocal) {
Preconditions.checkState(descriptors.size() == 1 &&
descriptors.get(0).getEntityName().equals(TezConstants.getTezUberServicePluginName()));
boolean foundUberServiceName = false;
for (NamedEntityDescriptor descriptor : descriptors) {
if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) {
foundUberServiceName = true;
break;
}
}
Preconditions.checkState(foundUberServiceName);
} else {
boolean foundYarn = false;
for (int i = 0; i < descriptors.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ContainerLauncherManager(AppContext context,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
String workingDirectory,
List<NamedEntityDescriptor> containerLauncherDescriptors,
boolean isPureLocalMode) throws TezException {
boolean isLocalMode) throws TezException {
super(ContainerLauncherManager.class.getName());

this.isIncompleteCtor = false;
Expand All @@ -88,7 +88,7 @@ public ContainerLauncherManager(AppContext context,
new ContainerLauncherContextImpl(context, this, taskCommunicatorManagerInterface, userPayload, i);
containerLauncherContexts[i] = containerLauncherContext;
containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context,
containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode));
containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isLocalMode));
containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i].getContainerLauncher());
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ ContainerLauncher createUberContainerLauncher(ContainerLauncherContext container
AppContext context,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
String workingDirectory,
boolean isPureLocalMode) {
boolean isLocalMode) {
LOG.info("Creating LocalContainerLauncher");
// TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
// extensive internals which are only available at runtime. Will likely require
Expand All @@ -154,7 +154,7 @@ ContainerLauncher createUberContainerLauncher(ContainerLauncherContext container
return
new LocalContainerLauncher(containerLauncherContext, context,
taskCommunicatorManagerInterface,
workingDirectory, isPureLocalMode);
workingDirectory, isLocalMode);
} catch (UnknownHostException e) {
throw new TezUncheckedException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
private final Map<String, String> localEnv;
private final ExecutionContext executionContext;
private final int numExecutors;
private final boolean isPureLocalMode;
private final boolean isLocalMode;

private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
runningContainers =
Expand All @@ -108,7 +108,7 @@ public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext,
AppContext context,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
String workingDirectory,
boolean isPureLocalMode) throws UnknownHostException {
boolean isLocalMode) throws UnknownHostException {
// TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM
// starts up. It's not possible to set these up via a static payload.
// Will need some kind of mechanism to dynamically create payloads / bind to parameters
Expand All @@ -117,8 +117,8 @@ public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext,
this.context = context;
this.tal = taskCommunicatorManagerInterface;
this.workingDirectory = workingDirectory;
this.isPureLocalMode = isPureLocalMode;
if (isPureLocalMode) {
this.isLocalMode = isLocalMode;
if (isLocalMode) {
localEnv = Maps.newHashMap();
AuxiliaryServiceHelper.setServiceDataIntoEnv(
ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
Expand All @@ -127,7 +127,7 @@ public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext,
}

// Check if the hostname is set in the environment before overriding it.
String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() :
String host = isLocalMode ? InetAddress.getLocalHost().getHostName() :
System.getenv(Environment.NM_HOST.name());
executionContext = new ExecutionContextImpl(host);

Expand Down Expand Up @@ -347,7 +347,7 @@ private TezChild createTezChild(Configuration defaultConf, ContainerId container
Map<String, String> containerEnv = new HashMap<String, String>();
containerEnv.putAll(localEnv);
// Use the user from env if it's available.
String user = isPureLocalMode ? System.getenv(Environment.USER.name()) : context.getUser();
String user = isLocalMode ? System.getenv(Environment.USER.name()) : context.getUser();
containerEnv.put(Environment.USER.name(), user);

long memAvailable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class TaskSchedulerManager extends AbstractService implements
@VisibleForTesting
final ExecutorService appCallbackExecutor;

private final boolean isPureLocalMode;
private final boolean isLocalMode;
// If running in non local-only mode, the YARN task scheduler will always run to take care of
// registration with YARN and heartbeats to YARN.
// Splitting registration and heartbeats is not straight-forward due to the taskScheduler being
Expand Down Expand Up @@ -159,7 +159,7 @@ public TaskSchedulerManager(TaskScheduler taskScheduler, AppContext appContext,
this.taskSchedulerDescriptors = null;
this.webUI = null;
this.historyUrl = null;
this.isPureLocalMode = false;
this.isLocalMode = false;
}

/**
Expand All @@ -171,15 +171,15 @@ public TaskSchedulerManager(TaskScheduler taskScheduler, AppContext appContext,
* @param webUI
* @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated.
* An empty list defaults to using the YarnTaskScheduler as the only source.
* @param isPureLocalMode whether the AM is running in local mode
* @param isLocalMode whether the AM is running in local mode
*/
@SuppressWarnings("rawtypes")
public TaskSchedulerManager(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher,
WebUIService webUI,
List<NamedEntityDescriptor> schedulerDescriptors,
boolean isPureLocalMode) {
boolean isLocalMode) {
super(TaskSchedulerManager.class.getName());
Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
"TaskSchedulerDescriptors must be specified");
Expand All @@ -189,7 +189,7 @@ public TaskSchedulerManager(AppContext appContext,
this.containerSignatureMatcher = containerSignatureMatcher;
this.webUI = webUI;
this.historyUrl = getHistoryUrl();
this.isPureLocalMode = isPureLocalMode;
this.isLocalMode = isLocalMode;
this.appCallbackExecutor = createAppCallbackExecutorService();
if (this.webUI != null) {
this.webUI.setHistoryUrl(this.historyUrl);
Expand Down Expand Up @@ -579,8 +579,11 @@ protected void instantiateSchedulers(String host, int port, String trackingUrl,
int j = 0;
for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
long customAppIdIdentifier;
if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
if ((isLocalMode && taskSchedulerDescriptors[i].getEntityName()
.equals(TezConstants.getTezUberServicePluginName()) ||
taskSchedulerDescriptors[i].getEntityName()
.equals(TezConstants.getTezYarnServicePluginName()))) {
// Use the provided appId instead of constructing one for containers.
customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
} else {
customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
Expand Down
Loading

0 comments on commit 9c1d8ce

Please sign in to comment.