Skip to content

Commit

Permalink
Cache schema during discoverSchema (airbytehq#10820)
Browse files Browse the repository at this point in the history
* Make SchedulerHandler store schema after fetching it

* Add `disable_cache` parameter to discover_schema API

* Return cached catalog if it already exists

* Address code review comments

* Add tests for caching of catalog in SchedulerHandler

* Format fixes

* Fix Acceptance tests

* New code review fixes

- Use upper case for global variable
- Inline definition and assignment of variable
  • Loading branch information
malikdiarra authored Mar 17, 2022
1 parent a050688 commit 3d9f9ec
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 15 deletions.
11 changes: 10 additions & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceIdRequestBody"
$ref: "#/components/schemas/SourceDiscoverSchemaRequestBody"
required: true
responses:
"200":
Expand Down Expand Up @@ -2298,6 +2298,15 @@ components:
$ref: "#/components/schemas/WorkspaceId"
name:
type: string
# Request body for source schema discovery: identifies the source and optionally opts out of
# the cached catalog.
SourceDiscoverSchemaRequestBody:
type: object
required:
- sourceId
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
# Optional. When true, the handler runs a fresh discover job even if a catalog is already
# stored for the source; when absent or false, a stored catalog is returned if one exists.
disable_cache:
type: boolean
SourceUpdate:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.airbyte.api.model.SourceDefinitionSpecificationRead;
import io.airbyte.api.model.SourceDefinitionUpdate;
import io.airbyte.api.model.SourceDiscoverSchemaRead;
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceOauthConsentRequest;
import io.airbyte.api.model.SourceRead;
Expand Down Expand Up @@ -432,8 +433,8 @@ public CheckConnectionRead checkConnectionToSourceForUpdate(final SourceUpdate s
}

// Discover-schema endpoint: forwards the request body to
// SchedulerHandler#discoverSchemaForSourceFromSourceId through execute(...).
// NOTE(review): this is a rendered diff hunk without +/- markers. The SourceIdRequestBody
// signature/return pair appears to be the removed side and the SourceDiscoverSchemaRequestBody
// pair the added side; confirm against the commit.
@Override
public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceIdRequestBody sourceIdRequestBody) {
return execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceId(sourceIdRequestBody));
public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) {
return execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaRequestBody));
}

// DB MIGRATION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.airbyte.api.model.AdvancedAuth;
import io.airbyte.api.model.AuthSpecification;
import io.airbyte.api.model.CheckConnectionRead;
Expand All @@ -19,17 +22,23 @@
import io.airbyte.api.model.DestinationIdRequestBody;
import io.airbyte.api.model.DestinationSyncMode;
import io.airbyte.api.model.DestinationUpdate;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobIdRequestBody;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.LogRead;
import io.airbyte.api.model.SourceCoreConfig;
import io.airbyte.api.model.SourceDefinitionIdRequestBody;
import io.airbyte.api.model.SourceDefinitionSpecificationRead;
import io.airbyte.api.model.SourceDiscoverSchemaRead;
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.api.model.SynchronousJobRead;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig.ConfigType;
Expand Down Expand Up @@ -68,6 +77,7 @@
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -77,6 +87,7 @@
public class SchedulerHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class);
private static final HashFunction HASH_FUNCTION = Hashing.md5();

private final ConfigRepository configRepository;
private final SecretsRepositoryWriter secretsRepositoryWriter;
Expand Down Expand Up @@ -241,13 +252,35 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationIdForUpdate(
return checkDestinationConnectionFromDestinationCreate(destinationCoreConfig);
}

