Skip to content

Commit

Permalink
Add a config for instance wide oauth parameters (airbytehq#5761)
Browse files Browse the repository at this point in the history
* Add a config for instance wide oauth parameters

* Remove recreate endpoints and add tests

* Inject oauth params in discover/check/reset too

* Inject masked OAuth params for UI Validation (airbytehq#5865)

Co-authored-by: Sherif Nada <[email protected]>
  • Loading branch information
ChristopheDuong and sherifnada authored Sep 9, 2021
1 parent ea91e1e commit edcd83d
Show file tree
Hide file tree
Showing 26 changed files with 795 additions and 810 deletions.
54 changes: 8 additions & 46 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1364,17 +1364,17 @@ paths:
$ref: "#/components/schemas/WebBackendConnectionRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/sources/recreate:
/v1/web_backend/sources/create:
post:
tags:
- web_backend
summary: Recreate a source
operationId: webBackendRecreateSource
summary: Create a source
operationId: webBackendCreateSource
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceRecreate"
$ref: "#/components/schemas/SourceCreate"
required: true
responses:
"200":
Expand All @@ -1385,17 +1385,17 @@ paths:
$ref: "#/components/schemas/SourceRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/destinations/recreate:
/v1/web_backend/destinations/create:
post:
tags:
- web_backend
summary: Recreate a destination
operationId: webBackendRecreateDestination
summary: Create a destination
operationId: webBackendCreateDestination
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/DestinationRecreate"
$ref: "#/components/schemas/DestinationCreate"
required: true
responses:
"200":
Expand Down Expand Up @@ -1914,25 +1914,6 @@ components:
$ref: "#/components/schemas/WorkspaceId"
name:
type: string
SourceRecreate:
type: object
required:
- workspaceId
- sourceDefinitionId
- sourceId
- connectionConfiguration
- name
properties:
sourceDefinitionId:
$ref: "#/components/schemas/SourceDefinitionId"
sourceId:
$ref: "#/components/schemas/SourceId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
connectionConfiguration:
$ref: "#/components/schemas/SourceConfiguration"
name:
type: string
SourceUpdate:
type: object
required:
Expand Down Expand Up @@ -2125,25 +2106,6 @@ components:
$ref: "#/components/schemas/DestinationDefinitionId"
connectionConfiguration:
$ref: "#/components/schemas/DestinationConfiguration"
DestinationRecreate:
type: object
required:
- workspaceId
- destinationId
- destinationDefinitionId
- connectionConfiguration
- name
properties:
destinationDefinitionId:
$ref: "#/components/schemas/DestinationDefinitionId"
destinationId:
$ref: "#/components/schemas/DestinationId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
connectionConfiguration:
$ref: "#/components/schemas/DestinationConfiguration"
name:
type: string
DestinationUpdate:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public enum ConfigSchema implements AirbyteConfig {
StandardSyncOperation.class,
standardSyncOperation -> standardSyncOperation.getOperationId().toString(),
"operationId"),

SOURCE_OAUTH_PARAM("SourceOAuthParameter.yaml", SourceOAuthParameter.class,
sourceOAuthParameter -> sourceOAuthParameter.getOauthParameterId().toString(),
"oauthParameterId"),
DESTINATION_OAUTH_PARAM("DestinationOAuthParameter.yaml", DestinationOAuthParameter.class,
destinationOAuthParameter -> destinationOAuthParameter.getOauthParameterId().toString(),
"oauthParameterId"),

STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),

// worker
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/DestinationOAuthParameter.yaml
title: DestinationOAuthParameter
description: OAuth parameters used when connecting to destination
type: object
required:
- oauthParameterId
- destinationDefinitionId
- configuration
additionalProperties: false
properties:
oauthParameterId:
type: string
format: uuid
destinationDefinitionId:
type: string
format: uuid
workspaceId:
type: string
format: uuid
configuration:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/SourceOAuthParameter.yaml
title: SourceOAuthParameter
description: OAuth parameters used when connecting to source
type: object
required:
- oauthParameterId
- sourceDefinitionId
- configuration
additionalProperties: false
properties:
oauthParameterId:
type: string
format: uuid
sourceDefinitionId:
type: string
format: uuid
workspaceId:
type: string
format: uuid
configuration:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -211,6 +213,33 @@ public List<StandardSyncOperation> listStandardSyncOperations() throws IOExcepti
return persistence.listConfigs(ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class);
}

public SourceOAuthParameter getSourceOAuthParams(final UUID SourceOAuthParameterId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameterId.toString(), SourceOAuthParameter.class);
}

public void writeSourceOAuthParam(final SourceOAuthParameter SourceOAuthParameter) throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.getOauthParameterId().toString(), SourceOAuthParameter);
}

public List<SourceOAuthParameter> listSourceOAuthParam() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.class);
}

public DestinationOAuthParameter getDestinationOAuthParams(final UUID destinationOAuthParameterId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.DESTINATION_OAUTH_PARAM, destinationOAuthParameterId.toString(), DestinationOAuthParameter.class);
}

public void writeDestinationOAuthParam(final DestinationOAuthParameter destinationOAuthParameter) throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.DESTINATION_OAUTH_PARAM, destinationOAuthParameter.getOauthParameterId().toString(),
destinationOAuthParameter);
}

public List<DestinationOAuthParameter> listDestinationOAuthParam() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class);
}

