Skip to content

Commit

Permalink
[hotfix] [yarn tests] Fix deadlock between YARN Session CLI tests and…
Browse files Browse the repository at this point in the history
… Surefire

The Surefire Plugin uses stdin to communicate with forked JVMs for tests.

The YARN Session CLI tests also try to read the stdin stream. The tests deadlock since
Surefire never releases the stdin locks during the lifetime of a test.

This change adds a parameter whether the YARN Session CLI should try to read
user console input, and sets this to false in the integration tests.
  • Loading branch information
StephanEwen committed May 30, 2016
1 parent 6511557 commit da23ee3
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,18 @@ public class FlinkYarnSessionCli {
*/
private final Option DYNAMIC_PROPERTIES;

private final boolean acceptInteractiveInput;

//------------------------------------ Internal fields -------------------------
private AbstractFlinkYarnCluster yarnCluster = null;
private boolean detachedMode = false;

/** Default yarn application name. */
private String defaultApplicationName = null;

public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
this.acceptInteractiveInput = acceptInteractiveInput;

QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
Expand Down Expand Up @@ -292,7 +296,7 @@ private static void writeYarnProperties(Properties properties, File propertiesFi
propertiesFile.setReadable(true, false); // readable for all.
}

public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster, boolean readConsoleInput) {
final String HELP = "Available commands:\n" +
"help - show these commands\n" +
"stop - stop the YARN session";
Expand All @@ -304,6 +308,8 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
// ------------------ check if there are updates by the cluster -----------

GetClusterStatusResponse status = yarnCluster.getClusterStatus();
LOG.debug("Received status message: {}", status);

if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
System.err.println("Number of connected TaskManagers changed to " +
status.numRegisteredTaskManagers() + ". " +
Expand All @@ -324,15 +330,16 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
yarnCluster.shutdown(true);
}

// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
long startTime = System.currentTimeMillis();
while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
&& !in.ready()) {
&& (!readConsoleInput || !in.ready()))
{
Thread.sleep(200);
}
//------------- handle interactive command by user. ----------------------

if (in.ready()) {
if (readConsoleInput && in.ready()) {
String command = in.readLine();
switch (command) {
case "quit":
Expand All @@ -347,6 +354,7 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
break;
}
}

if (yarnCluster.hasBeenStopped()) {
LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
break;
Expand All @@ -358,7 +366,7 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
}

public static void main(String[] args) {
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
System.exit(cli.run(args));
}

Expand Down Expand Up @@ -458,7 +466,7 @@ public int run(String[] args) {
"Please also note that the temporary files of the YARN session in {} will not be removed.",
flinkYarnClient.getSessionFilesDir());
} else {
runInteractiveCli(yarnCluster);
runInteractiveCli(yarnCluster, acceptInteractiveInput);

if (!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class CliFrontendParser {

/** command line interface of the YARN session, with a special initialization here
* to prefix all options with y/yarn. */
private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn");
private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn", true);


static final Option HELP_OPTION = new Option("h", "help", false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testDynamicProperties() throws IOException {
map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
Options options = new Options();
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
cli.getYARNSessionCLIOptions(options);

CommandLineParser parser = new PosixParser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ public int getReturnValue() {
public void run() {
switch(type) {
case YARN_SESSION:
yCli = new FlinkYarnSessionCli("", "");
yCli = new FlinkYarnSessionCli("", "", false);
returnValue = yCli.run(args);
break;
case CLI_FRONTEND:
Expand Down
3 changes: 2 additions & 1 deletion flink-yarn-tests/src/test/resources/log4j-test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO
log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO
log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO
log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO
log4j.logger.org.apache.hadoop=OFF
log4j.logger.org.apache.flink.runtime.leaderelection=INFO
log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO

0 comments on commit da23ee3

Please sign in to comment.