Skip to content

Commit

Permalink
local kubernetes support (airbytehq#1254)
Browse files Browse the repository at this point in the history
* initial attempt at generating local kube setup from docker compose

* update current state

* mounts not working

* working mounts, failing cors

* working UI

* add remaining todos

* update todos

* A

* use kustomize to select image versions

* kube process builder factory

* fix misalignment

* don't allow any retries for requested jobs

* fix log waiting and path handling

* update todos

* local volume handling

* propagate return code correctly

* update todos

* update docs

* fmt

* add to docs

* fix conflicting config file bug

* fmt

* delete unused file

* remove comment

* add job id and attempt as inputs

* rename to WorkerEnvironment

* fix example custom overlay

* less trigger-happy docs

* rename mounts

* show local csv as not working in kube in the docs

* use config maps for everything

* fix paths

* fix build

* fix stripe integration test usage

* fix papercups on kube
  • Loading branch information
jrhizor authored Dec 16, 2020
1 parent b362401 commit 1f90cf9
Show file tree
Hide file tree
Showing 40 changed files with 821 additions and 74 deletions.
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ VERSION=0.8.0-alpha
DATABASE_USER=docker
DATABASE_PASSWORD=docker
DATABASE_DB=airbyte
CONFIG_ROOT=/data
CONFIG_ROOT=/config
WORKSPACE_ROOT=/tmp/workspace
DATA_DOCKER_MOUNT=airbyte_data
DB_DOCKER_MOUNT=airbyte_db
WORKSPACE_DOCKER_MOUNT=airbyte_workspace
API_URL=http://localhost:8001/api/v1/
# todo (cgardens) - when we mount raw directories instead of named volumes, *_DOCKER_MOUNT must
# be the same as *_ROOT.
# Issue: https://github.com/airbytehq/airbyte/issues/578
Expand Down
2 changes: 1 addition & 1 deletion .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ DATABASE_USER=docker
DATABASE_PASSWORD=docker
DATABASE_DB=airbyte
DEV_ROOT=/tmp/dev_root
CONFIG_ROOT=/tmp/data
CONFIG_ROOT=/tmp/config
# todo (cgardens) - when we mount raw directories instead of named volumes, *_DOCKER_MOUNT must
# be the same as *_ROOT.
# Issue: https://github.com/airbytehq/airbyte/issues/578
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ public interface Configs {

TrackingStrategy getTrackingStrategy();

WorkerEnvironment getWorkerEnvironment();

/**
 * Values that {@link #getTrackingStrategy()} can return.
 */
enum TrackingStrategy {
SEGMENT,
LOGGING
}

/**
 * Values that {@link #getWorkerEnvironment()} can return; names the environment that worker
 * processes are launched in.
 */
enum WorkerEnvironment {
DOCKER,
KUBERNETES
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class EnvConfigs implements Configs {

public static final String AIRBYTE_ROLE = "AIRBYTE_ROLE";
public static final String AIRBYTE_VERSION = "AIRBYTE_VERSION";
public static final String WORKER_ENVIRONMENT = "WORKER_ENVIRONMENT";
public static final String WORKSPACE_ROOT = "WORKSPACE_ROOT";
public static final String WORKSPACE_DOCKER_MOUNT = "WORKSPACE_DOCKER_MOUNT";
public static final String LOCAL_ROOT = "LOCAL_ROOT";
Expand Down Expand Up @@ -148,6 +149,17 @@ public TrackingStrategy getTrackingStrategy() {
}
}

/**
 * Resolves the worker environment from the {@code WORKER_ENVIRONMENT} variable.
 *
 * <p>
 * Matching is case-insensitive and ignores surrounding whitespace. When the variable is unset or
 * blank, {@link WorkerEnvironment#DOCKER} is returned.
 *
 * @return the configured worker environment, or {@code DOCKER} when none is configured
 * @throws IllegalArgumentException if the variable holds a value that is not a
 *         {@link WorkerEnvironment} constant
 */
@Override
public WorkerEnvironment getWorkerEnvironment() {
  final String workerEnvironment = getEnv.apply(WORKER_ENVIRONMENT);
  // A blank value (e.g. a variable that is declared but left empty) is treated like an unset one.
  if (workerEnvironment != null && !workerEnvironment.isBlank()) {
    // Locale.ROOT keeps the upper-casing independent of the JVM default locale; under a Turkish
    // locale "kubernetes" would otherwise become "KUBERNETES" with a dotted capital I and not match.
    final String normalized = workerEnvironment.trim().toUpperCase(java.util.Locale.ROOT);
    try {
      return WorkerEnvironment.valueOf(normalized);
    } catch (final IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Invalid value '" + workerEnvironment + "' for " + WORKER_ENVIRONMENT + "; expected one of "
              + java.util.Arrays.toString(WorkerEnvironment.values()),
          e);
    }
  }

  LOGGER.info(WORKER_ENVIRONMENT + " not found, defaulting to " + WorkerEnvironment.DOCKER);
  return WorkerEnvironment.DOCKER;
}

/**
 * Looks up {@code name} by applying the {@code getEnv} function field to it.
 *
 * @param name name of the variable to read
 * @return whatever the {@code getEnv} function returns for that name
 */
private String getEnv(final String name) {
return getEnv.apply(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@

public abstract class TestDestination {

private static final long JOB_ID = 0L;
private static final int JOB_ATTEMPT = 0;

private static final Logger LOGGER = LoggerFactory.getLogger(TestDestination.class);

private TestDestinationEnv testEnv;
Expand Down Expand Up @@ -358,12 +361,12 @@ public void testSecondSync() throws Exception {
}

private OutputAndStatus<StandardGetSpecOutput> runSpec() {
return new DefaultGetSpecWorker(new AirbyteIntegrationLauncher(getImageName(), pbf))
return new DefaultGetSpecWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot);
}

private OutputAndStatus<StandardCheckConnectionOutput> runCheck(JsonNode config) {
return new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(getImageName(), pbf))
return new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot);
}

Expand All @@ -374,7 +377,7 @@ private void runSync(JsonNode config, List<AirbyteMessage> messages, ConfiguredA
.withCatalog(catalog)
.withDestinationConnectionConfiguration(config);

final AirbyteDestination target = new DefaultAirbyteDestination(new AirbyteIntegrationLauncher(getImageName(), pbf));
final AirbyteDestination target = new DefaultAirbyteDestination(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf));

target.start(targetConfig, jobRoot);
messages.forEach(message -> Exceptions.toRuntime(() -> target.accept(message)));
Expand All @@ -391,7 +394,7 @@ private void runSync(JsonNode config, List<AirbyteMessage> messages, ConfiguredA
pbf, targetConfig.getDestinationConnectionConfiguration());
runner.start();
final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize"));
if (!runner.normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog())) {
if (!runner.normalize(JOB_ID, JOB_ATTEMPT, normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog())) {
throw new WorkerException("Normalization Failed.");
}
runner.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@

public abstract class StandardSourceTest {

private static final long JOB_ID = 0L;
private static final int JOB_ATTEMPT = 0;

private static final Logger LOGGER = LoggerFactory.getLogger(StandardSourceTest.class);

private TestDestinationEnv testEnv;
Expand Down Expand Up @@ -368,17 +371,17 @@ private boolean sourceSupportsIncremental() throws Exception {
}

private OutputAndStatus<StandardGetSpecOutput> runSpec() {
return new DefaultGetSpecWorker(new AirbyteIntegrationLauncher(getImageName(), pbf))
return new DefaultGetSpecWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot);
}

private OutputAndStatus<StandardCheckConnectionOutput> runCheck() throws Exception {
return new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(getImageName(), pbf))
return new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot);
}

