Skip to content

Commit

Permalink
add mini k8s stress test (airbytehq#8978)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhizor authored Dec 21, 2021
1 parent d2d0205 commit ae5b1cf
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -96,7 +97,7 @@ public class KubePodProcess extends Process {
private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class);

private static final String INIT_CONTAINER_NAME = "init";
private static final Long STATUS_CHECK_INTERVAL_MS = 30 * 1000L;
public static final Duration DEFAULT_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);
private static final String DEFAULT_MEMORY_LIMIT = "25Mi";
private static final ResourceRequirements DEFAULT_SIDECAR_RESOURCES = new ResourceRequirements()
.withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_LIMIT);
Expand Down Expand Up @@ -132,6 +133,7 @@ public class KubePodProcess extends Process {
private Long lastStatusCheck = null;

private final ServerSocket stdoutServerSocket;
private final Duration statusCheckInterval;
private final int stdoutLocalPort;
private final ServerSocket stderrServerSocket;
private final Map<Integer, Integer> internalToExternalPorts;
Expand Down Expand Up @@ -270,6 +272,7 @@ public KubePodProcess(final boolean isOrchestrator,
final String processRunnerHost,
final ApiClient officialClient,
final KubernetesClient fabricClient,
final Duration statusCheckInterval,
final String podName,
final String namespace,
final String image,
Expand All @@ -292,6 +295,7 @@ public KubePodProcess(final boolean isOrchestrator,
final String... args)
throws IOException, InterruptedException {
this.fabricClient = fabricClient;
this.statusCheckInterval = statusCheckInterval;
this.stdoutLocalPort = stdoutLocalPort;
this.stderrLocalPort = stderrLocalPort;

Expand Down Expand Up @@ -594,7 +598,7 @@ private int getReturnCode(final Pod pod) {
}

// Reuse the last status check result to prevent overloading the Kube Api server.
if (lastStatusCheck != null && System.currentTimeMillis() - lastStatusCheck < STATUS_CHECK_INTERVAL_MS) {
if (lastStatusCheck != null && System.currentTimeMillis() - lastStatusCheck < statusCheckInterval.toMillis()) {
throw new IllegalThreadStateException("Kube pod process has not exited yet.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kubernetes.client.openapi.ApiClient;
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class KubeProcessFactory implements ProcessFactory {
private final String kubeHeartbeatUrl;
private final String processRunnerHost;
private final boolean isOrchestrator;
private final Duration statusCheckInterval;

/**
* Sets up a process factory with the default processRunnerHost.
Expand All @@ -63,7 +65,7 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs,
final String kubeHeartbeatUrl,
final boolean isOrchestrator) {
this(workerConfigs, namespace, officialClient, fabricClient, kubeHeartbeatUrl,
Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()), isOrchestrator);
Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()), isOrchestrator, KubePodProcess.DEFAULT_STATUS_CHECK_INTERVAL);
}

/**
Expand All @@ -75,6 +77,8 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs,
* @param processRunnerHost is the local host or ip of the machine running the process factory.
* injectable for testing.
* @param isOrchestrator determines if this should run as airbyte-admin
* @param statusCheckInterval specifies how often the Kubernetes API should be consulted when
* attempting to get the exit code after termination
*/
@VisibleForTesting
public KubeProcessFactory(final WorkerConfigs workerConfigs,
Expand All @@ -83,14 +87,16 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs,
final KubernetesClient fabricClient,
final String kubeHeartbeatUrl,
final String processRunnerHost,
final boolean isOrchestrator) {
final boolean isOrchestrator,
final Duration statusCheckInterval) {
this.workerConfigs = workerConfigs;
this.namespace = namespace;
this.officialClient = officialClient;
this.fabricClient = fabricClient;
this.kubeHeartbeatUrl = kubeHeartbeatUrl;
this.processRunnerHost = processRunnerHost;
this.isOrchestrator = isOrchestrator;
this.statusCheckInterval = statusCheckInterval;
}

@Override
Expand Down Expand Up @@ -129,6 +135,7 @@ public Process create(final String jobId,
processRunnerHost,
officialClient,
fabricClient,
statusCheckInterval,
podName,
namespace,
imageName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerException;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
Expand All @@ -22,6 +23,7 @@
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -30,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -51,12 +54,14 @@ public class KubePodProcessIntegrationTest {
private static ApiClient officialClient;
private static KubernetesClient fabricClient;
private static KubeProcessFactory processFactory;
private static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = new WorkerConfigs(new EnvConfigs()).getResourceRequirements();

private WorkerHeartbeatServer server;

@BeforeAll
public static void init() throws Exception {
openPorts = new ArrayList<>(getOpenPorts(5));
openPorts = new ArrayList<>(getOpenPorts(30)); // todo: should we offer port pairs to prevent deadlock? can create test here with fewer to get
// this

heartbeatPort = openPorts.get(0);
heartbeatUrl = getHost() + ":" + heartbeatPort;
Expand All @@ -66,7 +71,8 @@ public static void init() throws Exception {

KubePortManagerSingleton.init(new HashSet<>(openPorts.subList(1, openPorts.size() - 1)));
processFactory =
new KubeProcessFactory(new WorkerConfigs(new EnvConfigs()), "default", officialClient, fabricClient, heartbeatUrl, getHost(), false);
new KubeProcessFactory(new WorkerConfigs(new EnvConfigs()), "default", officialClient, fabricClient, heartbeatUrl, getHost(), false,
Duration.ofSeconds(1));
}

@BeforeEach
Expand All @@ -80,6 +86,42 @@ public void teardown() throws Exception {
server.stop();
}

/**
* In the past we've had some issues with transient / stuck pods. The idea here is to run a few at
* once, and check that they are all running in hopes of identifying regressions that introduce
* flakiness.
*/
@Test
public void testConcurrentRunning() throws Exception {
final var pool = Executors.newFixedThreadPool(10);

final var totalJobs = 30;
final var successCount = new AtomicInteger(0);
final var failCount = new AtomicInteger(0);

for (int i = 0; i < totalJobs; i++) {
pool.submit(() -> {
try {
final Process process = getProcess("echo hi; sleep 1; echo hi2");
process.waitFor();

// the pod should be dead and in a good state
assertFalse(process.isAlive());
assertEquals(0, process.exitValue());
successCount.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
failCount.incrementAndGet();
}
});
}

pool.shutdown();
pool.awaitTermination(2, TimeUnit.MINUTES);

assertEquals(totalJobs, successCount.get());
}

@Test
public void testSuccessfulSpawning() throws Exception {
// start a finite process
Expand Down Expand Up @@ -250,7 +292,7 @@ private Process getProcess(final String entrypoint) throws WorkerException {
false,
files,
entrypoint,
new WorkerConfigs(new EnvConfigs()).getResourceRequirements(),
DEFAULT_RESOURCE_REQUIREMENTS,
Map.of(),
Map.of());
}
Expand Down

0 comments on commit ae5b1cf

Please sign in to comment.