Skip to content

Commit

Permalink
TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)
Browse files Browse the repository at this point in the history
  • Loading branch information
hiteshs committed May 2, 2016
1 parent 727584f commit c3b8b85
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 66 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Release 0.9.0: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-3193. Deadlock in AM during task commit request.
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput.
TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests.
Expand All @@ -27,6 +28,7 @@ Release 0.8.4: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-3193. Deadlock in AM during task commit request.
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
Expand Down Expand Up @@ -465,6 +467,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.

ALL CHANGES:
TEZ-3193. Deadlock in AM during task commit request.
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
Expand Down Expand Up @@ -71,7 +70,6 @@
import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
Expand Down Expand Up @@ -107,7 +105,6 @@
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
Expand Down Expand Up @@ -190,8 +187,9 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro
private String nodeHttpAddress;
private String nodeRackName;

private final Task task;
private final Vertex vertex;
private final TaskLocationHint locationHint;
private final TaskSpec taskSpec;

@VisibleForTesting
boolean appendNextDataEvent = true;
Expand Down Expand Up @@ -465,22 +463,25 @@ TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
.installTopology();

@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
Task task) {
this(taskId, attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock,
Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock,
taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
task, null);
vertex, locationHint, taskSpec, null);
}
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,

@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
Task task, TezTaskAttemptID schedulingCausalTA) {
Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
TezTaskAttemptID schedulingCausalTA) {

MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
Expand All @@ -496,15 +497,16 @@ public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHa
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
this.attemptId = attemptId;
this.eventHandler = eventHandler;
//Reported status
this.conf = conf;
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
this.task = task;
this.vertex = this.task.getVertex();
this.vertex = vertex;
this.locationHint = locationHint;
this.taskSpec = taskSpec;
this.creationCausalTA = schedulingCausalTA;
this.creationTime = clock.getTime();

Expand Down Expand Up @@ -548,14 +550,6 @@ public TezTaskAttemptID getSchedulingCausalTA() {
return creationCausalTA;
}

TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
TaskSpec baseTaskSpec = task.getBaseTaskSpec();
return new TaskSpec(getID(),
baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
}

@Override
public TaskAttemptReport getReport() {
TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
Expand Down Expand Up @@ -1036,7 +1030,7 @@ private void sendTaskAttemptCleanupEvent() {
}

private TaskLocationHint getTaskLocationHint() {
return task.getTaskLocationHint();
return locationHint;
}

protected String[] resolveHosts(String[] src) {
Expand Down Expand Up @@ -1226,22 +1220,6 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent
TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
ta.scheduledTime = ta.clock.getTime();

// Create the remote task.
TaskSpec remoteTaskSpec;
try {
remoteTaskSpec = ta.createRemoteTaskSpec();
if (LOG.isDebugEnabled()) {
LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
}
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
LOG.error(msg, e);
String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
new TerminateTransition(FAILED_HELPER).transition(ta,
new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diag,
TaskAttemptTerminationCause.APPLICATION_ERROR));
return TaskAttemptStateInternal.FAILED;
}
// Create startTaskRequest

String[] requestHosts = new String[0];
Expand Down Expand Up @@ -1271,10 +1249,7 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent
locationHint = null;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Asking for container launch with taskAttemptContext: "
+ remoteTaskSpec);
}
LOG.debug("Asking for container launch with taskAttemptContext: {}", ta.taskSpec);

// Send out a launch request to the scheduler.
int priority;
Expand All @@ -1288,7 +1263,7 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent
// TODO Jira post TEZ-2003 getVertex implementation is very inefficient. This should be via references, instead of locked table lookups.
Vertex vertex = ta.getVertex();
AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint,
ta.attemptId, ta.taskResource, ta.taskSpec, ta, locationHint,
priority, ta.containerContext,
vertex.getTaskSchedulerIdentifier(),
vertex.getContainerLauncherIdentifier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
Expand Down Expand Up @@ -718,9 +719,15 @@ public boolean canCommit(TezTaskAttemptID taskAttemptID) {
}

TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
TaskSpec taskSpec = new TaskSpec(attemptId,
baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
return new TaskAttemptImpl(attemptId, eventHandler,
taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
(failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
(failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
locationHint, taskSpec, schedulingCausalTA);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
Expand Down Expand Up @@ -112,10 +111,10 @@
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.junit.Assert;
Expand All @@ -139,7 +138,6 @@ public FileStatus getFileStatus(Path f) throws IOException {
}

AppContext appCtx;
Task mockTask;
TaskLocationHint locationHint;
Vertex mockVertex;
ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
Expand All @@ -156,9 +154,7 @@ public void setupTest() {
when(appCtx.getContainerLauncherName(anyInt())).thenReturn(
TezConstants.getTezYarnServicePluginName());

mockTask = mock(Task.class);
mockVertex = mock(Vertex.class);
when(mockTask.getVertex()).thenReturn(mockVertex);
when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);

HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
Expand Down Expand Up @@ -196,7 +192,6 @@ public void testLocalityRequest() {
+ AMSchedulerEventTALaunchRequest.class.getName());
}

verify(mockTask, times(1)).getTaskLocationHint();
// TODO Move the Rack request check to the client after TEZ-125 is fixed.
Set<String> requestedRacks = taImpl.taskRacks;
assertEquals(1, requestedRacks.size());
Expand Down Expand Up @@ -1742,12 +1737,12 @@ public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex) {
super(taskId, attemptNumber, eventHandler, tal, conf,
super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber),
eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
isRescheduled, resource, containerContext, leafVertex, mockTask, null);
when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
isRescheduled, resource, containerContext, leafVertex, mockVertex,
locationHint, null, null);
}


boolean inputFailedReported = false;

Expand All @@ -1756,12 +1751,6 @@ protected Vertex getVertex() {
return mockVertex;
}

@Override
protected TaskSpec createRemoteTaskSpec() {
// FIXME
return null;
}

@Override
protected void logJobHistoryAttemptStarted() {
taskAttemptStartedEventLogged++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
Expand Down Expand Up @@ -948,8 +949,9 @@ public MockTaskImpl(TezVertexID vertexId, int partition,

@Override
protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
attemptNumber, eventHandler, taskCommunicatorManagerInterface,
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(
TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber),
eventHandler, taskCommunicatorManagerInterface,
conf, clock, taskHeartbeatHandler, appContext,
true, taskResource, containerContext, schedCausalTA);
taskAttempts.add(attempt);
Expand Down Expand Up @@ -995,13 +997,14 @@ public class MockTaskAttemptImpl extends TaskAttemptImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;

public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
public MockTaskAttemptImpl(TezTaskAttemptID attemptId,
EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), schedCausalTA);
super(attemptId, eventHandler, tal, conf, clock, thh,
appContext, isRescheduled, resource, containerContext, false, null,
locationHint, mockTaskSpec, schedCausalTA);
}

@Override
Expand Down

0 comments on commit c3b8b85

Please sign in to comment.