private OutputAndStatus<StandardDiscoverCatalogOutput> runDiscover() throws Exception {
return new DefaultDiscoverCatalogWorker(new AirbyteIntegrationLauncher(getImageName(), pbf))
return new DefaultDiscoverCatalogWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot);
}

Expand All @@ -393,7 +396,7 @@ private List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog catalog, JsonNode
.withState(state == null ? null : new State().withState(state))
.withCatalog(catalog);

final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(getImageName(), pbf));
final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf));
final List<AirbyteMessage> messages = new ArrayList<>();

source.start(tapConfig, jobRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@

public class SingerStripeSourceTest {

private static final long JOB_ID = 0L;
private static final int JOB_ATTEMPT = 0;

private static final Path TESTS_PATH = Path.of("/tmp/airbyte_integration_tests");
private static final String IMAGE_NAME = "airbyte/source-stripe-singer:dev";

Expand Down Expand Up @@ -97,6 +100,8 @@ public void setUp() throws IOException, StripeException {
writeConfigFilesToJobRoot();

launcher = new AirbyteIntegrationLauncher(
JOB_ID,
JOB_ATTEMPT,
IMAGE_NAME,
new DockerProcessBuilderFactory(workspaceRoot, workspaceRoot.toString(), "", "host"));
}
Expand Down
1 change: 1 addition & 0 deletions airbyte-scheduler/.dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*
!Dockerfile
!build
!bin
9 changes: 9 additions & 0 deletions airbyte-scheduler/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ WORKDIR /app
ADD https://github.com/ufoscout/docker-compose-wait/releases/download/${WAIT_VERSION}/wait wait
RUN chmod +x wait

# Install kubectl
# --fail stops the build on an HTTP error instead of saving the error page as the kubectl binary.
# Download, chmod and move happen in a single layer so no intermediate copy of the binary is kept.
RUN curl -fLO https://storage.googleapis.com/kubernetes-release/release/v1.20.0/bin/linux/amd64/kubectl \
    && chmod +x ./kubectl \
    && mv ./kubectl /usr/local/bin

# Move kube runner
COPY bin/kube_runner.sh /usr/local/bin/kube_runner.sh

# Move and run scheduler
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1
Expand Down
36 changes: 36 additions & 0 deletions airbyte-scheduler/bin/kube_runner.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env bash

# Launches the Kubernetes job described by a manifest, streams the logs of the job's pod and
# maps the final pod phase to this script's exit code.
#
# Usage: kube_runner.sh <path-to-job-yaml>
#
# Exit code: 1 when the pod phase is "Failed" or "Unknown", 0 for any other phase. Because of
# `set -e`, a failing kubectl command outside the retry loops also aborts the script.

set -e

if [[ $# -lt 1 || -z "$1" ]]; then
  echo "Usage: $0 <path-to-job-yaml>" >&2
  exit 1
fi

JOB_YAML_PATH=$1
echo "JOB_YAML_PATH = $JOB_YAML_PATH"

echo "Launch job..."
kubectl apply -f "$JOB_YAML_PATH"

# The job name is read from the first manifest line that contains "airbyte-worker-". Taking the
# 4th space-separated field assumes that line is indented by two spaces ("  name: airbyte-worker-...").
JOB_NAME=$(grep airbyte-worker- < "$JOB_YAML_PATH" | head -n 1 | cut -d " " -f4)
if [[ -z "$JOB_NAME" ]]; then
  echo "Could not determine the job name from $JOB_YAML_PATH" >&2
  exit 1
fi
echo "JOB_NAME = $JOB_NAME"
JOB_UUID=$(kubectl get job "$JOB_NAME" -o "jsonpath={.metadata.labels.controller-uid}")
echo "JOB_UUID = $JOB_UUID"

# The pod is created asynchronously after the job is applied, so the lookup can come back empty
# at first. Poll until a pod shows up instead of continuing with an empty name.
POD_NAME=""
while [[ -z "$POD_NAME" ]]; do
  POD_NAME=$(kubectl get po -l controller-uid="$JOB_UUID" -o name)
  if [[ -z "$POD_NAME" ]]; then
    echo "Waiting for pod to be created..."
    sleep 1
  fi
done
echo "POD_NAME = $POD_NAME"

echo "Waiting for pod to start and emitting logs..."
while ! (kubectl logs "$POD_NAME" --follow --pod-running-timeout=1000m)
do
  echo "Retrying..."
  # Back off briefly so a persistent failure does not turn into a busy loop against the API server.
  sleep 1
done

# The log stream can end shortly before the pod status is updated. Give the pod up to ~30 seconds
# to leave the Pending/Running phases so a failure is not mistaken for a success.
PHASE=""
for ((i = 0; i < 30; i++)); do
  PHASE=$(kubectl get "$POD_NAME" --output="jsonpath={.status.phase}")
  if [[ "$PHASE" != "Pending" && "$PHASE" != "Running" ]]; then
    break
  fi
  sleep 1
done
echo "Phase of pod: $PHASE"

if [[ "$PHASE" == "Failed" ]]; then
  echo "Failed: Exiting with code 1"
  exit 1
elif [[ "$PHASE" == "Unknown" ]]; then
  echo "Unknown: Exiting with code 1"
  exit 1
else
  echo "Success: Exiting code 0"
  exit 0
fi
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.workers.process.DockerProcessBuilderFactory;
import io.airbyte.workers.process.KubeProcessBuilderFactory;
import io.airbyte.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;
import java.time.Duration;
Expand Down Expand Up @@ -101,6 +102,18 @@ public void start() {
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool, scheduledPool));
}

/**
 * Chooses the {@link ProcessBuilderFactory} that matches the configured worker environment.
 *
 * @param configs configuration the worker environment and the factory arguments are read from
 * @return a {@link KubeProcessBuilderFactory} when the worker environment is {@code KUBERNETES},
 *         otherwise a {@link DockerProcessBuilderFactory}
 */
private static ProcessBuilderFactory getProcessBuilderFactory(Configs configs) {
  final boolean runsOnKubernetes = Configs.WorkerEnvironment.KUBERNETES == configs.getWorkerEnvironment();

  // Every non-Kubernetes environment uses the Docker factory.
  if (!runsOnKubernetes) {
    return new DockerProcessBuilderFactory(
        configs.getWorkspaceRoot(),
        configs.getWorkspaceDockerMount(),
        configs.getLocalDockerMount(),
        configs.getDockerNetwork());
  }

  return new KubeProcessBuilderFactory(configs.getWorkspaceRoot());
}

public static void main(String[] args) {
final Configs configs = new EnvConfigs();

Expand All @@ -116,11 +129,7 @@ public static void main(String[] args) {
configs.getDatabasePassword(),
configs.getDatabaseUrl());

final ProcessBuilderFactory pbf = new DockerProcessBuilderFactory(
workspaceRoot,
configs.getWorkspaceDockerMount(),
configs.getLocalDockerMount(),
configs.getDockerNetwork());
final ProcessBuilderFactory pbf = getProcessBuilderFactory(configs);

final JobPersistence jobPersistence = new DefaultJobPersistence(database);
final ConfigPersistence configPersistence = new DefaultConfigPersistence(configRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,69 +89,81 @@ public WorkerRun create(final Job job) {
LOGGER.info("job root: {}", jobRoot);

return switch (job.getConfig().getConfigType()) {
case GET_SPEC -> createGetSpecWorker(job.getConfig().getGetSpec(), jobRoot);
case CHECK_CONNECTION_SOURCE, CHECK_CONNECTION_DESTINATION -> createConnectionCheckWorker(job.getConfig().getCheckConnection(), jobRoot);
case DISCOVER_SCHEMA -> createDiscoverCatalogWorker(job.getConfig().getDiscoverCatalog(), jobRoot);
case SYNC -> createSyncWorkerFromSyncConfig(job.getConfig().getSync(), jobRoot);
case RESET_CONNECTION -> createSyncWorkerFromResetConfig(job.getConfig().getResetConnection(), jobRoot);
case GET_SPEC -> createGetSpecWorker(job.getId(), currentAttempt, job.getConfig().getGetSpec(), jobRoot);
case CHECK_CONNECTION_SOURCE, CHECK_CONNECTION_DESTINATION -> createConnectionCheckWorker(job.getId(), currentAttempt,
job.getConfig().getCheckConnection(), jobRoot);
case DISCOVER_SCHEMA -> createDiscoverCatalogWorker(job.getId(), currentAttempt, job.getConfig().getDiscoverCatalog(), jobRoot);
case SYNC -> createSyncWorkerFromSyncConfig(job.getId(), currentAttempt, job.getConfig().getSync(), jobRoot);
case RESET_CONNECTION -> createSyncWorkerFromResetConfig(job.getId(), currentAttempt, job.getConfig().getResetConnection(), jobRoot);
};
}

private WorkerRun createGetSpecWorker(JobGetSpecConfig config, Path jobRoot) {
final IntegrationLauncher launcher = createLauncher(config.getDockerImage());
private WorkerRun createGetSpecWorker(long jobId, int attempt, JobGetSpecConfig config, Path jobRoot) {
final IntegrationLauncher launcher = createLauncher(jobId, attempt, config.getDockerImage());

return creator.create(
jobRoot,
config,
new JobOutputGetSpecWorker(new DefaultGetSpecWorker(launcher)));
}

private WorkerRun createConnectionCheckWorker(JobCheckConnectionConfig config, Path jobRoot) {
private WorkerRun createConnectionCheckWorker(long jobId, int attempt, JobCheckConnectionConfig config, Path jobRoot) {
final StandardCheckConnectionInput checkConnectionInput = getCheckConnectionInput(config);

final IntegrationLauncher launcher = createLauncher(config.getDockerImage());
final IntegrationLauncher launcher = createLauncher(jobId, attempt, config.getDockerImage());
return creator.create(
jobRoot,
checkConnectionInput,
new JobOutputCheckConnectionWorker(new DefaultCheckConnectionWorker(launcher)));
}

private WorkerRun createDiscoverCatalogWorker(JobDiscoverCatalogConfig config, Path jobRoot) {
private WorkerRun createDiscoverCatalogWorker(long jobId, int attempt, JobDiscoverCatalogConfig config, Path jobRoot) {
final StandardDiscoverCatalogInput discoverSchemaInput = getDiscoverCatalogInput(config);

final IntegrationLauncher launcher = createLauncher(config.getDockerImage());
final IntegrationLauncher launcher = createLauncher(jobId, attempt, config.getDockerImage());

return creator.create(
jobRoot,
discoverSchemaInput,
new JobOutputDiscoverSchemaWorker(new DefaultDiscoverCatalogWorker(launcher)));
}

private WorkerRun createSyncWorkerFromResetConfig(JobResetConnectionConfig config, Path jobRoot) {
private WorkerRun createSyncWorkerFromResetConfig(long jobId, int attempt, JobResetConnectionConfig config, Path jobRoot) {
return createSyncWorker(
jobId,
attempt,
new EmptyAirbyteSource(),
config.getDestinationDockerImage(),
getSyncInputFromResetConfig(config),
jobRoot);
}

private WorkerRun createSyncWorkerFromSyncConfig(JobSyncConfig config, Path jobRoot) {
final DefaultAirbyteSource airbyteSource = new DefaultAirbyteSource(createLauncher(config.getSourceDockerImage()));
private WorkerRun createSyncWorkerFromSyncConfig(long jobId, int attempt, JobSyncConfig config, Path jobRoot) {
final DefaultAirbyteSource airbyteSource = new DefaultAirbyteSource(createLauncher(jobId, attempt, config.getSourceDockerImage()));
return createSyncWorker(
jobId,
attempt,
airbyteSource,
config.getDestinationDockerImage(),
getSyncInputSyncConfig(config),
jobRoot);
}

private WorkerRun createSyncWorker(AirbyteSource airbyteSource, String destinationDockerImage, StandardSyncInput syncInput, Path jobRoot) {
final IntegrationLauncher destinationLauncher = createLauncher(destinationDockerImage);
private WorkerRun createSyncWorker(long jobId,
int attempt,
AirbyteSource airbyteSource,
String destinationDockerImage,
StandardSyncInput syncInput,
Path jobRoot) {
final IntegrationLauncher destinationLauncher = createLauncher(jobId, attempt, destinationDockerImage);

return creator.create(
jobRoot,
syncInput,
new JobOutputSyncWorker(
new DefaultSyncWorker(
jobId,
attempt,
airbyteSource,
new DefaultAirbyteDestination(destinationLauncher),
new AirbyteMessageTracker(),
Expand All @@ -161,8 +173,8 @@ private WorkerRun createSyncWorker(AirbyteSource airbyteSource, String destinati
syncInput.getDestinationConfiguration()))));
}

private IntegrationLauncher createLauncher(final String image) {
return new AirbyteIntegrationLauncher(image, pbf);
private IntegrationLauncher createLauncher(long jobId, int attempt, final String image) {
return new AirbyteIntegrationLauncher(jobId, attempt, image, pbf);
}

private static StandardCheckConnectionInput getCheckConnectionInput(JobCheckConnectionConfig config) {
Expand Down
5 changes: 4 additions & 1 deletion airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ configurations {
}

dependencies {
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'io.kubernetes:client-java-api:10.0.0'
implementation 'io.kubernetes:client-java:10.0.0'
implementation 'io.kubernetes:client-java-extended:10.0.0'

implementation project(':airbyte-config:models')
implementation project(':airbyte-db')
Expand Down
Loading

0 comments on commit 1f90cf9

Please sign in to comment.