From 11e309d10e00d62a9f70640f3615fdeeb6f3cec7 Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Thu, 1 Jun 2023 20:21:15 -0400 Subject: [PATCH] add feature flag to disable actor definition version seeding (#6959) --- .../io/airbyte/bootloader/BootloaderTest.java | 6 ++-- .../persistence/ActorDefinitionMigrator.java | 35 +++++++++++++------ .../ActorDefinitionMigratorTest.java | 11 +++++- .../src/main/kotlin/FlagDefinitions.kt | 1 + 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java index feeee523caf..dd71a00783f 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java @@ -149,7 +149,7 @@ void testBootloaderAppBlankDb() throws Exception { val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); val jobsPersistence = new DefaultJobPersistence(jobDatabase); val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); - val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository); + val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository, featureFlagClient); val applyDefinitionsHelper = new ApplyDefinitionsHelper(actorDefinitionMigrator, definitionsProvider, jobsPersistence); final CdkVersionProvider cdkVersionProvider = mock(CdkVersionProvider.class); when(cdkVersionProvider.getCdkVersion()).thenReturn(CDK_VERSION); @@ -217,7 +217,7 @@ void testBootloaderAppRunSecretMigration() throws Exception { val spiedSecretMigrator = spy(new SecretMigrator(secretsReader, secretsWriter, configRepository, jobsPersistence, Optional.of(secretPersistence))); - val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository); + val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository, featureFlagClient); val applyDefinitionsHelper = new ApplyDefinitionsHelper(actorDefinitionMigrator, definitionsProvider, jobsPersistence); final CdkVersionProvider cdkVersionProvider = mock(CdkVersionProvider.class); when(cdkVersionProvider.getCdkVersion()).thenReturn(CDK_VERSION); @@ -342,7 +342,7 @@ void testIsLegalUpgradePredicate() throws Exception { val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); val jobsPersistence = new DefaultJobPersistence(jobDatabase); val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider); - val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository); + val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository, featureFlagClient); val applyDefinitionsHelper = new ApplyDefinitionsHelper(actorDefinitionMigrator, definitionsProvider, jobsPersistence); final CdkVersionProvider cdkVersionProvider = mock(CdkVersionProvider.class); when(cdkVersionProvider.getCdkVersion()).thenReturn(CDK_VERSION); diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java index 455728d7938..0744c475e4c 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java @@ -4,6 +4,8 @@ package io.airbyte.config.persistence; +import static io.airbyte.featureflag.ContextKt.ANONYMOUS; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.json.Jsons; @@ -17,8 +19,11 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.helpers.ConnectorRegistryConverters; -import io.airbyte.db.instance.configs.jooq.generated.tables.ActorDefinition; +import io.airbyte.featureflag.FeatureFlagClient; +import io.airbyte.featureflag.SeedActorDefinitionVersions; +import io.airbyte.featureflag.Workspace; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.validation.json.JsonValidationException; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; import java.io.IOException; @@ -43,10 +48,11 @@ public class ActorDefinitionMigrator { private final ConfigRepository configRepository; - public static final ActorDefinition ACTOR_DEFINITION = ActorDefinition.ACTOR_DEFINITION; + private final FeatureFlagClient featureFlagClient; - public ActorDefinitionMigrator(final ConfigRepository configRepository) { + public ActorDefinitionMigrator(final ConfigRepository configRepository, final FeatureFlagClient featureFlagClient) { this.configRepository = configRepository; + this.featureFlagClient = featureFlagClient; } /** @@ -60,7 +66,7 @@ public ActorDefinitionMigrator(final ConfigRepository configRepository) { public void migrate(final List latestSources, final List latestDestinations, final boolean updateAll) - throws IOException { + throws IOException, JsonValidationException { LOGGER.info("Updating connector definitions from the seed if necessary..."); final Set connectorRepositoriesInUse = configRepository.getConnectorRepositoriesInUse(); @@ -162,7 +168,7 @@ ConnectorCounter updateConnectorDefinitions(final AirbyteConfig configType, final Set connectorRepositoriesInUse, final Map connectorRepositoryToIdVersionMap, final boolean updateAll) - throws IOException { + throws IOException, JsonValidationException { int newCount = 0; int updatedCount = 0; @@ -210,22 +216,31 @@ ConnectorCounter updateConnectorDefinitions(final AirbyteConfig configType, private void writeOrUpdateConnectorRegistryDefinition(final AirbyteConfig configType, final T definition) - throws IOException { + throws IOException, JsonValidationException { if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) { final ConnectorRegistrySourceDefinition registryDef = (ConnectorRegistrySourceDefinition) definition; registryDef.withProtocolVersion(getProtocolVersion(registryDef.getSpec())); final StandardSourceDefinition stdSourceDef = ConnectorRegistryConverters.toStandardSourceDefinition(registryDef); - final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef); - configRepository.writeSourceDefinitionAndDefaultVersion(stdSourceDef, actorDefinitionVersion); + if (featureFlagClient.boolVariation(SeedActorDefinitionVersions.INSTANCE, new Workspace(ANONYMOUS))) { + final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef); + configRepository.writeSourceDefinitionAndDefaultVersion(stdSourceDef, actorDefinitionVersion); + } else { + configRepository.writeStandardSourceDefinition(stdSourceDef); + } } else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) { final ConnectorRegistryDestinationDefinition registryDef = (ConnectorRegistryDestinationDefinition) definition; registryDef.withProtocolVersion(getProtocolVersion(registryDef.getSpec())); final StandardDestinationDefinition stdDestDef = ConnectorRegistryConverters.toStandardDestinationDefinition(registryDef); - final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef); - configRepository.writeDestinationDefinitionAndDefaultVersion(stdDestDef, actorDefinitionVersion); + + if (featureFlagClient.boolVariation(SeedActorDefinitionVersions.INSTANCE, new Workspace(ANONYMOUS))) { + final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef); + configRepository.writeDestinationDefinitionAndDefaultVersion(stdDestDef, actorDefinitionVersion); + } else { + configRepository.writeStandardDestinationDefinition(stdDestDef); + } } else { throw new IllegalArgumentException(UNKNOWN_CONFIG_TYPE + configType); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionMigratorTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionMigratorTest.java index d201221828b..84ee61b0d5e 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionMigratorTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ActorDefinitionMigratorTest.java @@ -6,11 +6,14 @@ import static io.airbyte.config.ConfigSchema.STANDARD_DESTINATION_DEFINITION; import static io.airbyte.config.ConfigSchema.STANDARD_SOURCE_DEFINITION; +import static io.airbyte.featureflag.ContextKt.ANONYMOUS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ReleaseStage; @@ -18,6 +21,10 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSourceDefinition.SourceType; import io.airbyte.config.persistence.ActorDefinitionMigrator.ConnectorInfo; +import io.airbyte.featureflag.FeatureFlagClient; +import io.airbyte.featureflag.SeedActorDefinitionVersions; +import io.airbyte.featureflag.TestClient; +import io.airbyte.featureflag.Workspace; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; @@ -69,13 +76,15 @@ class ActorDefinitionMigratorTest extends BaseConfigDatabaseTest { private ActorDefinitionMigrator migrator; private ConfigRepository configRepository; + private final FeatureFlagClient featureFlagClient = mock(TestClient.class); @BeforeEach void setup() throws SQLException { truncateAllTables(); configRepository = new ConfigRepository(database, null, MockData.MAX_SECONDS_BETWEEN_MESSAGE_SUPPLIER); - migrator = new ActorDefinitionMigrator(configRepository); + migrator = new ActorDefinitionMigrator(configRepository, featureFlagClient); + when(featureFlagClient.boolVariation(SeedActorDefinitionVersions.INSTANCE, new Workspace(ANONYMOUS))).thenReturn(true); } private void writeSource(final StandardSourceDefinition source) throws Exception { diff --git a/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt b/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt index 51442a9848c..ae7dd5fc914 100644 --- a/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt +++ b/airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt @@ -37,6 +37,7 @@ object CheckWithCatalog : Temporary(key = "check-with-catalog", default object ConnectorVersionOverridesEnabled : Temporary(key = "connectors.versionOverridesEnabled", default = false) object UseActorDefinitionVersionTableDefaults : Temporary(key = "connectors.useActorDefinitionVersionTableDefaults", default = false) +object SeedActorDefinitionVersions : Temporary(key = "connectors.seedActorDefinitionVersions", default = true) object MinimumCreditQuantity : Temporary(key = "minimum-credit-quantity", default = 100)