Skip to content

Commit

Permalink
YARN-3839. Quit throwing NMNotYetReadyException. Contributed by Manik…
Browse files Browse the repository at this point in the history
…andan R
  • Loading branch information
jlowe committed May 8, 2017
1 parent cef2815 commit 424887e
Show file tree
Hide file tree
Showing 14 changed files with 21 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
Expand All @@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;

/**
Expand Down Expand Up @@ -101,9 +100,6 @@ public interface ContainerManagementProtocol {
* a allServicesMetaData map.
* @throws YarnException
* @throws IOException
* @throws NMNotYetReadyException
* This exception is thrown when NM starts from scratch but has not
* yet connected with RM.
*/
@Public
@Stable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ protected static RetryPolicy createRetryPolicy(Configuration conf,
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy);

/*
* Still keeping this to cover case like newer client talking
* to an older version of server
*/
exceptionToPolicyMap.put(NMNotYetReadyException.class, retryPolicy);

return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,6 @@ protected void resyncWithRM() {
@Override
public void run() {
try {
LOG.info("Notifying ContainerManager to block new container-requests");
containerManager.setBlockNewContainerRequests(true);
if (!rmWorkPreservingRestartEnabled) {
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,6 @@ nodeManagerVersionId, containerReports, getRunningApplications(),
.verifyRMRegistrationResponseForNodeLabels(regNMResponse));

LOG.info(successfullRegistrationMsg);
LOG.info("Notifying ContainerManager to unblock new container-requests");
this.context.getContainerManager().setBlockNewContainerRequests(false);
}

private List<ApplicationId> createKeepAliveApplicationList() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ public interface ContainerManager extends ServiceStateChangeListener,

void updateQueuingLimit(ContainerQueuingLimit queuingLimit);

void setBlockNewContainerRequests(boolean blockNewContainerRequests);

ContainerScheduler getContainerScheduler();

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
Expand Down Expand Up @@ -204,7 +203,6 @@ private enum ReInitOp {
protected final AsyncDispatcher dispatcher;

private final DeletionService deletionService;
private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
Expand Down Expand Up @@ -550,10 +548,6 @@ protected void serviceStart() throws Exception {
refreshServiceAcls(conf, new NMPolicyProvider());
}

LOG.info("Blocking new container-requests as container manager rpc" +
" server is still starting.");
this.setBlockNewContainerRequests(true);

String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
String hostOverride = null;
Expand Down Expand Up @@ -617,7 +611,6 @@ void refreshServiceAcls(Configuration configuration,

@Override
public void serviceStop() throws Exception {
setBlockNewContainerRequests(true);
this.writeLock.lock();
try {
serviceStopped = true;
Expand Down Expand Up @@ -852,11 +845,6 @@ protected void authorizeStartAndResourceIncreaseRequest(
@Override
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting new containers as NodeManager has not"
+ " yet connected with ResourceManager");
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi, nmTokenIdentifier);
Expand Down Expand Up @@ -1113,11 +1101,6 @@ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest requests)
throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting container resource increase as NodeManager has not"
+ " yet connected with ResourceManager");
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi, nmTokenIdentifier);
Expand Down Expand Up @@ -1559,17 +1542,6 @@ public void handle(ContainerManagerEvent event) {
}
}

@Override
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
this.blockNewContainerRequests.set(blockNewContainerRequests);
}

@Private
@VisibleForTesting
public boolean getBlockNewContainerRequestsStatus() {
return this.blockNewContainerRequests.get();
}

@Override
public void stateChanged(Service service) {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,6 @@ public void handle(LogHandlerEvent event) {
};
}

@Override
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}

@Override
protected void authorizeStartAndResourceIncreaseRequest(
NMTokenIdentifier nmTokenIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
Expand All @@ -87,7 +86,6 @@
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -188,34 +186,6 @@ protected void testContainerPreservationOnResyncImpl(TestNodeManager1 nm,
}
}

