Skip to content

Commit

Permalink
Issue 23062 add update api for manifest version (#5231)
Browse files Browse the repository at this point in the history
Co-authored-by: Joe Reuter <[email protected]>
  • Loading branch information
maxi297 and Joe Reuter committed Mar 22, 2023
1 parent 02be1c2 commit 0f4f4a9
Show file tree
Hide file tree
Showing 14 changed files with 562 additions and 245 deletions.
33 changes: 31 additions & 2 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,23 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"409":
description: Version already exists for definition id
/v1/declarative_source_definitions/update_active_manifest:
post:
tags:
- declarative_source_definitions
summary: Update the declarative manifest version for a source
operationId: updateDeclarativeManifestVersion
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/UpdateActiveManifestRequestBody"
required: true
responses:
"204":
description: Successful operation
"404":
$ref: "#/components/responses/NotFoundResponse"
/v1/declarative_source_definitions/list_manifests:
post:
tags:
Expand Down Expand Up @@ -2816,8 +2833,7 @@ components:
spec:
$ref: "#/components/schemas/SourceDefinitionSpecification"
version:
type: integer
format: int64
$ref: "#/components/schemas/ManifestVersion"
SourceDefinitionIdBody:
type: object
required:
Expand Down Expand Up @@ -2991,6 +3007,19 @@ components:
type: boolean
declarativeManifest:
$ref: "#/components/schemas/DeclarativeSourceManifest"
UpdateActiveManifestRequestBody:
type: object
required:
- workspaceId
- sourceDefinitionId
- version
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
sourceDefinitionId:
$ref: "#/components/schemas/ConnectorBuilderProjectId"
version:
$ref: "#/components/schemas/ManifestVersion"
PrivateSourceDefinitionRead:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import io.airbyte.api.model.generated.ExistingConnectorBuilderProjectWithWorkspaceId;
import io.airbyte.api.model.generated.SourceDefinitionIdBody;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.commons.server.handlers.helpers.ConnectorBuilderSpecAdapter;
import io.airbyte.config.ActorDefinitionConfigInjection;
import io.airbyte.commons.server.handlers.helpers.DeclarativeSourceManifestInjector;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConnectorBuilderProject;
import io.airbyte.config.ConnectorBuilderProjectVersionedManifest;
Expand Down Expand Up @@ -48,18 +47,18 @@ public class ConnectorBuilderProjectsHandler {

private final ConfigRepository configRepository;
private final Supplier<UUID> uuidSupplier;
private final ConnectorBuilderSpecAdapter specAdapter;
private final DeclarativeSourceManifestInjector manifestInjector;
private final CdkVersionProvider cdkVersionProvider;

@Inject
public ConnectorBuilderProjectsHandler(final ConfigRepository configRepository,
final CdkVersionProvider cdkVersionProvider,
final Supplier<UUID> uuidSupplier,
final ConnectorBuilderSpecAdapter specAdapter) {
final DeclarativeSourceManifestInjector manifestInjector) {
this.configRepository = configRepository;
this.cdkVersionProvider = cdkVersionProvider;
this.uuidSupplier = uuidSupplier;
this.specAdapter = specAdapter;
this.manifestInjector = manifestInjector;
}

private static ConnectorBuilderProjectDetailsRead builderProjectToDetailsRead(final ConnectorBuilderProject project) {
Expand Down Expand Up @@ -182,6 +181,7 @@ public SourceDefinitionIdBody publishConnectorBuilderProject(final ConnectorBuil
throws IOException {
final JsonNode manifest = connectorBuilderPublishRequestBody.getInitialDeclarativeManifest().getManifest();
final JsonNode spec = connectorBuilderPublishRequestBody.getInitialDeclarativeManifest().getSpec();
manifestInjector.addInjectedDeclarativeManifest(spec);
final UUID actorDefinitionId = createActorDefinition(connectorBuilderPublishRequestBody.getName(),
connectorBuilderPublishRequestBody.getWorkspaceId(),
manifest,
Expand All @@ -200,7 +200,7 @@ public SourceDefinitionIdBody publishConnectorBuilderProject(final ConnectorBuil
}

private UUID createActorDefinition(final String name, final UUID workspaceId, final JsonNode manifest, final JsonNode spec) throws IOException {
final ConnectorSpecification connectorSpecification = specAdapter.adapt(spec);
final ConnectorSpecification connectorSpecification = manifestInjector.createDeclarativeManifestConnectorSpecification(spec);
final StandardSourceDefinition source = new StandardSourceDefinition()
.withSourceDefinitionId(uuidSupplier.get())
.withName(name)
Expand All @@ -216,11 +216,7 @@ private UUID createActorDefinition(final String name, final UUID workspaceId, fi
.withDocumentationUrl(connectorSpecification.getDocumentationUrl().toString());
configRepository.writeCustomSourceDefinition(source, workspaceId);

configRepository.writeActorDefinitionConfigInjectionForPath(
new ActorDefinitionConfigInjection()
.withActorDefinitionId(source.getSourceDefinitionId())
.withInjectionPath("__injected_declarative_manifest")
.withJsonToInject(manifest));
configRepository.writeActorDefinitionConfigInjectionForPath(manifestInjector.createConfigInjection(source.getSourceDefinitionId(), manifest));
return source.getSourceDefinitionId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@

package io.airbyte.commons.server.handlers;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.api.model.generated.DeclarativeManifestVersionRead;
import io.airbyte.api.model.generated.DeclarativeManifestsReadList;
import io.airbyte.api.model.generated.DeclarativeSourceDefinitionCreateManifestRequestBody;
import io.airbyte.api.model.generated.ListDeclarativeManifestsRequestBody;
import io.airbyte.api.model.generated.UpdateActiveManifestRequestBody;
import io.airbyte.commons.server.errors.DeclarativeSourceNotFoundException;
import io.airbyte.commons.server.errors.SourceIsNotDeclarativeException;
import io.airbyte.commons.server.errors.ValueConflictKnownException;
import io.airbyte.commons.server.handlers.helpers.ConnectorBuilderSpecAdapter;
import io.airbyte.commons.server.handlers.helpers.DeclarativeSourceManifestInjector;
import io.airbyte.config.DeclarativeManifest;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -34,55 +35,77 @@
public class DeclarativeSourceDefinitionsHandler {

private final ConfigRepository configRepository;
private final ConnectorBuilderSpecAdapter specAdapter;
private final DeclarativeSourceManifestInjector manifestInjector;

@Inject
public DeclarativeSourceDefinitionsHandler(final ConfigRepository configRepository,
final ConnectorBuilderSpecAdapter specAdapter) {
final DeclarativeSourceManifestInjector manifestInjector) {
this.configRepository = configRepository;
this.specAdapter = specAdapter;
this.manifestInjector = manifestInjector;
}

private void validateDeclarativeSourceDefinition(final UUID sourceDefinitionId, final UUID workspaceId) throws IOException {
if (!configRepository.workspaceCanUseCustomDefinition(sourceDefinitionId, workspaceId)) {
throw new DeclarativeSourceNotFoundException(
String.format("Can't find source definition id `%s` in workspace %s", sourceDefinitionId, workspaceId));
}
}
public void createDeclarativeSourceDefinitionManifest(DeclarativeSourceDefinitionCreateManifestRequestBody requestBody) throws IOException {
validateAccessToSource(requestBody.getSourceDefinitionId(), requestBody.getWorkspaceId());

public void createDeclarativeSourceDefinitionManifest(final DeclarativeSourceDefinitionCreateManifestRequestBody requestBody) throws IOException {
validateDeclarativeSourceDefinition(requestBody.getSourceDefinitionId(), requestBody.getWorkspaceId());
final Set<Long> existingVersions = configRepository.getDeclarativeManifestsByActorDefinitionId(
requestBody.getSourceDefinitionId()).map(DeclarativeManifest::getVersion).collect(Collectors.toSet());

final long version = requestBody.getDeclarativeManifest().getVersion().longValue();
Collection<Long> existingVersions = fetchAvailableManifestVersions(requestBody.getSourceDefinitionId());
final long version = requestBody.getDeclarativeManifest().getVersion();
if (existingVersions.isEmpty()) {
throw new SourceIsNotDeclarativeException(
String.format("Source %s is does not have a declarative manifest associated to it", requestBody.getSourceDefinitionId()));
} else if (existingVersions.contains(version)) {
throw new ValueConflictKnownException(String.format("Version '%s' for source %s already exists", version, requestBody.getSourceDefinitionId()));
}

final DeclarativeManifest declarativeManifest = new DeclarativeManifest()
JsonNode spec = requestBody.getDeclarativeManifest().getSpec();
manifestInjector.addInjectedDeclarativeManifest(spec);
DeclarativeManifest declarativeManifest = new DeclarativeManifest()
.withActorDefinitionId(requestBody.getSourceDefinitionId())
.withVersion(version)
.withDescription(requestBody.getDeclarativeManifest().getDescription())
.withManifest(requestBody.getDeclarativeManifest().getManifest())
.withSpec(requestBody.getDeclarativeManifest().getSpec());
.withSpec(spec);
if (requestBody.getSetAsActiveManifest()) {
final ConnectorSpecification connectorSpecification = specAdapter.adapt(requestBody.getDeclarativeManifest().getSpec());
configRepository.updateDeclarativeActorDefinition(requestBody.getSourceDefinitionId(), requestBody.getDeclarativeManifest().getManifest(),
connectorSpecification);
configRepository.insertActiveDeclarativeManifest(declarativeManifest);
configRepository.createDeclarativeManifestAsActiveVersion(declarativeManifest,
manifestInjector.createConfigInjection(requestBody.getSourceDefinitionId(), requestBody.getDeclarativeManifest().getManifest()),
manifestInjector.createDeclarativeManifestConnectorSpecification(spec));
} else {
configRepository.insertDeclarativeManifest(declarativeManifest);
}
}

public void updateDeclarativeManifestVersion(UpdateActiveManifestRequestBody requestBody) throws IOException, ConfigNotFoundException {
validateAccessToSource(requestBody.getSourceDefinitionId(), requestBody.getWorkspaceId());
Collection<Long> existingVersions = fetchAvailableManifestVersions(requestBody.getSourceDefinitionId());
if (existingVersions.isEmpty()) {
throw new SourceIsNotDeclarativeException(
String.format("Source %s is does not have a declarative manifest associated to it", requestBody.getSourceDefinitionId()));
}

final DeclarativeManifest declarativeManifest = configRepository.getDeclarativeManifestByActorDefinitionIdAndVersion(
requestBody.getSourceDefinitionId(), requestBody.getVersion());
configRepository.setDeclarativeSourceActiveVersion(requestBody.getSourceDefinitionId(),
declarativeManifest.getVersion(),
manifestInjector.createConfigInjection(declarativeManifest.getActorDefinitionId(), declarativeManifest.getManifest()),
manifestInjector.createDeclarativeManifestConnectorSpecification(declarativeManifest.getSpec()));
}

private Collection<Long> fetchAvailableManifestVersions(final UUID sourceDefinitionId) throws IOException {
return configRepository.getDeclarativeManifestsByActorDefinitionId(sourceDefinitionId)
.map(DeclarativeManifest::getVersion)
.collect(Collectors.toSet());
}

private void validateAccessToSource(UUID actorDefinitionId, UUID workspaceId) throws IOException {
if (!configRepository.workspaceCanUseCustomDefinition(actorDefinitionId, workspaceId)) {
throw new DeclarativeSourceNotFoundException(
String.format("Can't find source definition id `%s` in workspace %s", actorDefinitionId, workspaceId));
}
}

public DeclarativeManifestsReadList listManifestVersions(
final ListDeclarativeManifestsRequestBody requestBody)
throws IOException, ConfigNotFoundException {
validateDeclarativeSourceDefinition(requestBody.getSourceDefinitionId(), requestBody.getWorkspaceId());
validateAccessToSource(requestBody.getSourceDefinitionId(), requestBody.getWorkspaceId());
final Stream<DeclarativeManifest> existingVersions = configRepository.getDeclarativeManifestsByActorDefinitionId(
requestBody.getSourceDefinitionId());
final DeclarativeManifest activeVersion =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.server.handlers.helpers;

import static io.airbyte.commons.version.AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.config.ActorDefinitionConfigInjection;
import io.airbyte.protocol.models.ConnectorSpecification;
import jakarta.inject.Singleton;
import java.net.URI;
import java.util.UUID;

/**
* Adapter to go from Connector Builder concept to actor definitions.
*/
@Singleton
public class DeclarativeSourceManifestInjector {

private static final String INJECTED_DECLARATIVE_MANIFEST = "__injected_declarative_manifest";

/**
* Add __injected_declarative_manifest to the spec.
*
* @param spec Connector Builder spec
*/
public void addInjectedDeclarativeManifest(final JsonNode spec) {
((ObjectNode) spec.path("connectionSpecification").path("properties"))
.putObject(INJECTED_DECLARATIVE_MANIFEST)
.put("type", "object")
.put("additionalProperties", true);
}

/**
* Create ActorDefinitionConfigInjection with __injected_declarative_manifest.
*
* @param sourceDefinitionId matching the source definition the config needs to be injected
* @param manifest to be injected
*/
public ActorDefinitionConfigInjection createConfigInjection(final UUID sourceDefinitionId, final JsonNode manifest) {
return new ActorDefinitionConfigInjection()
.withActorDefinitionId(sourceDefinitionId)
.withInjectionPath(INJECTED_DECLARATIVE_MANIFEST)
.withJsonToInject(manifest);
}

/**
* Adapt the spec to match the actor definition.
*
* @param declarativeManifestSpec to be adapted to a ConnectorSpecification
*/
public ConnectorSpecification createDeclarativeManifestConnectorSpecification(JsonNode declarativeManifestSpec) {
return new ConnectorSpecification()
.withSupportsDBT(false)
.withSupportsNormalization(false)
// FIXME should be aligned with the airbyte-cdk version but will be addressed as part of
// https://github.com/airbytehq/airbyte/issues/24047
.withProtocolVersion(DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize())
.withDocumentationUrl(URI.create(declarativeManifestSpec.path("documentationUrl").asText("")))
.withConnectionSpecification(declarativeManifestSpec.get("connectionSpecification"));
}

}
Loading

0 comments on commit 0f4f4a9

Please sign in to comment.