Skip to content

Commit

Permalink
Bmoric/complete first sync (airbytehq#7832)
Browse files Browse the repository at this point in the history
After a sync is completed, we tag all the workspace with related to this sync as having a first sync completed.

This will then be forwarded to the UI in another PR.

This is part of airbytehq#5884
  • Loading branch information
benmoriceau authored Nov 18, 2021
1 parent 9a3a327 commit 0d51742
Show file tree
Hide file tree
Showing 17 changed files with 295 additions and 28 deletions.
4 changes: 4 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,10 @@ components:
type: array
items:
$ref: "#/components/schemas/Notification"
firstCompletedSync:
type: boolean
feedbackDone:
type: boolean
WorkspaceUpdate:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
}

@Override
public void deleteConfig(final AirbyteConfig configType, final String configId) {
throw new UnsupportedOperationException("The seed config persistence is read only.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ properties:
type: array
items:
"$ref": Notification.yaml
firstCompletedSync:
type: boolean
feedbackDone:
type: boolean
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public interface ConfigPersistence {

<T> void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException;

<T> void writeConfigs(AirbyteConfig configType, Map<String, T> configs) throws IOException, JsonValidationException;

void deleteConfig(AirbyteConfig configType, String configId) throws ConfigNotFoundException, IOException;

void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dryRun) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ public StandardSourceDefinition getSourceDefinitionFromConnection(final UUID con
}
}

public StandardWorkspace getStandardWorkspaceFromConnection(final UUID connectionId, final boolean isTombstone) {
try {
final StandardSync sync = getStandardSync(connectionId);
final SourceConnection source = getSourceConnection(sync.getSourceId());
return getStandardWorkspace(source.getWorkspaceId(), isTombstone);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

public List<StandardSourceDefinition> listStandardSourceDefinitions() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -146,18 +147,32 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf

@Override
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws IOException {
database.transaction(ctx -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)));
final Map<String, T> configIdToConfig = new HashMap<>() {

{
put(configId, config);
}

};
writeConfigs(configType, configIdToConfig);
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) throws IOException {
database.transaction(ctx -> {
final OffsetDateTime timestamp = OffsetDateTime.now();
configs.forEach((configId, config) -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)));

if (isExistingConfig) {
updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId);
} else {
insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configType.getIdFieldName());
}
if (isExistingConfig) {
updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId);
} else {
insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config),
configType.getIdFieldName());
}
});

return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
}
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs) throws IOException {
synchronized (lock) {
for (final Map.Entry<String, T> config : configs.entrySet()) {
writeConfigInternal(configType, config.getKey(), config.getValue());
}
}
}

private <T> void writeConfigs(final AirbyteConfig configType, final Stream<T> configs, final Path rootOverride) {
configs.forEach(config -> {
final String configId = configType.getId(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
Expand Down Expand Up @@ -59,8 +60,25 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf

@Override
public <T> void writeConfig(final AirbyteConfig configType, final String configId, final T config) throws JsonValidationException, IOException {
validateJson(Jsons.jsonNode(config), configType);
decoratedPersistence.writeConfig(configType, configId, config);

final Map<String, T> configIdToConfig = new HashMap<>() {

{
put(configId, config);
}

};

writeConfigs(configType, configIdToConfig);
}

@Override
public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T> configs)
throws IOException, JsonValidationException {
for (final Map.Entry<String, T> config : configs.entrySet()) {
validateJson(Jsons.jsonNode(config.getValue()), configType);
}
decoratedPersistence.writeConfigs(configType, configs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.db.Database;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.Record1;
Expand Down Expand Up @@ -88,6 +90,13 @@ protected static void writeDestination(final ConfigPersistence configPersistence
configPersistence.writeConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(), destination);
}

protected static void writeDestinations(final ConfigPersistence configPersistence, final List<StandardDestinationDefinition> destinations)
throws Exception {
final Map<String, StandardDestinationDefinition> destinationsByID = destinations.stream()
.collect(Collectors.toMap(destinationDefinition -> destinationDefinition.getDestinationDefinitionId().toString(), Function.identity()));
configPersistence.writeConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationsByID);
}

protected static void deleteDestination(final ConfigPersistence configPersistence, final StandardDestinationDefinition destination)
throws Exception {
configPersistence.deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -32,6 +34,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ConfigRepositoryTest {

Expand All @@ -45,7 +49,7 @@ void setup() {
configPersistence = mock(ConfigPersistence.class);
final var secretPersistence = new MemorySecretPersistence();
configRepository =
new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence));
spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence)));
}

