Skip to content

Commit

Permalink
TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances. (sseth)
Browse files Browse the repository at this point in the history
  • Loading branch information
sidseth committed Mar 1, 2017
1 parent ee4a9a9 commit 6051542
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES

ALL CHANGES:

TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances.
TEZ-3637. TezMerger logs too much at INFO level
TEZ-3638. VertexImpl logs too much at info when removing tasks after auto-reduce parallelism
TEZ-3634. reduce the default buffer sizes in PipelinedSorter by a small amount.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
public int getTaskSchedulerIdentifier();
public int getContainerLauncherIdentifier();
public int getTaskCommunicatorIdentifier();
/**
 * Reports whether this container has been flagged as being in an error state.
 *
 * @return {@code true} if the container is in an error state, {@code false} otherwise
 */
public boolean isInErrorState();
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ public int getTaskCommunicatorIdentifier() {
return this.taskCommId;
}

/**
 * Exposes the container's internal error flag.
 *
 * @return the current value of the {@code inError} field
 */
@Override
public boolean isInErrorState() {
  return this.inError;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.tez.dag.app.rm.container;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.slf4j.Logger;
Expand All @@ -41,7 +44,8 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
private final TaskCommunicatorManagerInterface tal;
private final AppContext context;
private final ContainerSignatureMatcher containerSignatureMatcher;
private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
@VisibleForTesting
final ConcurrentHashMap<ContainerId, AMContainer> containerMap;

public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
Expand All @@ -64,11 +68,23 @@ public void handle(AMContainerEvent event) {
}

public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
AMContainer amc = new AMContainerImpl(container, chh, tal,
containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
AMContainer amc = createAmContainer(container, chh, tal,
containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);

return (containerMap.putIfAbsent(container.getId(), amc) == null);
}

/**
 * Factory hook that builds the {@link AMContainer} to be tracked for a container.
 * Package-private so that tests can override it and supply their own instances.
 *
 * @param container the container to wrap
 * @param chh heartbeat handler handed to the new instance
 * @param tal task communicator manager handed to the new instance
 * @param signatureMatcher container signature matcher handed to the new instance
 * @param appContext application context handed to the new instance
 * @param schedulerId task scheduler identifier
 * @param launcherId container launcher identifier
 * @param taskCommId task communicator identifier
 * @return a newly constructed {@link AMContainerImpl}
 */
AMContainer createAmContainer(Container container,
    ContainerHeartbeatHandler chh,
    TaskCommunicatorManagerInterface tal,
    ContainerSignatureMatcher signatureMatcher,
    AppContext appContext, int schedulerId,
    int launcherId, int taskCommId) {
  return new AMContainerImpl(container, chh, tal, signatureMatcher, appContext,
      schedulerId, launcherId, taskCommId);
}

/**
 * Looks up the tracked container for the given id.
 *
 * @param containerId id of the container to look up
 * @return the matching {@link AMContainer}, or {@code null} if the map holds no entry for it
 */
public AMContainer get(ContainerId containerId) {
  return this.containerMap.get(containerId);
}
Expand All @@ -79,6 +95,24 @@ public Collection<AMContainer> values() {

/**
 * Invoked when a DAG finishes: forwards the DAG id to {@link AMContainerHelpers} and then
 * prunes containers that no longer need to be tracked.
 *
 * @param dag the DAG that has completed
 */
public void dagComplete(DAG dag) {
  AMContainerHelpers.dagComplete(dag.getID());

  // Entries for finished containers would otherwise accumulate for the lifetime of the AM.
  cleanupCompletedContainers();
}

/**
 * Removes from {@code containerMap} every container whose state is
 * {@link AMContainerState#COMPLETED} or which reports being in an error state, then logs the
 * number of entries removed and the number remaining.
 */
private void cleanupCompletedContainers() {
  int removed = 0;
  for (Iterator<Map.Entry<ContainerId, AMContainer>> it = containerMap.entrySet().iterator();
      it.hasNext(); ) {
    AMContainer candidate = it.next().getValue();
    boolean completed = AMContainerState.COMPLETED.equals(candidate.getState());
    // Keep anything that is neither completed nor in error; the error flag is only
    // consulted when the container is not already completed.
    if (!completed && !candidate.isInErrorState()) {
      continue;
    }
    it.remove();
    removed++;
  }
  LOG.info(
      "Cleaned up completed containers on dagComplete. Removed={}, Remaining={}",
      removed, containerMap.size());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ public void testCredentialsTransfer() {

// TODO Verify diagnostics in most of the tests.

private static class WrappedContainer {
static class WrappedContainer {

long rmIdentifier = 2000;
static final int taskPriority = 10;
Expand Down Expand Up @@ -1215,10 +1215,10 @@ private static class WrappedContainer {
public AMContainerImpl amContainer;

@SuppressWarnings("deprecation") // ContainerId
public WrappedContainer(boolean shouldProfile, String profileString) {
public WrappedContainer(boolean shouldProfile, String profileString, int cIdInt) {
applicationID = ApplicationId.newInstance(rmIdentifier, 1);
appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1);
containerID = ContainerId.newInstance(appAttemptID, 1);
containerID = ContainerId.newInstance(appAttemptID, cIdInt);
nodeID = NodeId.newInstance("host", 12500);
nodeHttpAddress = "host:12501";
resource = Resource.newInstance(1024, 1);
Expand Down Expand Up @@ -1265,7 +1265,7 @@ public WrappedContainer(boolean shouldProfile, String profileString) {
}

public WrappedContainer() {
this(false, null);
this(false, null, 1);
}

protected void mockDAGID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

package org.apache.tez.dag.app.rm.container;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
Expand All @@ -31,43 +35,117 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.container.TestAMContainer.WrappedContainer;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.junit.Test;

public class TestAMContainerMap {

/** Creates a Mockito mock of {@link ContainerHeartbeatHandler} with no stubbing applied. */
private ContainerHeartbeatHandler mockContainerHeartBeatHandler() {
  ContainerHeartbeatHandler heartbeatHandler = mock(ContainerHeartbeatHandler.class);
  return heartbeatHandler;
}

/**
 * Creates a mocked {@link TaskCommunicatorManagerInterface} whose communicator at index 0 is a
 * mocked {@link TaskCommunicator} reporting the address {@code localhost:21000}.
 *
 * @return the stubbed task communicator manager mock
 * @throws ServicePluginException declared because the stubbed calls declare it
 */
private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException {
  TaskCommunicator communicator = mock(TaskCommunicator.class);
  doReturn(new InetSocketAddress("localhost", 21000)).when(communicator).getAddress();

  TaskCommunicatorManagerInterface manager = mock(TaskCommunicatorManagerInterface.class);
  doReturn(communicator).when(manager).getTaskCommunicator(0);
  return manager;
}
@Test (timeout = 10000)
public void testCleanupOnDagComplete() {

private AppContext mockAppContext() {
ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class);
TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
AppContext appContext = mock(AppContext.class);
return appContext;
}

@SuppressWarnings("deprecation")
private ContainerId mockContainerId(int cId) {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, cId);
return containerId;


int numContainers = 7;
WrappedContainer[] wContainers = new WrappedContainer[numContainers];
for (int i = 0 ; i < numContainers ; i++) {
WrappedContainer wc =
new WrappedContainer(false, null, i);
wContainers[i] = wc;
}

AMContainerMap amContainerMap = new AMContainerMapForTest(chh, tal, mock(
ContainerSignatureMatcher.class), appContext, wContainers);

for (int i = 0 ; i < numContainers ; i++) {
amContainerMap.addContainerIfNew(wContainers[i].container, 0, 0, 0);
}


// Container 1 in LAUNCHING state
wContainers[0].launchContainer();
wContainers[0].verifyState(AMContainerState.LAUNCHING);

// Container 2 in IDLE state
wContainers[1].launchContainer();
wContainers[1].containerLaunched();
wContainers[1].verifyState(AMContainerState.IDLE);

// Container 3 RUNNING state
wContainers[2].launchContainer();
wContainers[2].containerLaunched();
wContainers[2].assignTaskAttempt(wContainers[2].taskAttemptID);
wContainers[2].verifyState(AMContainerState.RUNNING);

// Container 4 STOP_REQUESTED
wContainers[3].launchContainer();
wContainers[3].containerLaunched();
wContainers[3].stopRequest();
wContainers[3].verifyState(AMContainerState.STOP_REQUESTED);

// Container 5 STOPPING
wContainers[4].launchContainer();
wContainers[4].containerLaunched();
wContainers[4].stopRequest();
wContainers[4].nmStopSent();
wContainers[4].verifyState(AMContainerState.STOPPING);

// Container 6 COMPLETED
wContainers[5].launchContainer();
wContainers[5].containerLaunched();
wContainers[5].stopRequest();
wContainers[5].nmStopSent();
wContainers[5].containerCompleted();
wContainers[5].verifyState(AMContainerState.COMPLETED);

// Container 7 STOP_REQUESTED + ERROR
wContainers[6].launchContainer();
wContainers[6].containerLaunched();
wContainers[6].containerLaunched();
assertTrue(wContainers[6].amContainer.isInErrorState());
wContainers[6].verifyState(AMContainerState.STOP_REQUESTED);

// 7 containers present, and registered with AMContainerMap at this point.

assertEquals(7, amContainerMap.containerMap.size());
amContainerMap.dagComplete(mock(DAG.class));
assertEquals(5, amContainerMap.containerMap.size());
}

private Container mockContainer(ContainerId containerId) {
NodeId nodeId = NodeId.newInstance("localhost", 43255);
Container container = Container.newInstance(containerId, nodeId, "localhost:33333",
Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class));
return container;
/**
 * {@link AMContainerMap} variant for tests: instead of constructing new container instances it
 * hands back the {@code amContainer} of a pre-built {@link WrappedContainer}, so the test can
 * drive each container's state directly.
 */
private static class AMContainerMapForTest extends AMContainerMap {

  // Assigned once in the constructor and never replaced, hence final.
  private final WrappedContainer[] wrappedContainers;

  /**
   * @param chh heartbeat handler passed through to {@link AMContainerMap}
   * @param tal task communicator manager passed through to {@link AMContainerMap}
   * @param containerSignatureMatcher signature matcher passed through to {@link AMContainerMap}
   * @param context application context passed through to {@link AMContainerMap}
   * @param wrappedContainers pre-built containers, indexed by numeric container id
   */
  public AMContainerMapForTest(ContainerHeartbeatHandler chh,
      TaskCommunicatorManagerInterface tal,
      ContainerSignatureMatcher containerSignatureMatcher,
      AppContext context, WrappedContainer[] wrappedContainers) {
    super(chh, tal, containerSignatureMatcher, context);
    this.wrappedContainers = wrappedContainers;
  }

  /**
   * Returns the pre-built container whose array index equals the numeric id of
   * {@code container}; all other arguments are ignored.
   */
  @Override
  AMContainer createAmContainer(Container container,
      ContainerHeartbeatHandler chh,
      TaskCommunicatorManagerInterface tal,
      ContainerSignatureMatcher signatureMatcher,
      AppContext appContext, int schedulerId,
      int launcherId, int taskCommId) {
    return wrappedContainers[container.getId().getId()].amContainer;
  }

}
}

0 comments on commit 6051542

Please sign in to comment.