public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final SourceConnection source = configRepository.getSourceConnection(sourceIdRequestBody.getSourceId());
final SourceConnection source = configRepository.getSourceConnection(discoverSchemaRequestBody.getSourceId());
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(source.getSourceDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
return discoverJobToOutput(response);

final String configHash = HASH_FUNCTION.hashBytes(Jsons.serialize(source.getConfiguration()).getBytes(
Charsets.UTF_8)).toString();
final String connectorVersion = sourceDef.getDockerImageTag();
final Optional<ActorCatalog> currentCatalog =
configRepository.getSourceCatalog(discoverSchemaRequestBody.getSourceId(), configHash, connectorVersion);
final boolean bustActorCatalogCache = discoverSchemaRequestBody.getDisableCache() != null && discoverSchemaRequestBody.getDisableCache();
if (currentCatalog.isEmpty() || bustActorCatalogCache) {
final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), configHash, connectorVersion);
return discoverJobToOutput(response);
}
final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class);
final SynchronousJobRead emptyJob = new SynchronousJobRead()
.configId("NoConfiguration")
.configType(JobConfigType.DISCOVER_SCHEMA)
.id(UUID.randomUUID())
.createdAt(0L)
.endedAt(0L)
.logs(new LogRead().logLines(new ArrayList<>()))
.succeeded(true);
return new SourceDiscoverSchemaRead()
.catalog(CatalogConverter.toApi(airbyteCatalog))
.jobInfo(emptyJob);
}

public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final SourceCoreConfig sourceCreate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.airbyte.api.model.OperationReadList;
import io.airbyte.api.model.OperationUpdate;
import io.airbyte.api.model.SourceDiscoverSchemaRead;
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceRead;
import io.airbyte.api.model.WebBackendConnectionCreate;
Expand Down Expand Up @@ -181,8 +182,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
final ConnectionRead connection = connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId());

if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) {
final SourceIdRequestBody sourceId = new SourceIdRequestBody().sourceId(connection.getSourceId());
final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(sourceId);
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId());
final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);

final AirbyteCatalog original = connection.getSyncCatalog();
final AirbyteCatalog discovered = discoverSchema.getCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -32,13 +33,15 @@
import io.airbyte.api.model.SourceDefinitionIdRequestBody;
import io.airbyte.api.model.SourceDefinitionSpecificationRead;
import io.airbyte.api.model.SourceDiscoverSchemaRead;
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
Expand Down Expand Up @@ -381,12 +384,14 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati
@Test
void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidationException, ConfigNotFoundException {
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
final SourceIdRequestBody request = new SourceIdRequestBody().sourceId(source.getSourceId());
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId());

final SynchronousResponse<AirbyteCatalog> discoverResponse = (SynchronousResponse<AirbyteCatalog>) jobResponse;
final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
when(discoverResponse.isSuccess()).thenReturn(true);
when(discoverResponse.getOutput()).thenReturn(CatalogHelpers.createAirbyteCatalog("shoes", Field.of("sku", JsonSchemaType.STRING)));
final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes",
Field.of("sku", JsonSchemaType.STRING));
when(discoverResponse.getOutput()).thenReturn(airbyteCatalog);
when(discoverResponse.getMetadata()).thenReturn(metadata);
when(metadata.isSucceeded()).thenReturn(true);

Expand All @@ -396,6 +401,7 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio
.withDockerImageTag(SOURCE_DOCKER_TAG)
.withSourceDefinitionId(source.getSourceDefinitionId()));
when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
when(configRepository.getSourceCatalog(any(), any(), any())).thenReturn(Optional.empty());
when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE))
.thenReturn(discoverResponse);

Expand All @@ -405,13 +411,93 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio
assertNotNull(actual.getJobInfo());
assertTrue(actual.getJobInfo().getSucceeded());
verify(configRepository).getSourceConnection(source.getSourceId());
verify(configRepository).getSourceCatalog(eq(request.getSourceId()), any(), eq(SOURCE_DOCKER_TAG));
verify(configRepository).writeActorCatalogFetchEvent(eq(airbyteCatalog), eq(source.getSourceId()), any(), eq(SOURCE_DOCKER_TAG));
verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE);
}

/**
 * When a catalog is already stored for the source and the request does not disable the cache, the
 * handler must return that catalog without running a discover job or writing a new fetch event.
 */
