Skip to content

Commit

Permalink
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks …
Browse files Browse the repository at this point in the history
…(jlowe)
  • Loading branch information
jlowe committed Apr 28, 2016
1 parent f8e0148 commit 7221d38
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Release 0.9.0: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput.
TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests.
TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
Expand All @@ -25,6 +26,7 @@ Release 0.8.4: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
TEZ-3226. Tez UI 2: All DAGs UX improvements.
Expand Down Expand Up @@ -462,6 +464,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.

ALL CHANGES:
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
TEZ-3202. Reduce the memory need for jobs with high number of segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,10 +909,6 @@ void schedulePendingTasks() {
// vertex not started yet
return;
}
int numPendingTasks = pendingTasks.size();
if (numPendingTasks == 0) {
return;
}

if (!sourceVerticesScheduled && !canScheduleTasks()) {
if (LOG.isDebugEnabled()) {
Expand All @@ -922,7 +918,8 @@ void schedulePendingTasks() {
return;
}

if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks && numPendingTasks > 0) {
int numPendingTasks = pendingTasks.size();
if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks) {
LOG.info("All source tasks assigned. " +
"Ramping up " + numPendingTasks +
" remaining tasks for vertex: " + getContext().getVertexName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,58 @@ public Object answer(InvocationOnMock invocation) {
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
}

/**
 * Verifies that a ShuffleVertexManager with auto-parallelism enabled still
 * finishes reconfiguring its vertex when the mocked context reports zero tasks.
 *
 * The manager is started with no completions and then told that its single
 * scatter-gather source vertex reached CONFIGURED. The test expects that no
 * tasks are pending or scheduled and that doneReconfiguringVertex() was
 * invoked on the context.
 *
 * @throws IOException declared by the test signature
 */
@Test
public void testZeroTasksSendsConfigured() throws IOException {
  // Auto-parallelism on, with a desired per-task input size of 1000.
  Configuration shuffleConf = new Configuration();
  shuffleConf.setBoolean(
      ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
  shuffleConf.setLong(
      ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);

  // One upstream vertex ("R1") connected through a scatter-gather edge.
  HashMap<String, EdgeProperty> sourceEdges = new HashMap<String, EdgeProperty>();
  String sourceVertexName = "R1";
  EdgeProperty scatterGatherEdge = EdgeProperty.create(
      EdgeProperty.DataMovementType.SCATTER_GATHER,
      EdgeProperty.DataSourceType.PERSISTED,
      SchedulingType.SEQUENTIAL,
      OutputDescriptor.create("out"),
      InputDescriptor.create("in"));

  final String managedVertexName = "R2";
  sourceEdges.put(sourceVertexName, scatterGatherEdge);

  // Mocked plugin context: the managed vertex "R2" is stubbed with zero tasks.
  final VertexManagerPluginContext pluginContext = mock(VertexManagerPluginContext.class);
  when(pluginContext.getInputVertexEdgeProperties()).thenReturn(sourceEdges);
  when(pluginContext.getVertexName()).thenReturn(managedVertexName);
  when(pluginContext.getVertexNumTasks(managedVertexName)).thenReturn(0);

  // Built but never delivered to the manager in this test.
  VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, sourceVertexName);
  // check initialization
  ShuffleVertexManager manager = createManager(shuffleConf, pluginContext, 0.001f, 0.001f);

  // Record the task indices of the most recent scheduleTasks() call.
  final HashSet<Integer> scheduledTaskIndices = new HashSet<Integer>();
  doAnswer(new Answer() {
    public Object answer(InvocationOnMock invocation) {
      scheduledTaskIndices.clear();
      List<ScheduleTaskRequest> requests =
          (List<ScheduleTaskRequest>) invocation.getArguments()[0];
      for (ScheduleTaskRequest request : requests) {
        scheduledTaskIndices.add(request.getTaskIndex());
      }
      return null;
    }
  }).when(pluginContext).scheduleTasks(anyList());

  manager.onVertexStarted(emptyCompletions);
  manager.onVertexStateUpdated(
      new VertexStateUpdate(sourceVertexName, VertexState.CONFIGURED));

  Assert.assertEquals(1, manager.bipartiteSources);
  Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
  Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks);
  Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled
  Assert.assertEquals(0, scheduledTaskIndices.size());
  // The manager must still signal that reconfiguration is complete.
  verify(pluginContext).doneReconfiguringVertex();
}

public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
VertexIdentifier mockVertex = mock(VertexIdentifier.class);
Expand Down

0 comments on commit 7221d38

Please sign in to comment.