Skip to content

Commit

Permalink
[FLINK-22662][yarn][test] Stabilize YARNHighAvailabilityITCase
Browse files Browse the repository at this point in the history
This closes apache#16395
  • Loading branch information
xintongsong committed Jul 9, 2021
1 parent 0727d27 commit 928b689
Showing 1 changed file with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.UserGroupInformation;
Expand All @@ -74,23 +75,28 @@

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assume.assumeTrue;
Expand Down Expand Up @@ -372,10 +378,34 @@ private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient)
return restClusterClient.submitJob(job).get();
}

private void killApplicationMaster(final String processName)
throws IOException, InterruptedException {
private void killApplicationMaster(final String processName) throws Exception {
final Set<Integer> origPids = getApplicationMasterPids(processName);
assertThat(origPids, not(empty()));

final Process exec = Runtime.getRuntime().exec("pkill -f " + processName);
assertThat(exec.waitFor(), is(0));

CommonTestUtils.waitUntilCondition(
() -> {
final Set<Integer> curPids = getApplicationMasterPids(processName);
return origPids.stream().noneMatch(curPids::contains);
},
Deadline.fromNow(TIMEOUT));
}

private Set<Integer> getApplicationMasterPids(final String processName)
throws IOException, InterruptedException {
final Process exec = Runtime.getRuntime().exec("pgrep -f " + processName);

if (exec.waitFor() != 0) {
return Collections.emptySet();
}

return Arrays.stream(
IOUtils.toString(exec.getInputStream(), StandardCharsets.UTF_8)
.split("\\s+"))
.map(Integer::valueOf)
.collect(Collectors.toSet());
}

private static void waitUntilJobIsRunning(
Expand Down

0 comments on commit 928b689

Please sign in to comment.