@Test
void testDiscoverSchemaForSourceFromSourceIdCachedCatalog() throws IOException, JsonValidationException, ConfigNotFoundException {
  final SourceConnection sourceConnection = SourceHelpers.generateSource(UUID.randomUUID());
  final SourceDiscoverSchemaRequestBody discoverRequest =
      new SourceDiscoverSchemaRequestBody().sourceId(sourceConnection.getSourceId());

  // Catalog that is both stored in the repository and configured as the discover job's output.
  final AirbyteCatalog storedCatalog = CatalogHelpers.createAirbyteCatalog("shoes",
      Field.of("sku", JsonSchemaType.STRING));
  final ActorCatalog cachedEntry = new ActorCatalog()
      .withCatalog(Jsons.jsonNode(storedCatalog))
      .withCatalogHash("")
      .withId(UUID.randomUUID());

  // Repository stubs: source, its definition, and a cache hit for any lookup.
  final StandardSourceDefinition definition = new StandardSourceDefinition()
      .withDockerRepository(SOURCE_DOCKER_REPO)
      .withDockerImageTag(SOURCE_DOCKER_TAG)
      .withSourceDefinitionId(sourceConnection.getSourceDefinitionId());
  when(configRepository.getSourceConnection(sourceConnection.getSourceId())).thenReturn(sourceConnection);
  when(configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId())).thenReturn(definition);
  when(configRepository.getSourceCatalog(any(), any(), any())).thenReturn(Optional.of(cachedEntry));

  // The discover job is stubbed as successful even though this scenario expects it never to run.
  final SynchronousJobMetadata jobMetadata = mock(SynchronousJobMetadata.class);
  when(jobMetadata.isSucceeded()).thenReturn(true);
  final SynchronousResponse<AirbyteCatalog> discoverResponse = (SynchronousResponse<AirbyteCatalog>) jobResponse;
  when(discoverResponse.isSuccess()).thenReturn(true);
  when(discoverResponse.getOutput()).thenReturn(storedCatalog);
  when(discoverResponse.getMetadata()).thenReturn(jobMetadata);
  when(synchronousSchedulerClient.createDiscoverSchemaJob(sourceConnection, SOURCE_DOCKER_IMAGE))
      .thenReturn(discoverResponse);

  final SourceDiscoverSchemaRead result = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverRequest);

  assertNotNull(result.getCatalog());
  assertNotNull(result.getJobInfo());
  assertTrue(result.getJobInfo().getSucceeded());

  // The cache was consulted, nothing was written, and no job was scheduled.
  verify(configRepository).getSourceConnection(sourceConnection.getSourceId());
  verify(configRepository).getSourceCatalog(eq(discoverRequest.getSourceId()), any(), any());
  verify(configRepository, never()).writeActorCatalogFetchEvent(any(), any(), any(), any());
  verify(synchronousSchedulerClient, never()).createDiscoverSchemaJob(sourceConnection, SOURCE_DOCKER_IMAGE);
}

/**
 * When the request sets {@code disableCache(true)}, the handler must run a discover job and write
 * a catalog fetch event even though a catalog is already stored for the source.
 */
