Skip to content

Commit

Permalink
allow scheduling getspec workers (airbytehq#523)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada authored Oct 9, 2020
1 parent 1bf0de6 commit c81dad6
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaOutput.yaml",
"title": "StandardDiscoverSchemaOutput",
"description": "information required for connection.",
"type": "object",
"required": ["connectionConfiguration"],
"additionalProperties": false,
"properties":
{
"connectionConfiguration":
{
"description": "Integration specific blob. Must be a valid JSON string.",
"type": "object",
"existingJavaType": "com.fasterxml.jackson.databind.JsonNode",
},
},
}
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaOutput.yaml
title: StandardDiscoverSchemaOutput
description: information required for connection.
type: object
required:
- connectionConfiguration
additionalProperties: false
properties:
connectionConfiguration:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@

import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobDiscoverSchemaConfig;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardDiscoverSchemaInput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.GetSpecWorker;
import io.airbyte.workers.Worker;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.protocols.singer.DefaultSingerTap;
Expand All @@ -40,6 +43,7 @@
import io.airbyte.workers.protocols.singer.SingerSyncWorker;
import io.airbyte.workers.wrappers.JobOutputCheckConnectionWorker;
import io.airbyte.workers.wrappers.JobOutputDiscoverSchemaWorker;
import io.airbyte.workers.wrappers.JobOutputGetSpecWorker;
import io.airbyte.workers.wrappers.JobOutputSyncWorker;
import java.nio.file.Path;
import org.slf4j.Logger;
Expand Down Expand Up @@ -78,22 +82,23 @@ public WorkerRun create(final Job job) {
LOGGER.info("job root: {}", jobRoot);

switch (job.getConfig().getConfigType()) {
case CHECK_CONNECTION_SOURCE:
case CHECK_CONNECTION_DESTINATION:
case CHECK_CONNECTION_SOURCE, CHECK_CONNECTION_DESTINATION -> {
final StandardCheckConnectionInput checkConnectionInput = getCheckConnectionInput(job.getConfig().getCheckConnection());
return creator.create(
jobRoot,
checkConnectionInput,
new JobOutputCheckConnectionWorker(
new SingerCheckConnectionWorker(new SingerDiscoverSchemaWorker(job.getConfig().getCheckConnection().getDockerImage(), pbf))));
case DISCOVER_SCHEMA:
}
case DISCOVER_SCHEMA -> {
final StandardDiscoverSchemaInput discoverSchemaInput = getDiscoverSchemaInput(job.getConfig().getDiscoverSchema());
return creator.create(
jobRoot,
discoverSchemaInput,
new JobOutputDiscoverSchemaWorker(
new SingerDiscoverSchemaWorker(job.getConfig().getDiscoverSchema().getDockerImage(), pbf)));
case SYNC:
}
case SYNC -> {
final StandardSyncInput syncInput = getSyncInput(job.getConfig().getSync());
final SingerDiscoverSchemaWorker discoverSchemaWorker = new SingerDiscoverSchemaWorker(job.getConfig().getSync().getSourceDockerImage(), pbf);
return creator.create(
Expand All @@ -107,8 +112,16 @@ public WorkerRun create(final Job job) {
new SingerSyncWorker(
new DefaultSingerTap(job.getConfig().getSync().getSourceDockerImage(), pbf, discoverSchemaWorker),
new DefaultSingerTarget(job.getConfig().getSync().getDestinationDockerImage(), pbf))));
default:
throw new RuntimeException("Unexpected config type: " + job.getConfig().getConfigType());
}
case GET_SPEC -> {
final JobGetSpecConfig getSpecInput = job.getConfig().getGetSpec();
final GetSpecWorker worker = new DefaultGetSpecWorker(pbf);
return creator.create(
jobRoot,
getSpecInput,
new JobOutputGetSpecWorker(worker));
}
default -> throw new RuntimeException("Unexpected config type: " + job.getConfig().getConfigType());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardDiscoverSchemaInput;
Expand All @@ -41,6 +42,7 @@
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.wrappers.JobOutputCheckConnectionWorker;
import io.airbyte.workers.wrappers.JobOutputDiscoverSchemaWorker;
import io.airbyte.workers.wrappers.JobOutputGetSpecWorker;
import io.airbyte.workers.wrappers.JobOutputSyncWorker;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -121,4 +123,18 @@ void testSync() {
Assertions.assertTrue(argument.getValue() instanceof JobOutputSyncWorker);
}

@SuppressWarnings("unchecked")
@Test
void testGetSpec() {
when(job.getConfig().getConfigType()).thenReturn(JobConfig.ConfigType.GET_SPEC);
JobGetSpecConfig expectedConfig = new JobGetSpecConfig().withDockerImage("notarealimage");
when(job.getConfig().getGetSpec()).thenReturn(expectedConfig);

factory.create(job);

ArgumentCaptor<Worker<JobGetSpecConfig, JobOutput>> argument = ArgumentCaptor.forClass(Worker.class);
verify(creator).create(eq(rootPath.resolve("1").resolve("2")), eq(expectedConfig), argument.capture());
Assertions.assertTrue(argument.getValue() instanceof JobOutputGetSpecWorker);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.workers.wrappers;

import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardGetSpecOutput;
import io.airbyte.workers.GetSpecWorker;

public class JobOutputGetSpecWorker extends OutputConvertingWorker<JobGetSpecConfig, StandardGetSpecOutput, JobOutput> {

public JobOutputGetSpecWorker(GetSpecWorker innerWorker) {
super(
innerWorker,
standardGetSpecOutput -> new JobOutput().withOutputType(JobOutput.OutputType.GET_SPEC).withGetSpec(standardGetSpecOutput));
}

}

0 comments on commit c81dad6

Please sign in to comment.