@AfterEach
Expand Down Expand Up @@ -74,6 +78,35 @@ void assertReturnsWorkspace(final StandardWorkspace workspace) throws ConfigNotF
assertEquals(workspace, configRepository.getStandardWorkspace(WORKSPACE_ID, true));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWorkspaceByConnectionId(final boolean isTombstone) throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardWorkspace workspace = new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withTombstone(isTombstone);

final UUID connectionId = UUID.randomUUID();
final UUID sourceId = UUID.randomUUID();
final StandardSync mSync = new StandardSync()
.withSourceId(sourceId);
final SourceConnection mSourceConnection = new SourceConnection()
.withWorkspaceId(WORKSPACE_ID);
final StandardWorkspace mWorkflow = new StandardWorkspace()
.withWorkspaceId(WORKSPACE_ID);

doReturn(mSync)
.when(configRepository)
.getStandardSync(connectionId);
doReturn(mSourceConnection)
.when(configRepository)
.getSourceConnection(sourceId);
doReturn(mWorkflow)
.when(configRepository)
.getStandardWorkspace(WORKSPACE_ID, isTombstone);

configRepository.getStandardWorkspaceFromConnection(connectionId, isTombstone);

verify(configRepository).getStandardWorkspace(WORKSPACE_ID, isTombstone);
}

@Test
void testGetConnectionState() throws Exception {
final UUID connectionId = UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static org.mockito.Mockito.spy;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
Expand Down Expand Up @@ -53,6 +54,17 @@ void tearDown() throws Exception {
database.close();
}

@Test
public void testMultiWriteAndGetConfig() throws Exception {
writeDestinations(configPersistence, Lists.newArrayList(DESTINATION_S3, DESTINATION_SNOWFLAKE));
assertRecordCount(2);
assertHasDestination(DESTINATION_S3);
assertHasDestination(DESTINATION_SNOWFLAKE);
assertEquals(
List.of(DESTINATION_SNOWFLAKE, DESTINATION_S3),
configPersistence.listConfigs(STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class));
}

@Test
public void testWriteAndGetConfig() throws Exception {
writeDestination(configPersistence, DESTINATION_S3);
Expand All @@ -67,10 +79,10 @@ public void testWriteAndGetConfig() throws Exception {

@Test
public void testListConfigWithMetadata() throws Exception {
Instant now = Instant.now().minus(Duration.ofSeconds(1));
final Instant now = Instant.now().minus(Duration.ofSeconds(1));
writeDestination(configPersistence, DESTINATION_S3);
writeDestination(configPersistence, DESTINATION_SNOWFLAKE);
List<ConfigWithMetadata<StandardDestinationDefinition>> configWithMetadata = configPersistence
final List<ConfigWithMetadata<StandardDestinationDefinition>> configWithMetadata = configPersistence
.listConfigsWithMetadata(STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class);
assertEquals(2, configWithMetadata.size());
assertEquals("STANDARD_DESTINATION_DEFINITION", configWithMetadata.get(0).getConfigType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -58,6 +60,34 @@ void testReadWriteConfig() throws IOException, JsonValidationException, ConfigNo
StandardSourceDefinition.class));
}

@Test
void testReadWriteConfigs() throws IOException, JsonValidationException, ConfigNotFoundException {
final Map<String, StandardSourceDefinition> sourceDefinitionById = new HashMap<>() {

{
put(UUID_1.toString(), SOURCE_1);
put(UUID_2.toString(), SOURCE_2);
}

};

configPersistence.writeConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionById);

assertEquals(
SOURCE_1,
configPersistence.getConfig(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
UUID_1.toString(),
StandardSourceDefinition.class));

assertEquals(
SOURCE_2,
configPersistence.getConfig(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
UUID_2.toString(),
StandardSourceDefinition.class));
}

@Test
void testListConfigs() throws JsonValidationException, IOException {
configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1);
Expand Down
Loading

0 comments on commit 0d51742

Please sign in to comment.