Skip to content

Commit

Permalink
prevent worker initialization from getting stuck (airbytehq#10108)
Browse files (browse the repository at this point in the history)
* retry temporal service client creation

* ensure that exceptions actually cause the whole WorkerApp to fail and restart instead of getting stuck

* add timeout and speed up tests
  • Loading branch information
jrhizor authored Feb 4, 2022
1 parent 5da1848 commit 14d4c67
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 11 deletions.
11 changes: 10 additions & 1 deletion airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(Conf
}
}

public static void main(final String[] args) throws IOException, InterruptedException {
private static void launchWorkerApp() throws IOException {
final Configs configs = new EnvConfigs();

LogClientSingleton.getInstance().setWorkspaceMdc(configs.getWorkerEnvironment(), configs.getLogConfigs(),
Expand Down Expand Up @@ -409,4 +409,13 @@ public static void main(final String[] args) throws IOException, InterruptedExce
jobTracker).start();
}

/**
 * Process entry point for the worker application.
 *
 * Delegates all startup work to {@code launchWorkerApp()}. Anything thrown from it, including
 * {@link Error}s, is logged and the JVM is then terminated with exit status 1.
 *
 * @param args command-line arguments; not read by this method
 */
public static void main(final String[] args) {
  try {
    launchWorkerApp();
  } catch (final Throwable startupFailure) {
    // Log first so the cause is recorded before the JVM is shut down.
    LOGGER.error("Worker app failed", startupFailure);
    System.exit(1);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
Expand All @@ -49,13 +50,14 @@ public class TemporalUtils {

public static WorkflowServiceStubs createTemporalService(final String temporalHost) {
final WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
// todo move to env.
.setTarget(temporalHost)
.setTarget(temporalHost) // todo: move to EnvConfigs
.build();

final WorkflowServiceStubs temporalService = WorkflowServiceStubs.newInstance(options);
waitForTemporalServerAndLog(temporalService);
return temporalService;
return getTemporalClientWhenConnected(
Duration.ofSeconds(2),
Duration.ofMinutes(2),
Duration.ofSeconds(5),
() -> WorkflowServiceStubs.newInstance(options));
}

public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build();
Expand Down Expand Up @@ -157,16 +159,34 @@ public static <STUB, A1, R> ImmutablePair<WorkflowExecution, CompletableFuture<R
return ImmutablePair.of(workflowExecution, resultAsync);
}

public static void waitForTemporalServerAndLog(final WorkflowServiceStubs temporalService) {
/**
* Loops and waits for the Temporal service to become available and returns a client.
*
* This function uses a supplier as input since the creation of a WorkflowServiceStubs can result in
* connection exceptions as well.
*/
public static WorkflowServiceStubs getTemporalClientWhenConnected(
final Duration waitInterval,
final Duration maxTimeToConnect,
final Duration waitAfterConnection,
final Supplier<WorkflowServiceStubs> temporalServiceSupplier) {
LOGGER.info("Waiting for temporal server...");

boolean temporalStatus = false;
WorkflowServiceStubs temporalService = null;
long millisWaited = 0;

while (!temporalStatus) {
if (millisWaited >= maxTimeToConnect.toMillis()) {
throw new RuntimeException("Could not create Temporal client within max timeout!");
}

LOGGER.warn("Waiting for default namespace to be initialized in temporal...");
Exceptions.toRuntime(() -> Thread.sleep(2000));
Exceptions.toRuntime(() -> Thread.sleep(waitInterval.toMillis()));
millisWaited = millisWaited + waitInterval.toMillis();

try {
temporalService = temporalServiceSupplier.get();
temporalStatus = getNamespaces(temporalService).contains("default");
} catch (final Exception e) {
// Ignore the exception because this likely means that the Temporal service is still initializing.
Expand All @@ -175,9 +195,11 @@ public static void waitForTemporalServerAndLog(final WorkflowServiceStubs tempor
}

// sometimes it takes a few additional seconds for workflow queue listening to be available
Exceptions.toRuntime(() -> Thread.sleep(5000));
Exceptions.toRuntime(() -> Thread.sleep(waitAfterConnection.toMillis()));

LOGGER.info("Found temporal default namespace!");

return temporalService;
}

protected static Set<String> getNamespaces(final WorkflowServiceStubs temporalService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

package io.airbyte.workers.temporal;

import static io.airbyte.workers.temporal.TemporalUtils.waitForTemporalServerAndLog;
import static io.airbyte.workers.temporal.TemporalUtils.getTemporalClientWhenConnected;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -33,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -90,13 +92,37 @@ public void testWaitForTemporalServerAndLogThrowsException() {
final WorkflowServiceStubs workflowServiceStubs = mock(WorkflowServiceStubs.class, Mockito.RETURNS_DEEP_STUBS);
final DescribeNamespaceResponse describeNamespaceResponse = mock(DescribeNamespaceResponse.class);
final NamespaceInfo namespaceInfo = mock(NamespaceInfo.class);
final Supplier<WorkflowServiceStubs> serviceSupplier = mock(Supplier.class);

when(namespaceInfo.getName()).thenReturn("default");
when(describeNamespaceResponse.getNamespaceInfo()).thenReturn(namespaceInfo);
when(serviceSupplier.get())
.thenThrow(RuntimeException.class)
.thenReturn(workflowServiceStubs);
when(workflowServiceStubs.blockingStub().listNamespaces(any()).getNamespacesList())
.thenThrow(RuntimeException.class)
.thenReturn(List.of(describeNamespaceResponse));
getTemporalClientWhenConnected(Duration.ofMillis(10), Duration.ofSeconds(1), Duration.ofSeconds(0), serviceSupplier);
}

@Test
public void testWaitThatTimesOut() {
final WorkflowServiceStubs workflowServiceStubs = mock(WorkflowServiceStubs.class, Mockito.RETURNS_DEEP_STUBS);
final DescribeNamespaceResponse describeNamespaceResponse = mock(DescribeNamespaceResponse.class);
final NamespaceInfo namespaceInfo = mock(NamespaceInfo.class);
final Supplier<WorkflowServiceStubs> serviceSupplier = mock(Supplier.class);

when(namespaceInfo.getName()).thenReturn("default");
when(describeNamespaceResponse.getNamespaceInfo()).thenReturn(namespaceInfo);
when(serviceSupplier.get())
.thenThrow(RuntimeException.class)
.thenReturn(workflowServiceStubs);
when(workflowServiceStubs.blockingStub().listNamespaces(any()).getNamespacesList())
.thenThrow(RuntimeException.class)
.thenReturn(List.of(describeNamespaceResponse));
waitForTemporalServerAndLog(workflowServiceStubs);
assertThrows(RuntimeException.class, () -> {
getTemporalClientWhenConnected(Duration.ofMillis(100), Duration.ofMillis(10), Duration.ofSeconds(0), serviceSupplier);
});
}

@WorkflowInterface
Expand Down

0 comments on commit 14d4c67

Please sign in to comment.