Skip to content

Commit

Permalink
TEZ-3077. TezClient.waitTillReady should support timeout. Contributed…
Browse files Browse the repository at this point in the history
… by Kuhu Shukla.
  • Loading branch information
sidseth committed Apr 18, 2016
1 parent b78a84a commit 53aa661
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Release 0.8.4: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
TEZ-3077. TezClient.waitTillReady should support timeout.
TEZ-3202. Reduce the memory need for jobs with high number of segments
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
TEZ-3214. Tez UI 2: Pagination in All DAGs
Expand Down
91 changes: 81 additions & 10 deletions tez-api/src/main/java/org/apache/tez/client/TezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.NumberFormat;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -54,13 +55,15 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.util.Time;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotReady;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
Expand Down Expand Up @@ -808,25 +811,61 @@ public synchronized TezAppMasterStatus getAppMasterStatus() throws TezException,
*/
@Unstable
public synchronized void preWarm(PreWarmVertex preWarmVertex) throws TezException, IOException {
preWarm(preWarmVertex, 0, TimeUnit.MILLISECONDS);
}

/**
* API to help pre-allocate containers in session mode. In non-session mode
* this is ignored. The pre-allocated containers may be re-used by subsequent
* job DAGs to improve performance.
* The preWarm vertex should be configured and setup exactly
* like the other vertices in the job DAGs so that the pre-allocated
* containers may be re-used by the subsequent DAGs to improve performance.
* The processor for the preWarmVertex may be used to pre-warm the containers
* by pre-loading classes etc. It should be short-running so that pre-warming
* does not block real execution. Users can specify their custom processors or
* use the PreWarmProcessor from the runtime library.
* The parallelism of the preWarmVertex will determine the number of preWarmed
* containers.
* Pre-warming is best efforts and among other factors is limited by the free
* resources on the cluster. Based on the specified timeout value it returns
* false if the status is not READY after the wait period.
* @param preWarmVertex
* @param timeout
* @param unit
* @throws TezException
* @throws IOException
*/
@Unstable
public synchronized void preWarm(PreWarmVertex preWarmVertex,
long timeout, TimeUnit unit)
throws TezException, IOException {
if (!isSession) {
// do nothing for non session mode. This is there to let the code
// do nothing for non session mode. This is there to let the code
// work correctly in both modes
LOG.warn("preWarm is not supported in non-session mode, please use session-mode of TezClient");
LOG.warn("preWarm is not supported in non-session mode," +
"please use session-mode of TezClient");
return;
}

verifySessionStateForSubmission();

DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_"
+ preWarmDAGCounter++);
dag.addVertex(preWarmVertex);

boolean isReady;
try {
waitTillReady();
isReady = waitTillReady(timeout, unit);
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for AM to become available", e);
throw new IOException("Interrupted while waiting for AM to become " +
"available", e);
}
if(isReady) {
submitDAG(dag);
} else {
throw new SessionNotReady("Tez AM not ready, could not submit DAG");
}
submitDAG(dag);
}


Expand All @@ -841,22 +880,54 @@ public synchronized void preWarm(PreWarmVertex preWarmVertex) throws TezExceptio
*/
@Evolving
public synchronized void waitTillReady() throws IOException, TezException, InterruptedException {
waitTillReady(0, TimeUnit.MILLISECONDS);
}

/**
* Wait till the DAG is ready to be submitted.
* In non-session mode this is a no-op since the application can be
* immediately submitted.
* In session mode, this waits for the session host to be ready to accept
* a DAG and returns false if not ready after a configured time wait period.
* @param timeout
* @param unit
* @return true if READY or is not in session mode, false otherwise.
* @throws IOException
* @throws TezException
* @throws InterruptedException
*/
@Evolving
public synchronized boolean waitTillReady(long timeout, TimeUnit unit)
throws IOException, TezException, InterruptedException {
timeout = unit.toMillis(timeout);
if (!isSession) {
// nothing to wait for in non-session mode
return;
return true;
}

verifySessionStateForSubmission();
long startTime = Time.monotonicNow();
long timeLimit = startTime + timeout;
while (true) {
TezAppMasterStatus status = getAppMasterStatus();
if (status.equals(TezAppMasterStatus.SHUTDOWN)) {
throw new SessionNotRunning("TezSession has already shutdown. "
+ ((diagnostics != null) ? diagnostics : NO_CLUSTER_DIAGNOSTICS_MSG));
}
if (status.equals(TezAppMasterStatus.READY)) {
return;
return true;
}
if (timeout == 0) {
Thread.sleep(SLEEP_FOR_READY);
continue;
}
long now = Time.monotonicNow();
if (timeLimit > now) {
long sleepTime = Math.min(SLEEP_FOR_READY, timeLimit - now);
Thread.sleep(sleepTime);
} else {
return false;
}
Thread.sleep(SLEEP_FOR_READY);
}
}