public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) throws IOException {
persistence.replaceAllConfigs(configs, dryRun);
}
Expand Down
6 changes: 6 additions & 0 deletions airbyte-integrations/bases/base-normalization/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ function configuredbt() {
echo "Running: transform-config --config ${CONFIG_FILE} --integration-type ${INTEGRATION_TYPE} --out ${PROJECT_DIR}"
# Generate a profiles.yml file for the selected destination/integration type
transform-config --config "${CONFIG_FILE}" --integration-type "${INTEGRATION_TYPE}" --out "${PROJECT_DIR}"
# Remove config file as it might still contain sensitive credentials (for example,
# injected OAuth Parameters should not be visible to custom docker images running custom transformation operations)
rm "${CONFIG_FILE}"
if [[ -n "${CATALOG_FILE}" ]]; then
# If catalog file is provided, generate normalization models, otherwise skip it
echo "Running: transform-catalog --integration-type ${INTEGRATION_TYPE} --profile-config-dir ${PROJECT_DIR} --catalog ${CATALOG_FILE} --out ${PROJECT_DIR}/models/generated/ --json-column _airbyte_data"
Expand All @@ -55,6 +58,9 @@ function configuredbt() {
# Generate a profiles.yml file for the selected destination/integration type
echo "Running: transform-config --config ${CONFIG_FILE} --integration-type ${INTEGRATION_TYPE} --out ${PROJECT_DIR}"
transform-config --config "${CONFIG_FILE}" --integration-type "${INTEGRATION_TYPE}" --out "${PROJECT_DIR}"
# Remove config file as it might still contain sensitive credentials (for example,
# injected OAuth Parameters should not be visible to custom docker images running custom transformation operations)
rm "${CONFIG_FILE}"
fi
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -71,7 +72,10 @@ public JobScheduler(final JobPersistence jobPersistence,
jobPersistence,
configRepository,
new ScheduleJobPredicate(Instant::now),
new DefaultSyncJobFactory(new DefaultJobCreator(jobPersistence), configRepository));
new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence),
configRepository,
new OAuthConfigSupplier(configRepository, false)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.scheduler.client;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobCheckConnectionConfig;
Expand All @@ -34,6 +35,7 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.temporal.TemporalClient;
Expand All @@ -48,16 +50,23 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl

private final TemporalClient temporalClient;
private final JobTracker jobTracker;
private final OAuthConfigSupplier oAuthConfigSupplier;

public DefaultSynchronousSchedulerClient(TemporalClient temporalClient, JobTracker jobTracker) {
public DefaultSynchronousSchedulerClient(TemporalClient temporalClient, JobTracker jobTracker, OAuthConfigSupplier oAuthConfigSupplier) {
this.temporalClient = temporalClient;
this.jobTracker = jobTracker;
this.oAuthConfigSupplier = oAuthConfigSupplier;
}

@Override
public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConnectionJob(final SourceConnection source, final String dockerImage) {
public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConnectionJob(final SourceConnection source, final String dockerImage)
throws IOException {
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
source.getSourceDefinitionId(),
source.getWorkspaceId(),
source.getConfiguration());
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(source.getConfiguration())
.withConnectionConfiguration(sourceConfiguration)
.withDockerImage(dockerImage);

return execute(
Expand All @@ -69,9 +78,14 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne

@Override
public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(final DestinationConnection destination,
final String dockerImage) {
final String dockerImage)
throws IOException {
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
destination.getDestinationId(),
destination.getWorkspaceId(),
destination.getConfiguration());
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(destination.getConfiguration())
.withConnectionConfiguration(destinationConfiguration)
.withDockerImage(dockerImage);

return execute(
Expand All @@ -82,9 +96,13 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
}

@Override
public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) {
public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) throws IOException {
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
source.getSourceDefinitionId(),
source.getWorkspaceId(),
source.getConfiguration());
final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig()
.withConnectionConfiguration(source.getConfiguration())
.withConnectionConfiguration(sourceConfiguration)
.withDockerImage(dockerImage);

return execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.temporal.JobMetadata;
Expand Down Expand Up @@ -87,13 +88,18 @@ class DefaultSynchronousSchedulerClientTest {

private TemporalClient temporalClient;
private JobTracker jobTracker;
private OAuthConfigSupplier oAuthConfigSupplier;
private DefaultSynchronousSchedulerClient schedulerClient;

@BeforeEach
void setup() {
void setup() throws IOException {
temporalClient = mock(TemporalClient.class);
jobTracker = mock(JobTracker.class);
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker);
oAuthConfigSupplier = mock(OAuthConfigSupplier.class);
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);

when(oAuthConfigSupplier.injectSourceOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
when(oAuthConfigSupplier.injectDestinationOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
}

private static JobMetadata createMetadata(boolean succeeded) {
Expand Down Expand Up @@ -172,7 +178,7 @@ void testExecuteRuntimeException() {
class TestJobCreation {

@Test
void testCreateSourceCheckConnectionJob() {
void testCreateSourceCheckConnectionJob() throws IOException {
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(SOURCE_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
Expand All @@ -186,7 +192,7 @@ void testCreateSourceCheckConnectionJob() {
}

@Test
void testCreateDestinationCheckConnectionJob() {
void testCreateDestinationCheckConnectionJob() throws IOException {
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
Expand All @@ -200,7 +206,7 @@ void testCreateDestinationCheckConnectionJob() {
}

@Test
void testCreateDiscoverSchemaJob() {
void testCreateDiscoverSchemaJob() throws IOException {
final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig()
.withConnectionConfiguration(SOURCE_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public Optional<Long> createResetConnectionJob(DestinationConnection destination
configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});

final JobResetConnectionConfig resetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(standardSync.getNamespaceDefinition())
.withNamespaceFormat(standardSync.getNamespaceFormat())
Expand Down
Loading

0 comments on commit edcd83d

Please sign in to comment.