@Test
void testDiscoverSchemaForSourceFromSourceIdDisableCache() throws IOException, JsonValidationException, ConfigNotFoundException {
  final SourceConnection sourceConnection = SourceHelpers.generateSource(UUID.randomUUID());
  final SourceDiscoverSchemaRequestBody discoverRequest = new SourceDiscoverSchemaRequestBody()
      .sourceId(sourceConnection.getSourceId())
      .disableCache(true);

  // Catalog that is both stored in the repository and returned by the stubbed discover job.
  final AirbyteCatalog discoveredCatalog = CatalogHelpers.createAirbyteCatalog("shoes",
      Field.of("sku", JsonSchemaType.STRING));
  final ActorCatalog cachedEntry = new ActorCatalog()
      .withCatalog(Jsons.jsonNode(discoveredCatalog))
      .withCatalogHash("")
      .withId(UUID.randomUUID());

  // Repository stubs: source, its definition, and a cache hit for any lookup.
  final StandardSourceDefinition definition = new StandardSourceDefinition()
      .withDockerRepository(SOURCE_DOCKER_REPO)
      .withDockerImageTag(SOURCE_DOCKER_TAG)
      .withSourceDefinitionId(sourceConnection.getSourceDefinitionId());
  when(configRepository.getSourceConnection(sourceConnection.getSourceId())).thenReturn(sourceConnection);
  when(configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId())).thenReturn(definition);
  when(configRepository.getSourceCatalog(any(), any(), any())).thenReturn(Optional.of(cachedEntry));

  // Successful discover job returned by the scheduler client.
  final SynchronousJobMetadata jobMetadata = mock(SynchronousJobMetadata.class);
  when(jobMetadata.isSucceeded()).thenReturn(true);
  final SynchronousResponse<AirbyteCatalog> discoverResponse = (SynchronousResponse<AirbyteCatalog>) jobResponse;
  when(discoverResponse.isSuccess()).thenReturn(true);
  when(discoverResponse.getOutput()).thenReturn(discoveredCatalog);
  when(discoverResponse.getMetadata()).thenReturn(jobMetadata);
  when(synchronousSchedulerClient.createDiscoverSchemaJob(sourceConnection, SOURCE_DOCKER_IMAGE))
      .thenReturn(discoverResponse);

  final SourceDiscoverSchemaRead result = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverRequest);

  assertNotNull(result.getCatalog());
  assertNotNull(result.getJobInfo());
  assertTrue(result.getJobInfo().getSucceeded());

  // The cache was still consulted, but a job ran and its result was written.
  verify(configRepository).getSourceConnection(sourceConnection.getSourceId());
  verify(configRepository).getSourceCatalog(eq(discoverRequest.getSourceId()), any(), any());
  verify(configRepository).writeActorCatalogFetchEvent(any(), any(), any(), any());
  verify(synchronousSchedulerClient).createDiscoverSchemaJob(sourceConnection, SOURCE_DOCKER_IMAGE);
}

@Test
void testDiscoverSchemaForSourceFromSourceIdFailed() throws IOException, JsonValidationException, ConfigNotFoundException {
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
final SourceIdRequestBody request = new SourceIdRequestBody().sourceId(source.getSourceId());
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId());

when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
.thenReturn(new StandardSourceDefinition()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.api.model.OperationUpdate;
import io.airbyte.api.model.ResourceRequirements;
import io.airbyte.api.model.SourceDiscoverSchemaRead;
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceRead;
import io.airbyte.api.model.SyncMode;
Expand Down Expand Up @@ -196,7 +197,9 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE

final AirbyteCatalog modifiedCatalog = ConnectionHelpers.generateBasicApiCatalog();

when(schedulerHandler.discoverSchemaForSourceFromSourceId(sourceIdRequestBody)).thenReturn(
final SourceDiscoverSchemaRequestBody sourceDiscoverSchema = new SourceDiscoverSchemaRequestBody();
sourceDiscoverSchema.setSourceId(connectionRead.getSourceId());
when(schedulerHandler.discoverSchemaForSourceFromSourceId(sourceDiscoverSchema)).thenReturn(
new SourceDiscoverSchemaRead()
.jobInfo(mock(SynchronousJobRead.class))
.catalog(modifiedCatalog));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.airbyte.api.client.model.SourceDefinitionIdRequestBody;
import io.airbyte.api.client.model.SourceDefinitionRead;
import io.airbyte.api.client.model.SourceDefinitionSpecificationRead;
import io.airbyte.api.client.model.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.client.model.SourceIdRequestBody;
import io.airbyte.api.client.model.SourceRead;
import io.airbyte.api.client.model.SyncMode;
Expand Down Expand Up @@ -1145,7 +1146,7 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception
}

// Calls discoverSchemaForSource on the source API client for the given source id and returns
// the catalog from the response.
// NOTE(review): this is a rendered diff hunk without +/- markers. The SourceIdRequestBody
// return line appears to be the removed side and the SourceDiscoverSchemaRequestBody line the
// added side; confirm against the commit.
private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException {
return apiClient.getSourceApi().discoverSchemaForSource(new SourceIdRequestBody().sourceId(sourceId)).getCatalog();
return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog();
}

private void assertSourceAndDestinationDbInSync(final boolean withScdTable) throws Exception {
Expand Down
Loading

0 comments on commit 3d9f9ec

Please sign in to comment.