Expand Down
31 changes: 31 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/SessionNotReady.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.api;

/**
* Exception thrown when the Tez Session is not ready
*/
public class SessionNotReady extends TezException {

private static final long serialVersionUID = -287996170505550317L;

public SessionNotReady(String message) {
super(message);
}
}
98 changes: 96 additions & 2 deletions tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;
Expand All @@ -37,9 +39,11 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -49,6 +53,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
Expand All @@ -66,6 +71,7 @@
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.SessionNotReady;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConfigurationConstants;
Expand Down Expand Up @@ -362,7 +368,95 @@ public void testPreWarm() throws Exception {

client.stop();
}


@Test (timeout=30000)
public void testPreWarmWithTimeout() throws Exception {
long startTime = 0 , endTime = 0;
TezClientForTest client = configureAndCreateTezClient();
final TezClientForTest spyClient = spy(client);
doCallRealMethod().when(spyClient).start();
doCallRealMethod().when(spyClient).stop();
spyClient.start();

when(
spyClient.mockYarnClient.getApplicationReport(
spyClient.mockAppId).getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
when(
spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
(GetAMStatusRequestProto) any()))
.thenReturn(
GetAMStatusResponseProto.newBuilder().setStatus(
TezAppMasterStatusProto.INITIALIZING).build());
PreWarmVertex vertex =
PreWarmVertex.create("PreWarm", 1, Resource.newInstance(1, 1));
int timeout = 5000;
try {
startTime = Time.monotonicNow();
spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS);
fail("PreWarm should have encountered an Exception!");
} catch (SessionNotReady te) {
endTime = Time.monotonicNow();
assertTrue("Time taken is not as expected",
(endTime - startTime) > timeout);
verify(spyClient, times(0)).submitDAG(any(DAG.class));
Assert.assertTrue("Unexpected Exception message",
te.getMessage().contains("Tez AM not ready"));

}

when(
spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
(GetAMStatusRequestProto) any()))
.thenReturn(
GetAMStatusResponseProto.newBuilder().setStatus(
TezAppMasterStatusProto.READY).build());
try {
startTime = Time.monotonicNow();
spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS);
endTime = Time.monotonicNow();
assertTrue("Time taken is not as expected",
(endTime - startTime) <= timeout);
verify(spyClient, times(1)).submitDAG(any(DAG.class));
} catch (TezException te) {
fail("PreWarm should have succeeded!");
}
Thread amStateThread = new Thread() {
@Override
public void run() {
CountDownLatch latch = new CountDownLatch(1);
try {
when(
spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
(GetAMStatusRequestProto) any()))
.thenReturn(
GetAMStatusResponseProto.newBuilder().setStatus(
TezAppMasterStatusProto.INITIALIZING).build());
latch.await(1000, TimeUnit.MILLISECONDS);
when(
spyClient.sessionAmProxy.getAMStatus((RpcController) any(),
(GetAMStatusRequestProto) any()))
.thenReturn(
GetAMStatusResponseProto.newBuilder().setStatus(
TezAppMasterStatusProto.READY).build());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ServiceException e) {
e.printStackTrace();
}
}
};
amStateThread.start();
startTime = Time.monotonicNow();
spyClient.preWarm(vertex, timeout, TimeUnit.MILLISECONDS);
endTime = Time.monotonicNow();
assertTrue("Time taken is not as expected",
(endTime - startTime) <= timeout);
verify(spyClient, times(2)).submitDAG(any(DAG.class));
spyClient.stop();
client.stop();
}

@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
testMultipleSubmissionsJob(false);
Expand Down

0 comments on commit 53aa661

Please sign in to comment.