cleanup catalog canonical hash feature flags (#8750)
flutra-osmani committed Sep 15, 2023
1 parent 9013601 commit 3c9c2ed
Showing 3 changed files with 11 additions and 111 deletions.
@@ -40,10 +40,7 @@
 import io.airbyte.config.persistence.SecretsRepositoryWriter;
 import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
 import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
-import io.airbyte.featureflag.CanonicalCatalogSchema;
-import io.airbyte.featureflag.CatalogCanonicalJson;
 import io.airbyte.featureflag.FeatureFlagClient;
-import io.airbyte.featureflag.Source;
 import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
 import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.ConnectorSpecification;
@@ -380,35 +377,11 @@ public void deleteSource(final SourceRead source)
   public DiscoverCatalogResult writeDiscoverCatalogResult(final SourceDiscoverSchemaWriteRequestBody request)
       throws JsonValidationException, IOException {
     final AirbyteCatalog persistenceCatalog = CatalogConverter.toProtocol(request.getCatalog());
-    final UUID catalogId;
-
-    if (shouldWriteCanonicalActorCatalog(request)) {
-      catalogId = writeCanonicalActorCatalog(persistenceCatalog, request);
-    } else {
-      catalogId = writeActorCatalog(persistenceCatalog, request);
-    }
+    final UUID catalogId = writeActorCatalog(persistenceCatalog, request);
 
     return new DiscoverCatalogResult().catalogId(catalogId);
   }
 
-  private boolean shouldWriteCanonicalActorCatalog(final SourceDiscoverSchemaWriteRequestBody request) {
-    return request.getSourceId() != null && featureFlagClient.boolVariation(CanonicalCatalogSchema.INSTANCE, new Source(request.getSourceId()));
-  }
-
-  private boolean shouldWriteCatalogInCanonicalJson(SourceDiscoverSchemaWriteRequestBody request) {
-    return request.getSourceId() != null && featureFlagClient.boolVariation(CatalogCanonicalJson.INSTANCE, new Source(request.getSourceId()));
-  }
-
-  private UUID writeCanonicalActorCatalog(final AirbyteCatalog persistenceCatalog, final SourceDiscoverSchemaWriteRequestBody request)
-      throws IOException {
-    return configRepository.writeCanonicalActorCatalogFetchEvent(
-        persistenceCatalog,
-        request.getSourceId(),
-        request.getConnectorVersion(),
-        request.getConfigurationHash(),
-        shouldWriteCatalogInCanonicalJson(request));
-  }
-
   private UUID writeActorCatalog(final AirbyteCatalog persistenceCatalog, final SourceDiscoverSchemaWriteRequestBody request) throws IOException {
     return configRepository.writeActorCatalogFetchEvent(
         persistenceCatalog,
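With the feature-flag branching gone, every discovered catalog takes the single writeActorCatalog path, which delegates to ConfigRepository.writeActorCatalogFetchEvent. Below is a minimal caller sketch, not part of this commit: it assumes the generated SourceDiscoverSchemaWriteRequestBody model exposes fluent setters mirroring the getters used above, and that a sourceHandler instance is already wired up; those names are stand-ins.

```java
// Hypothetical usage sketch -- sourceHandler, discoveredCatalog, sourceId,
// connectorVersion, and configHash are illustrative stand-ins.
final SourceDiscoverSchemaWriteRequestBody request = new SourceDiscoverSchemaWriteRequestBody()
    .catalog(discoveredCatalog)         // catalog returned by a discover job
    .sourceId(sourceId)                 // formerly the feature-flag context; now just metadata
    .connectorVersion(connectorVersion)
    .configurationHash(configHash);

// One unconditional path: convert to the protocol model, then persist.
final DiscoverCatalogResult result = sourceHandler.writeDiscoverCatalogResult(request);
final UUID catalogId = result.getCatalogId();
```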
@@ -2433,54 +2433,18 @@ public ActorCatalog getActorCatalogById(final UUID actorCatalogId)
   }
 
   /**
-   * Store an Airbyte catalog in DB if it is not present already.
-   * <p>
-   * Checks in the config DB if the catalog is present already, if so returns it identifier. It is not
-   * present, it is inserted in DB with a new identifier and that identifier is returned.
+   * Store an Airbyte catalog in DB if it is not present already. Checks in the config DB if the
+   * catalog is present already, if so returns its identifier. If not present, it is inserted in DB
+   * with a new identifier and that identifier is returned.
    *
-   * @param airbyteCatalog An Airbyte catalog to cache
+   * @param airbyteCatalog the catalog to be cached
    * @param context - db context
    * @param timestamp - timestamp
    * @return the db identifier for the cached catalog.
    */
   private UUID getOrInsertActorCatalog(final AirbyteCatalog airbyteCatalog,
                                        final DSLContext context,
                                        final OffsetDateTime timestamp) {
-    final HashFunction hashFunction = Hashing.murmur3_32_fixed();
-    final String catalogHash = hashFunction.hashBytes(Jsons.serialize(airbyteCatalog).getBytes(
-        Charsets.UTF_8)).toString();
-    final Map<UUID, AirbyteCatalog> catalogs = findCatalogByHash(catalogHash, context);
-
-    for (final Map.Entry<UUID, AirbyteCatalog> entry : catalogs.entrySet()) {
-      if (entry.getValue().equals(airbyteCatalog)) {
-        return entry.getKey();
-      }
-    }
-
-    final UUID catalogId = UUID.randomUUID();
-    context.insertInto(ACTOR_CATALOG)
-        .set(ACTOR_CATALOG.ID, catalogId)
-        .set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(airbyteCatalog)))
-        .set(ACTOR_CATALOG.CATALOG_HASH, catalogHash)
-        .set(ACTOR_CATALOG.CREATED_AT, timestamp)
-        .set(ACTOR_CATALOG.MODIFIED_AT, timestamp).execute();
-    return catalogId;
-  }
-
-  /**
-   * This function will be used to gradually migrate the existing data in the database to use the
-   * canonical json serialization. It will first try to find the catalog using the canonical json
-   * serialization. If it fails, it will fallback to the old json serialization.
-   *
-   * @param airbyteCatalog the catalog to be cached
-   * @param context - db context
-   * @param timestamp - timestamp
-   * @param writeCatalogInCanonicalJson - should we write the catalog in canonical json
-   * @return the db identifier for the cached catalog.
-   */
-  private UUID getOrInsertCanonicalActorCatalog(final AirbyteCatalog airbyteCatalog,
-                                                final DSLContext context,
-                                                final OffsetDateTime timestamp,
-                                                final boolean writeCatalogInCanonicalJson) {
+
     final String canonicalCatalogHash = generateCanonicalHash(airbyteCatalog);
     UUID catalogId = lookupCatalogId(canonicalCatalogHash, airbyteCatalog, context);
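The deleted murmur3 body above shows why a canonical hash was introduced in the first place: the hash is computed over the raw JSON serialization, so two logically identical catalogs serialized with different key order hash differently (the old code compensated by re-comparing colliding catalogs with equals after the findCatalogByHash lookup). A standalone illustration of that order sensitivity, using the same Guava calls as the removed code; this demo is editorial, not part of the commit:

```java
import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public final class HashOrderDemo {

  public static void main(final String[] args) {
    final HashFunction hashFunction = Hashing.murmur3_32_fixed();

    // The same logical object, serialized with its keys in a different order.
    final String first = "{\"name\":\"product\",\"type\":\"STRING\"}";
    final String second = "{\"type\":\"STRING\",\"name\":\"product\"}";

    // Hashing raw bytes is order-sensitive: these print different values,
    // which is how equivalent catalogs could end up stored twice.
    System.out.println(hashFunction.hashBytes(first.getBytes(Charsets.UTF_8)));
    System.out.println(hashFunction.hashBytes(second.getBytes(Charsets.UTF_8)));
  }
}
```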
@@ -2494,9 +2458,7 @@ private UUID getOrInsertCanonicalActorCatalog(final AirbyteCatalog airbyteCatalog,
       return catalogId;
     }
 
-    final String catalogHash = writeCatalogInCanonicalJson ? canonicalCatalogHash : oldCatalogHash;
-
-    return insertCatalog(airbyteCatalog, catalogHash, context, timestamp);
+    return insertCatalog(airbyteCatalog, canonicalCatalogHash, context, timestamp);
   }
 
   private String generateCanonicalHash(final AirbyteCatalog airbyteCatalog) {
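The body of generateCanonicalHash sits outside this hunk, so its exact implementation is not shown here. The general technique is to normalize the serialization, for example by sorting object keys at every nesting level, before hashing, so key order can no longer affect the result. A minimal sketch with Jackson and Guava; the real method may well use a different serializer and hash function:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class CanonicalHashSketch {

  // ORDER_MAP_ENTRIES_BY_KEYS sorts every Map's entries during serialization,
  // so logically equal objects serialize to the same string.
  private static final ObjectMapper SORTED_MAPPER = new ObjectMapper()
      .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

  static String canonicalHash(final Object catalog) throws Exception {
    // Convert the POJO to nested Maps/Lists first so the sorting applies
    // at every level, then hash the canonical string.
    final Map<?, ?> asMap = SORTED_MAPPER.convertValue(catalog, Map.class);
    final String canonicalJson = SORTED_MAPPER.writeValueAsString(asMap);
    return Hashing.sha256().hashString(canonicalJson, StandardCharsets.UTF_8).toString();
  }
}
```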
@@ -2689,41 +2651,6 @@ public UUID writeActorCatalogFetchEvent(final AirbyteCatalog catalog,
     });
   }
 
-  /**
-   * This function will be used to gradually transition to reading and writing canonical schemas.
-   * Eventually, the writeActorCatalogFetchEvent function will be removed and this function will be
-   * renamed to writeActorCatalogFetchEvent.
-   *
-   * @param catalog - catalog that was fetched.
-   * @param actorId - actor the catalog was fetched by
-   * @param connectorVersion - version of the connector when catalog was fetched
-   * @param writeCatalogInCanonicalJson - should we write the catalog in canonical json
-   * @param configurationHash - hash of the config of the connector when catalog was fetched
-   * @return The identifier (UUID) of the fetch event inserted in the database
-   * @throws IOException - error while interacting with db
-   */
-  public UUID writeCanonicalActorCatalogFetchEvent(final AirbyteCatalog catalog,
-                                                   final UUID actorId,
-                                                   final String connectorVersion,
-                                                   final String configurationHash,
-                                                   final boolean writeCatalogInCanonicalJson)
-      throws IOException {
-    final OffsetDateTime timestamp = OffsetDateTime.now();
-    final UUID fetchEventID = UUID.randomUUID();
-    return database.transaction(ctx -> {
-      final UUID catalogId = getOrInsertCanonicalActorCatalog(catalog, ctx, timestamp, writeCatalogInCanonicalJson);
-      ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
-          .set(ACTOR_CATALOG_FETCH_EVENT.ID, fetchEventID)
-          .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorId)
-          .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, catalogId)
-          .set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, configurationHash)
-          .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, connectorVersion)
-          .set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp)
-          .set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp).execute();
-      return catalogId;
-    });
-  }
-
   /**
    * Count connections in workspace.
    *
@@ -170,12 +170,12 @@ void testReadActorCatalog() throws IOException, JsonValidationException, SQLException {
     final AirbyteCatalog firstCatalog = CatalogHelpers.createAirbyteCatalog("product",
         Field.of("label", JsonSchemaType.STRING), Field.of("size", JsonSchemaType.NUMBER),
         Field.of("color", JsonSchemaType.STRING), Field.of("price", JsonSchemaType.NUMBER));
-    configRepository.writeCanonicalActorCatalogFetchEvent(firstCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH, false);
+    configRepository.writeActorCatalogFetchEvent(firstCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH);
 
     final AirbyteCatalog secondCatalog = CatalogHelpers.createAirbyteCatalog("product",
         Field.of("size", JsonSchemaType.NUMBER), Field.of("label", JsonSchemaType.STRING),
         Field.of("color", JsonSchemaType.STRING), Field.of("price", JsonSchemaType.NUMBER));
-    configRepository.writeCanonicalActorCatalogFetchEvent(secondCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, otherConfigHash, false);
+    configRepository.writeActorCatalogFetchEvent(secondCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, otherConfigHash);
 
     final String expectedCatalog =
         "{"
@@ -204,7 +204,7 @@ void testReadActorCatalog() throws IOException, JsonValidationException, SQLException {
   }
 
   @Test
-  void testWriteCanonicalActorCatalog() throws IOException, JsonValidationException, SQLException {
+  void testWriteCanonicalHashActorCatalog() throws IOException, JsonValidationException, SQLException {
     final String canonicalConfigHash = "8ad32981";
     final StandardWorkspace workspace = MockData.standardWorkspaces().get(0);
 
@@ -228,7 +228,7 @@ void testWriteCanonicalActorCatalog() throws IOException, JsonValidationException, SQLException {
     final AirbyteCatalog firstCatalog = CatalogHelpers.createAirbyteCatalog("product",
         Field.of("label", JsonSchemaType.STRING), Field.of("size", JsonSchemaType.NUMBER),
         Field.of("color", JsonSchemaType.STRING), Field.of("price", JsonSchemaType.NUMBER));
-    configRepository.writeCanonicalActorCatalogFetchEvent(firstCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH, true);
+    configRepository.writeActorCatalogFetchEvent(firstCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH);
 
     String expectedCatalog =
         "{"
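The renamed test now drives the canonical path through the public writeActorCatalogFetchEvent API. The property being guarded here, that the same catalog with fields declared in a different order deduplicates to one stored row, could also be asserted directly. A hypothetical companion test, assuming writeActorCatalogFetchEvent returns the stored catalog id (as the repository code above does) and reusing this test class's fixtures:

```java
@Test
void testFieldOrderDoesNotDuplicateCatalog() throws IOException {
  // Same fields, different declaration order.
  final AirbyteCatalog first = CatalogHelpers.createAirbyteCatalog("product",
      Field.of("label", JsonSchemaType.STRING), Field.of("size", JsonSchemaType.NUMBER));
  final AirbyteCatalog second = CatalogHelpers.createAirbyteCatalog("product",
      Field.of("size", JsonSchemaType.NUMBER), Field.of("label", JsonSchemaType.STRING));

  final UUID firstId = configRepository.writeActorCatalogFetchEvent(
      first, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH);
  final UUID secondId = configRepository.writeActorCatalogFetchEvent(
      second, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH);

  // With canonical hashing, both writes should resolve to the same row.
  assertEquals(firstId, secondId);
}
```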
