Skip to content

Commit

Permalink
TEZ-2857. Fix flakey tests in TestDAGImpl. (sseth)
Browse files Browse the repository at this point in the history
  • Loading branch information
sidseth committed Sep 29, 2015
1 parent 8b412ee commit d63d6ee
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-2857. Fix flakey tests in TestDAGImpl.
TEZ-2836. Avoid setting framework/system counters for tasks running in threads.
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2833. Dont create extra directory during ATS file download
Expand Down Expand Up @@ -191,6 +192,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES
TEZ-2857. Fix flakey tests in TestDAGImpl.
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2808. Race condition between preemption and container assignment
TEZ-2853. Tez UI: task attempt page is coming empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ public void testDAGInitFailedDuetoInvalidResource() {
dispatcher.await();
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
Assert.assertTrue(StringUtils.join(dag.getDiagnostics(),",")
Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), ",")
.contains("Vertex's TaskResource is beyond the cluster container capability"));
}

Expand Down Expand Up @@ -1093,7 +1093,7 @@ public void testVertexCompletion() {
Assert.assertEquals(1, dag.getSuccessfulVertices());

// 2 tasks completed, total plan has 11 vertices
Assert.assertEquals((float)2/11,
Assert.assertEquals((float) 2 / 11,
dag.getCompletedTaskProgress(), 0.05);
}

Expand Down Expand Up @@ -1168,7 +1168,7 @@ public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting
setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
Expand All @@ -1183,7 +1183,8 @@ public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting
DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
TezEvent tezEvent = new TezEvent(daEvent,
new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.getEventHandler().handle(
new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();

Assert.assertEquals(VertexState.FAILED, v2.getState());
Expand Down Expand Up @@ -1230,7 +1231,7 @@ public void testEdgeManager_GetNumDestinationConsumerTasks() {
setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
Expand All @@ -1245,7 +1246,8 @@ public void testEdgeManager_GetNumDestinationConsumerTasks() {
InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0);
TezEvent tezEvent = new TezEvent(ireEvent,
new EventMetaData(EventProducerConsumerType.INPUT,"vertex2", "vertex1", ta1.getID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.getEventHandler().handle(
new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
//
Assert.assertEquals(VertexState.FAILED, v2.getState());
Expand Down Expand Up @@ -1318,12 +1320,14 @@ public void testGroupDAGWithVertexReRunning() {
Vertex v2 = groupDag.getVertex("vertex2");
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(
new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
dispatcher.await();
// commit should not happen due to vertex-rerunning
Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);

dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(
new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.await();
// commit happen
Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
Expand Down Expand Up @@ -1417,10 +1421,10 @@ public void testDAGCompletionWithCommitFailure() throws IOException {
.newBuilder()
.setClassName(CountingOutputCommitter.class.getName())
.setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
.setUserPayload(
ByteString
.copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
true, false, false).toUserPayload())).build()))
.setUserPayload(
ByteString
.copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
true, false, false).toUserPayload())).build()))
.setName("output3")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output.class")
Expand Down Expand Up @@ -1848,13 +1852,8 @@ public void testVertexFailureHandling() {
}
}

// a dag.kill() on an active DAG races with vertices all succeeding.
// if a JOB_KILL is processed while dag is in running state, it should end in KILLED,
// regardless of whether all vertices complete
//
// Final state:
// DAG is in KILLED state, with killTrigger = USER_KILL
// Each vertex had kill triggered but raced ahead and ends in SUCCEEDED state.
// Couple of vertices succeed. DAG_KILLED processed, which causes the rest of the vertices to be
// marked as KILLED.
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGKill() {
Expand All @@ -1870,22 +1869,70 @@ public void testDAGKill() {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
Assert.assertEquals(2, dag.getSuccessfulVertices());

int killedCount = 0;
for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) {
if (vEntry.getValue().getState() == VertexState.KILLED) {
killedCount++;
}
}
Assert.assertEquals(4, killedCount);

for (Vertex v : dag.getVertices().values()) {
Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
}

Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}

// Vertices succeed after a DAG kill has been processed. Should be ignored.
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGKillVertexSuccessAfterKill() {
initDAG(dag);
startDAG(dag);
dispatcher.await();

dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());

dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
dispatcher.await();

Assert.assertEquals(DAGState.KILLED, dag.getState());

// Vertex SUCCESS gets processed after the DAG has reached the KILLED state. Should be ignored.
for (int i = 2; i < 6; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());

int killedCount = 0;
for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) {
if (vEntry.getValue().getState() == VertexState.KILLED) {
killedCount++;
}
}
Assert.assertEquals(4, killedCount);

Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
Assert.assertEquals(6, dag.getSuccessfulVertices());
Assert.assertEquals(2, dag.getSuccessfulVertices());
for (Vertex v : dag.getVertices().values()) {
Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
}
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}

// job kill races with most vertices succeeding and one directly killed.
// because the job.kill() happens before the direct kill, the vertex has kill_trigger=DAG_KILL
// Vertex KILLED after a DAG_KILLED is issued. Termination reason should be DAG_KILLED
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGKillPending() {
Expand All @@ -1900,21 +1947,23 @@ public void testDAGKillPending() {

dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));

for (int i = 2; i < 5; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());

dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.KILLED));
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(), VertexTerminationCause.DAG_KILL);
Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(),
VertexTerminationCause.DAG_KILL);
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}

Expand Down

0 comments on commit d63d6ee

Please sign in to comment.