Skip to content

Commit

Permalink
add heartbeating when setting up the TemporalAttemptExecution (airbyt…
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhizor authored Jan 28, 2022
1 parent 4262d26 commit 2e772e1
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@ public TemporalCancellationHandler() {
@Override
public void checkAndHandleCancellation(final Runnable onCancellationCallback) {
try {
/*
/**
* Heartbeat is somewhat misleading here. What it does is check the current Temporal activity's
* context and throw an exception if the sync has been cancelled or timed out. The input to this
* heartbeat function is available as a field in thrown ActivityCompletionExceptions, which we
* aren't using for now.
*
* We should use this only as a check for the ActivityCompletionException. See
* {@link TemporalUtils#withBackgroundHeartbeat} for where we actually send heartbeats to ensure
* that we don't time out the activity.
*/
context.heartbeat(null);
} catch (final ActivityCompletionException e) {
onCancellationCallback.run();
LOGGER.warn("Job either timeout-ed or was cancelled.");
LOGGER.warn("Job either timed out or was cancelled.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.temporal.activity.Activity;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -41,8 +40,6 @@ public class TemporalAttemptExecution<INPUT, OUTPUT> implements Supplier<OUTPUT>

private static final Logger LOGGER = LoggerFactory.getLogger(TemporalAttemptExecution.class);

private static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(10);

private final JobRunConfig jobRunConfig;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
Expand Down Expand Up @@ -135,7 +132,7 @@ public OUTPUT get() {
cancellationChecker.run();

workerThread.start();
scheduledExecutor.scheduleAtFixedRate(cancellationChecker, 0, HEARTBEAT_INTERVAL.toSeconds(), TimeUnit.SECONDS);
scheduledExecutor.scheduleAtFixedRate(cancellationChecker, 0, TemporalUtils.SEND_HEARTBEAT_INTERVAL.toSeconds(), TimeUnit.SECONDS);

try {
// block and wait for the output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.activity.Activity;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
Expand All @@ -20,9 +22,14 @@
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.workflow.Functions;
import java.io.Serializable;
import java.time.Duration;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,6 +38,9 @@ public class TemporalUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(TemporalUtils.class);

public static final Duration SEND_HEARTBEAT_INTERVAL = Duration.ofSeconds(10);
public static final Duration HEARTBEAT_TIMEOUT = Duration.ofSeconds(30);

public static WorkflowServiceStubs createTemporalService(final String temporalHost) {
final WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
// todo move to env.
Expand Down Expand Up @@ -144,4 +154,28 @@ protected static Set<String> getNamespaces(final WorkflowServiceStubs temporalSe
.collect(toSet());
}

/**
* Runs the code within the supplier while heartbeating in the backgroud. Also makes sure to shut
* down the heartbeat server after the fact.
*/
public static <T> T withBackgroundHeartbeat(Callable<T> callable) {
final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

try {
scheduledExecutor.scheduleAtFixedRate(() -> {
Activity.getExecutionContext().heartbeat(null);
}, 0, SEND_HEARTBEAT_INTERVAL.toSeconds(), TimeUnit.SECONDS);

return callable.call();
} catch (final ActivityCompletionException e) {
LOGGER.warn("Job either timed out or was cancelled.");
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
LOGGER.info("Stopping temporal heartbeating...");
scheduledExecutor.shutdown();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.CancellationHandler;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import io.airbyte.workers.temporal.TemporalUtils;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Supplier;
Expand Down Expand Up @@ -73,31 +74,32 @@ public Void run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final ResourceRequirements resourceRequirements,
final OperatorDbtInput input) {
return TemporalUtils.withBackgroundHeartbeat(() -> {
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);

final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);
final Supplier<OperatorDbtInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.OPERATOR_DBT_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};

final Supplier<OperatorDbtInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.OPERATOR_DBT_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};
final CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> workerFactory;

final CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> workerFactory;
if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
} else {
workerFactory = getLegacyWorkerFactory(destinationLauncherConfig, jobRunConfig, resourceRequirements);
}

if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
} else {
workerFactory = getLegacyWorkerFactory(destinationLauncherConfig, jobRunConfig, resourceRequirements);
}
final TemporalAttemptExecution<OperatorDbtInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
workerFactory,
inputSupplier,
new CancellationHandler.TemporalCancellationHandler(), databaseUser, databasePassword, databaseUrl, airbyteVersion);

final TemporalAttemptExecution<OperatorDbtInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
workerFactory,
inputSupplier,
new CancellationHandler.TemporalCancellationHandler(), databaseUser, databasePassword, databaseUrl, airbyteVersion);

return temporalAttemptExecution.get();
return temporalAttemptExecution.get();
});
}

private CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> getLegacyWorkerFactory(final IntegrationLauncherConfig destinationLauncherConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.temporal.TemporalUtils;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -65,71 +66,73 @@ public LauncherWorker(

@Override
public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException {
try {
final Map<String, String> envMap = System.getenv().entrySet().stream()
.filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final Map<String, String> fileMap = new HashMap<>(additionalFileMap);
fileMap.putAll(Map.of(
OrchestratorConstants.INIT_FILE_APPLICATION, application,
OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig),
OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(input),
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)));

final Map<Integer, Integer> portMap = Map.of(
WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT,
OrchestratorConstants.PORT1, OrchestratorConstants.PORT1,
OrchestratorConstants.PORT2, OrchestratorConstants.PORT2,
OrchestratorConstants.PORT3, OrchestratorConstants.PORT3,
OrchestratorConstants.PORT4, OrchestratorConstants.PORT4);

final var allLabels = KubeProcessFactory.getLabels(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Collections.emptyMap());

final var podNameAndJobPrefix = podNamePrefix + "-j-" + jobRunConfig.getJobId() + "-a-";
killLowerAttemptIdsIfPresent(podNameAndJobPrefix, jobRunConfig.getAttemptId());

final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId();
final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(), podName);

process = new AsyncOrchestratorPodProcess(
kubePodInfo,
containerOrchestratorConfig.documentStoreClient(),
containerOrchestratorConfig.kubernetesClient());

if (process.getDocStoreStatus().equals(AsyncKubePodStatus.NOT_STARTED)) {
process.create(
airbyteVersion,
allLabels,
resourceRequirements,
fileMap,
portMap);
}

// this waitFor can resume if the activity is re-run
process.waitFor();

if (process.exitValue() != 0) {
throw new WorkerException("Non-zero exit code!");
}

final var output = process.getOutput();

if (output.isPresent()) {
return Jsons.deserialize(output.get(), outputClass);
} else {
throw new WorkerException("Running the " + application + " launcher resulted in no readable output!");
}
} catch (Exception e) {
if (cancelled.get()) {
throw new WorkerException("Launcher " + application + " was cancelled.", e);
} else {
throw new WorkerException("Running the launcher " + application + " failed", e);
return TemporalUtils.withBackgroundHeartbeat(() -> {
try {
final Map<String, String> envMap = System.getenv().entrySet().stream()
.filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final Map<String, String> fileMap = new HashMap<>(additionalFileMap);
fileMap.putAll(Map.of(
OrchestratorConstants.INIT_FILE_APPLICATION, application,
OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig),
OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(input),
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)));

final Map<Integer, Integer> portMap = Map.of(
WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT,
OrchestratorConstants.PORT1, OrchestratorConstants.PORT1,
OrchestratorConstants.PORT2, OrchestratorConstants.PORT2,
OrchestratorConstants.PORT3, OrchestratorConstants.PORT3,
OrchestratorConstants.PORT4, OrchestratorConstants.PORT4);

final var allLabels = KubeProcessFactory.getLabels(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Collections.emptyMap());

final var podNameAndJobPrefix = podNamePrefix + "-j-" + jobRunConfig.getJobId() + "-a-";
killLowerAttemptIdsIfPresent(podNameAndJobPrefix, jobRunConfig.getAttemptId());

final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId();
final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(), podName);

process = new AsyncOrchestratorPodProcess(
kubePodInfo,
containerOrchestratorConfig.documentStoreClient(),
containerOrchestratorConfig.kubernetesClient());

if (process.getDocStoreStatus().equals(AsyncKubePodStatus.NOT_STARTED)) {
process.create(
airbyteVersion,
allLabels,
resourceRequirements,
fileMap,
portMap);
}

// this waitFor can resume if the activity is re-run
process.waitFor();

if (process.exitValue() != 0) {
throw new WorkerException("Non-zero exit code!");
}

final var output = process.getOutput();

if (output.isPresent()) {
return Jsons.deserialize(output.get(), outputClass);
} else {
throw new WorkerException("Running the " + application + " launcher resulted in no readable output!");
}
} catch (Exception e) {
if (cancelled.get()) {
throw new WorkerException("Launcher " + application + " was cancelled.", e);
} else {
throw new WorkerException("Running the launcher " + application + " failed", e);
}
}
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.CancellationHandler;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import io.airbyte.workers.temporal.TemporalUtils;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Supplier;
Expand Down Expand Up @@ -70,31 +71,32 @@ public NormalizationActivityImpl(final Optional<WorkerApp.ContainerOrchestratorC
public Void normalize(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final NormalizationInput input) {
return TemporalUtils.withBackgroundHeartbeat(() -> {
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);

final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);
final Supplier<NormalizationInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};

final Supplier<NormalizationInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};
final CheckedSupplier<Worker<NormalizationInput, Void>, Exception> workerFactory;

final CheckedSupplier<Worker<NormalizationInput, Void>, Exception> workerFactory;
if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
} else {
workerFactory = getLegacyWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
}

if (containerOrchestratorConfig.isPresent()) {
workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
} else {
workerFactory = getLegacyWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig);
}
final TemporalAttemptExecution<NormalizationInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
workerFactory,
inputSupplier,
new CancellationHandler.TemporalCancellationHandler(), databaseUser, databasePassword, databaseUrl, airbyteVersion);

final TemporalAttemptExecution<NormalizationInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
workerFactory,
inputSupplier,
new CancellationHandler.TemporalCancellationHandler(), databaseUser, databasePassword, databaseUrl, airbyteVersion);

return temporalAttemptExecution.get();
return temporalAttemptExecution.get();
});
}

private CheckedSupplier<Worker<NormalizationInput, Void>, Exception> getLegacyWorkerFactory(
Expand Down
Loading

0 comments on commit 2e772e1

Please sign in to comment.