// This test tests new container requests are blocked when NM starts from
// scratch until it register with RM AND while NM is resyncing with RM
@SuppressWarnings("unchecked")
@Test(timeout=60000)
public void testBlockNewContainerRequestsOnStartAndResync()
throws IOException, InterruptedException, YarnException {
NodeManager nm = new TestNodeManager2();
int port = ServerSocketUtil.getPort(49154, 10);
YarnConfiguration conf = createNMConfig(port);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
nm.init(conf);
nm.start();

// Start the container in running state
ContainerId cId = TestNodeManagerShutdown.createContainerId();
TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
processStartFile, port);

nm.getNMDispatcher().getEventHandler()
.handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
Assert.assertFalse(assertionFailedInThread.get());
nm.stop();
}

@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testNMshutdownWhenResyncThrowException() throws IOException,
Expand Down Expand Up @@ -493,135 +463,6 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
}
}

class TestNodeManager2 extends NodeManager {

Thread launchContainersThread = null;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new TestNodeStatusUpdaterImpl2(context, dispatcher,
healthChecker, metrics);
}

@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler){
@Override
public void setBlockNewContainerRequests(
boolean blockNewContainerRequests) {
if (blockNewContainerRequests) {
// start test thread right after blockNewContainerRequests is set
// true
super.setBlockNewContainerRequests(blockNewContainerRequests);
launchContainersThread = new RejectedContainersLauncherThread();
launchContainersThread.start();
} else {
// join the test thread right before blockNewContainerRequests is
// reset
try {
// stop the test thread
((RejectedContainersLauncherThread) launchContainersThread)
.setStopThreadFlag(true);
launchContainersThread.join();
((RejectedContainersLauncherThread) launchContainersThread)
.setStopThreadFlag(false);
super.setBlockNewContainerRequests(blockNewContainerRequests);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
}

class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater {

public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
}

@Override
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers =
getNMContext().getContainers();

try {
// ensure that containers are empty before restart nodeStatusUpdater
if (!containers.isEmpty()) {
for (Container container: containers.values()) {
Assert.assertEquals(ContainerState.COMPLETE,
container.cloneAndGetContainerStatus().getState());
}
}
super.rebootNodeStatusUpdaterAndRegisterWithRM();
// After this point new containers are free to be launched, except
// containers from previous RM
// Wait here so as to sync with the main test thread.
syncBarrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
} catch (AssertionError ae) {
ae.printStackTrace();
assertionFailedInThread.set(true);
}
}
}

class RejectedContainersLauncherThread extends Thread {

boolean isStopped = false;
public void setStopThreadFlag(boolean isStopped) {
this.isStopped = isStopped;
}

@Override
public void run() {
int numContainers = 0;
int numContainersRejected = 0;
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
try {
while (!isStopped && numContainers < 10) {
StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext,
null);
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
System.out.println("no. of containers to be launched: "
+ numContainers);
numContainers++;
try {
getContainerManager().startContainers(allRequests);
} catch (YarnException e) {
numContainersRejected++;
Assert.assertTrue(e.getMessage().contains(
"Rejecting new containers as NodeManager has not" +
" yet connected with ResourceManager"));
Assert.assertEquals(NMNotYetReadyException.class.getName(), e
.getClass().getName());
} catch (IOException e) {
e.printStackTrace();
assertionFailedInThread.set(true);
}
}
// no. of containers to be launched should equal to no. of
// containers rejected
Assert.assertEquals(numContainers, numContainersRejected);
} catch (AssertionError ae) {
assertionFailedInThread.set(true);
}
}
}
}

class TestNodeManager3 extends NodeManager {

private int registrationCount = 0;
Expand Down Expand Up @@ -681,11 +522,6 @@ protected ContainerManagerImpl createContainerManager(Context context,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler){
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}

@Override
protected void authorizeGetAndStopContainerRequest(
Expand Down
Loading

0 comments on commit 424887e

Please sign in to comment.