Skip to content

Commit

Permalink
add feature flag to disable actor definition version seeding (#6959)
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroslopez committed Jun 2, 2023
1 parent bba5e7c commit 11e309d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void testBootloaderAppBlankDb() throws Exception {
val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
val jobsPersistence = new DefaultJobPersistence(jobDatabase);
val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider);
val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository);
val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository, featureFlagClient);
val applyDefinitionsHelper = new ApplyDefinitionsHelper(actorDefinitionMigrator, definitionsProvider, jobsPersistence);
final CdkVersionProvider cdkVersionProvider = mock(CdkVersionProvider.class);
when(cdkVersionProvider.getCdkVersion()).thenReturn(CDK_VERSION);
Expand Down Expand Up @@ -217,7 +217,7 @@ void testBootloaderAppRunSecretMigration() throws Exception {

val spiedSecretMigrator =
spy(new SecretMigrator(secretsReader, secretsWriter, configRepository, jobsPersistence, Optional.of(secretPersistence)));
val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository);
val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository, featureFlagClient);
val applyDefinitionsHelper = new ApplyDefinitionsHelper(actorDefinitionMigrator, definitionsProvider, jobsPersistence);
final CdkVersionProvider cdkVersionProvider = mock(CdkVersionProvider.class);
when(cdkVersionProvider.getCdkVersion()).thenReturn(CDK_VERSION);
Expand Down Expand Up @@ -342,7 +342,7 @@ void testIsLegalUpgradePredicate() throws Exception {
val jobsDatabaseMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
val jobsPersistence = new DefaultJobPersistence(jobDatabase);
val protocolVersionChecker = new ProtocolVersionChecker(jobsPersistence, airbyteProtocolRange, configRepository, definitionsProvider);
val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository);
val actorDefinitionMigrator = new ActorDefinitionMigrator(configRepository, featureFlagClient);
val applyDefinitionsHelper = new ApplyDefinitionsHelper(actorDefinitionMigrator, definitionsProvider, jobsPersistence);
final CdkVersionProvider cdkVersionProvider = mock(CdkVersionProvider.class);
when(cdkVersionProvider.getCdkVersion()).thenReturn(CDK_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.config.persistence;

import static io.airbyte.featureflag.ContextKt.ANONYMOUS;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
Expand All @@ -17,8 +19,11 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.helpers.ConnectorRegistryConverters;
import io.airbyte.db.instance.configs.jooq.generated.tables.ActorDefinition;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.SeedActorDefinitionVersions;
import io.airbyte.featureflag.Workspace;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import java.io.IOException;
Expand All @@ -43,10 +48,11 @@ public class ActorDefinitionMigrator {

private final ConfigRepository configRepository;

public static final ActorDefinition ACTOR_DEFINITION = ActorDefinition.ACTOR_DEFINITION;
private final FeatureFlagClient featureFlagClient;

public ActorDefinitionMigrator(final ConfigRepository configRepository) {
public ActorDefinitionMigrator(final ConfigRepository configRepository, final FeatureFlagClient featureFlagClient) {
this.configRepository = configRepository;
this.featureFlagClient = featureFlagClient;
}

/**
Expand All @@ -60,7 +66,7 @@ public ActorDefinitionMigrator(final ConfigRepository configRepository) {
public void migrate(final List<ConnectorRegistrySourceDefinition> latestSources,
final List<ConnectorRegistryDestinationDefinition> latestDestinations,
final boolean updateAll)
throws IOException {
throws IOException, JsonValidationException {
LOGGER.info("Updating connector definitions from the seed if necessary...");

final Set<String> connectorRepositoriesInUse = configRepository.getConnectorRepositoriesInUse();
Expand Down Expand Up @@ -162,7 +168,7 @@ <T> ConnectorCounter updateConnectorDefinitions(final AirbyteConfig configType,
final Set<String> connectorRepositoriesInUse,
final Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap,
final boolean updateAll)
throws IOException {
throws IOException, JsonValidationException {
int newCount = 0;
int updatedCount = 0;

Expand Down Expand Up @@ -210,22 +216,31 @@ <T> ConnectorCounter updateConnectorDefinitions(final AirbyteConfig configType,

private <T> void writeOrUpdateConnectorRegistryDefinition(final AirbyteConfig configType,
final T definition)
throws IOException {
throws IOException, JsonValidationException {
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
final ConnectorRegistrySourceDefinition registryDef = (ConnectorRegistrySourceDefinition) definition;
registryDef.withProtocolVersion(getProtocolVersion(registryDef.getSpec()));

final StandardSourceDefinition stdSourceDef = ConnectorRegistryConverters.toStandardSourceDefinition(registryDef);
final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef);
configRepository.writeSourceDefinitionAndDefaultVersion(stdSourceDef, actorDefinitionVersion);

if (featureFlagClient.boolVariation(SeedActorDefinitionVersions.INSTANCE, new Workspace(ANONYMOUS))) {
final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef);
configRepository.writeSourceDefinitionAndDefaultVersion(stdSourceDef, actorDefinitionVersion);
} else {
configRepository.writeStandardSourceDefinition(stdSourceDef);
}
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
final ConnectorRegistryDestinationDefinition registryDef = (ConnectorRegistryDestinationDefinition) definition;
registryDef.withProtocolVersion(getProtocolVersion(registryDef.getSpec()));

final StandardDestinationDefinition stdDestDef = ConnectorRegistryConverters.toStandardDestinationDefinition(registryDef);
final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef);
configRepository.writeDestinationDefinitionAndDefaultVersion(stdDestDef, actorDefinitionVersion);

if (featureFlagClient.boolVariation(SeedActorDefinitionVersions.INSTANCE, new Workspace(ANONYMOUS))) {
final ActorDefinitionVersion actorDefinitionVersion = ConnectorRegistryConverters.toActorDefinitionVersion(registryDef);
configRepository.writeDestinationDefinitionAndDefaultVersion(stdDestDef, actorDefinitionVersion);
} else {
configRepository.writeStandardDestinationDefinition(stdDestDef);
}
} else {
throw new IllegalArgumentException(UNKNOWN_CONFIG_TYPE + configType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@

import static io.airbyte.config.ConfigSchema.STANDARD_DESTINATION_DEFINITION;
import static io.airbyte.config.ConfigSchema.STANDARD_SOURCE_DEFINITION;
import static io.airbyte.featureflag.ContextKt.ANONYMOUS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ReleaseStage;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.config.persistence.ActorDefinitionMigrator.ConnectorInfo;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.SeedActorDefinitionVersions;
import io.airbyte.featureflag.TestClient;
import io.airbyte.featureflag.Workspace;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -69,13 +76,15 @@ class ActorDefinitionMigratorTest extends BaseConfigDatabaseTest {

private ActorDefinitionMigrator migrator;
private ConfigRepository configRepository;
private final FeatureFlagClient featureFlagClient = mock(TestClient.class);

@BeforeEach
void setup() throws SQLException {
truncateAllTables();

configRepository = new ConfigRepository(database, null, MockData.MAX_SECONDS_BETWEEN_MESSAGE_SUPPLIER);
migrator = new ActorDefinitionMigrator(configRepository);
migrator = new ActorDefinitionMigrator(configRepository, featureFlagClient);
when(featureFlagClient.boolVariation(SeedActorDefinitionVersions.INSTANCE, new Workspace(ANONYMOUS))).thenReturn(true);
}

private void writeSource(final StandardSourceDefinition source) throws Exception {
Expand Down
1 change: 1 addition & 0 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ object CheckWithCatalog : Temporary<Boolean>(key = "check-with-catalog", default
object ConnectorVersionOverridesEnabled : Temporary<Boolean>(key = "connectors.versionOverridesEnabled", default = false)

object UseActorDefinitionVersionTableDefaults : Temporary<Boolean>(key = "connectors.useActorDefinitionVersionTableDefaults", default = false)
object SeedActorDefinitionVersions : Temporary<Boolean>(key = "connectors.seedActorDefinitionVersions", default = true)

object MinimumCreditQuantity : Temporary<Int>(key = "minimum-credit-quantity", default = 100)

Expand Down

0 comments on commit 11e309d

Please sign in to comment.