diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 92b48e046ae0..95c9f04d6c8b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -40,6 +40,7 @@ import java.net.Socket; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.time.Duration; import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; @@ -96,7 +97,7 @@ public class KubePodProcess extends Process { private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class); private static final String INIT_CONTAINER_NAME = "init"; - private static final Long STATUS_CHECK_INTERVAL_MS = 30 * 1000L; + public static final Duration DEFAULT_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); private static final String DEFAULT_MEMORY_LIMIT = "25Mi"; private static final ResourceRequirements DEFAULT_SIDECAR_RESOURCES = new ResourceRequirements() .withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_LIMIT); @@ -132,6 +133,7 @@ public class KubePodProcess extends Process { private Long lastStatusCheck = null; private final ServerSocket stdoutServerSocket; + private final Duration statusCheckInterval; private final int stdoutLocalPort; private final ServerSocket stderrServerSocket; private final Map internalToExternalPorts; @@ -270,6 +272,7 @@ public KubePodProcess(final boolean isOrchestrator, final String processRunnerHost, final ApiClient officialClient, final KubernetesClient fabricClient, + final Duration statusCheckInterval, final String podName, final String namespace, final String image, @@ -292,6 +295,7 @@ public KubePodProcess(final boolean isOrchestrator, final String... 
args) throws IOException, InterruptedException { this.fabricClient = fabricClient; + this.statusCheckInterval = statusCheckInterval; this.stdoutLocalPort = stdoutLocalPort; this.stderrLocalPort = stderrLocalPort; @@ -594,7 +598,7 @@ private int getReturnCode(final Pod pod) { } // Reuse the last status check result to prevent overloading the Kube Api server. - if (lastStatusCheck != null && System.currentTimeMillis() - lastStatusCheck < STATUS_CHECK_INTERVAL_MS) { + if (lastStatusCheck != null && System.currentTimeMillis() - lastStatusCheck < statusCheckInterval.toMillis()) { throw new IllegalThreadStateException("Kube pod process has not exited yet."); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 961796862dff..1b87e51bb25c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -13,6 +13,7 @@ import io.kubernetes.client.openapi.ApiClient; import java.net.InetAddress; import java.nio.file.Path; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; @@ -52,6 +53,7 @@ public class KubeProcessFactory implements ProcessFactory { private final String kubeHeartbeatUrl; private final String processRunnerHost; private final boolean isOrchestrator; + private final Duration statusCheckInterval; /** * Sets up a process factory with the default processRunnerHost. 
@@ -63,7 +65,7 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs, final String kubeHeartbeatUrl, final boolean isOrchestrator) { this(workerConfigs, namespace, officialClient, fabricClient, kubeHeartbeatUrl, - Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()), isOrchestrator); + Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()), isOrchestrator, KubePodProcess.DEFAULT_STATUS_CHECK_INTERVAL); } /** @@ -75,6 +77,8 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs, * @param processRunnerHost is the local host or ip of the machine running the process factory. * injectable for testing. * @param isOrchestrator determines if this should run as airbyte-admin + * @param statusCheckInterval specifies how often the Kubernetes API should be consulted when + * attempting to get the exit code after termination */ @VisibleForTesting public KubeProcessFactory(final WorkerConfigs workerConfigs, @@ -83,7 +87,8 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs, final KubernetesClient fabricClient, final String kubeHeartbeatUrl, final String processRunnerHost, - final boolean isOrchestrator) { + final boolean isOrchestrator, + final Duration statusCheckInterval) { this.workerConfigs = workerConfigs; this.namespace = namespace; this.officialClient = officialClient; @@ -91,6 +96,7 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs, this.kubeHeartbeatUrl = kubeHeartbeatUrl; this.processRunnerHost = processRunnerHost; this.isOrchestrator = isOrchestrator; + this.statusCheckInterval = statusCheckInterval; } @Override @@ -129,6 +135,7 @@ public Process create(final String jobId, processRunnerHost, officialClient, fabricClient, + statusCheckInterval, podName, namespace, imageName, diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java 
index d99b2a4ec6c5..317b84426207 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.lang.Exceptions; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.ResourceRequirements; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerException; import io.fabric8.kubernetes.client.DefaultKubernetesClient; @@ -22,6 +23,7 @@ import java.net.ServerSocket; import java.net.UnknownHostException; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -30,6 +32,7 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.RandomStringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -51,12 +54,14 @@ public class KubePodProcessIntegrationTest { private static ApiClient officialClient; private static KubernetesClient fabricClient; private static KubeProcessFactory processFactory; + private static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = new WorkerConfigs(new EnvConfigs()).getResourceRequirements(); private WorkerHeartbeatServer server; @BeforeAll public static void init() throws Exception { - openPorts = new ArrayList<>(getOpenPorts(5)); + openPorts = new ArrayList<>(getOpenPorts(30)); // todo: should we offer port pairs to prevent deadlock? 
can create test here with fewer to get + // this heartbeatPort = openPorts.get(0); heartbeatUrl = getHost() + ":" + heartbeatPort; @@ -66,7 +71,8 @@ public static void init() throws Exception { KubePortManagerSingleton.init(new HashSet<>(openPorts.subList(1, openPorts.size() - 1))); processFactory = - new KubeProcessFactory(new WorkerConfigs(new EnvConfigs()), "default", officialClient, fabricClient, heartbeatUrl, getHost(), false); + new KubeProcessFactory(new WorkerConfigs(new EnvConfigs()), "default", officialClient, fabricClient, heartbeatUrl, getHost(), false, + Duration.ofSeconds(1)); } @BeforeEach @@ -80,6 +86,48 @@ public void teardown() throws Exception { server.stop(); } + /** + * In the past we've had some issues with transient / stuck pods. The idea here is to run a few at + * once, and check that they all exit cleanly, in hopes of identifying regressions that introduce + * flakiness. Worker-thread failures are recorded in counters and asserted on the test thread, + * because anything thrown inside a pool task is otherwise swallowed by the discarded Future. + */ + @Test + public void testConcurrentRunning() throws Exception { + final var pool = Executors.newFixedThreadPool(10); + + final var totalJobs = 30; + final var successCount = new AtomicInteger(0); + final var failCount = new AtomicInteger(0); + + for (int i = 0; i < totalJobs; i++) { + pool.submit(() -> { + try { + final Process process = getProcess("echo hi; sleep 1; echo hi2"); + process.waitFor(); + + // the pod should be dead and in a good state + assertFalse(process.isAlive()); + assertEquals(0, process.exitValue()); + successCount.incrementAndGet(); + } catch (Throwable t) { + // assertion failures are Errors, not Exceptions: catch Throwable so they get reported and counted + t.printStackTrace(); + failCount.incrementAndGet(); + } + }); + } + + pool.shutdown(); + if (!pool.awaitTermination(2, TimeUnit.MINUTES)) { + // interrupt stragglers so a stuck pod cannot outlive the test; the count checks below then fail + pool.shutdownNow(); + } + + assertEquals(0, failCount.get()); + assertEquals(totalJobs, successCount.get()); + } + @Test public void testSuccessfulSpawning() throws Exception { // start a finite process @@ -250,7 +298,7 @@ private Process getProcess(final String entrypoint) throws WorkerException { false, files, entrypoint, - new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), + DEFAULT_RESOURCE_REQUIREMENTS, 
Map.of(), Map.of()); }