diff --git a/airbyte-config/init/build.gradle b/airbyte-config/init/build.gradle index 92f170162b20..945051223ea9 100644 --- a/airbyte-config/init/build.gradle +++ b/airbyte-config/init/build.gradle @@ -7,42 +7,3 @@ dependencies { implementation project(':airbyte-config:models') } - -// generate seed for each yaml file. -task generateSeed { - def seeds = [ - [ - "sourceDefinitionId", - new File(project.projectDir, '/src/main/resources/seed/source_definitions.yaml'), - new File(project.projectDir, '/src/main/resources/config/STANDARD_SOURCE_DEFINITION') - ], - [ - "destinationDefinitionId", - new File(project.projectDir, '/src/main/resources/seed/destination_definitions.yaml'), - new File(project.projectDir, '/src/main/resources/config/STANDARD_DESTINATION_DEFINITION') - ], - ] - seeds.each{val -> - def name = val[0] - def taskName = "generateSeed$name" - dependsOn taskName - task "$taskName"(type: JavaExec) { - classpath = sourceSets.main.runtimeClasspath - - main = 'io.airbyte.config.init.SeedRepository' - - // arguments to pass to the application - args '--id-name' - args val[0] - args '--input-path' - args val[1] - args '--output-path' - args val[2] - } - } -} - -// we only want to attempt generateSeed if tests have passed. -generateSeed.dependsOn(check) -generateSeed.dependsOn(assemble) -build.dependsOn(generateSeed) diff --git a/airbyte-config/init/scripts/copy_seed_data.sh b/airbyte-config/init/scripts/copy_seed_data.sh deleted file mode 100755 index fea2d5bb3350..000000000000 --- a/airbyte-config/init/scripts/copy_seed_data.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env sh - -# this is written as a script, despite its simplicity, because the yaml parser freaks out if $ -# is used for anything but variable interpolation. - -set -e - -# copy seed data over if the seed mount is empty. if it is not, we assume -# that it is already seeded and should not be overwritten. 
-[ "$(ls -A /seed )" ] || cp -r /app/seed/* /seed diff --git a/airbyte-config/init/src/main/java/io/airbyte/config/init/SeedRepository.java b/airbyte-config/init/src/main/java/io/airbyte/config/init/SeedRepository.java deleted file mode 100644 index 0955b56c911a..000000000000 --- a/airbyte-config/init/src/main/java/io/airbyte/config/init/SeedRepository.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package io.airbyte.config.init; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import io.airbyte.config.helpers.YamlListToStandardDefinitions; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.stream.Collectors; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; - -/** - * This class takes in a yaml file with a list of objects. It then then assigns each object a uuid - * based on its name attribute. The uuid is written as a field in the object with the key specified - * as the id-name. It then writes each object to its own file in the specified output directory. - * Each file's name is the generated uuid. The goal is that a user should be able to add objects to - * the database seed without having to generate uuids themselves. The output files should be - * compatible with our file system database (config persistence). 
- */ -public class SeedRepository { - - private static final Options OPTIONS = new Options(); - private static final Option ID_NAME_OPTION = new Option("id", "id-name", true, "field name of the id"); - private static final Option INPUT_PATH_OPTION = new Option("i", "input-path", true, "path to input file"); - private static final Option OUTPUT_PATH_OPTION = new Option("o", "output-path", true, "path to where files will be output"); - - static { - ID_NAME_OPTION.setRequired(true); - INPUT_PATH_OPTION.setRequired(true); - OUTPUT_PATH_OPTION.setRequired(true); - OPTIONS.addOption(ID_NAME_OPTION); - OPTIONS.addOption(INPUT_PATH_OPTION); - OPTIONS.addOption(OUTPUT_PATH_OPTION); - } - - private static CommandLine parse(final String[] args) { - final CommandLineParser parser = new DefaultParser(); - final HelpFormatter helpFormatter = new HelpFormatter(); - - try { - return parser.parse(OPTIONS, args); - } catch (final ParseException e) { - helpFormatter.printHelp("", OPTIONS); - throw new IllegalArgumentException(e); - } - } - - public static void main(final String[] args) throws IOException { - final CommandLine parsed = parse(args); - final String idName = parsed.getOptionValue(ID_NAME_OPTION.getOpt()); - final Path inputPath = Path.of(parsed.getOptionValue(INPUT_PATH_OPTION.getOpt())); - final Path outputPath = Path.of(parsed.getOptionValue(OUTPUT_PATH_OPTION.getOpt())); - - new SeedRepository().run(idName, inputPath, outputPath); - } - - public void run(final String idName, final Path input, final Path output) throws IOException { - final var jsonNode = YamlListToStandardDefinitions.verifyAndConvertToJsonNode(idName, IOs.readFile(input)); - final var elementsIter = jsonNode.elements(); - - // clean output directory. - for (final Path file : Files.list(output).collect(Collectors.toList())) { - Files.delete(file); - } - - // write to output directory. 
- while (elementsIter.hasNext()) { - final JsonNode element = Jsons.clone(elementsIter.next()); - IOs.writeFile( - output, - element.get(idName).asText() + ".json", - Jsons.toPrettyString(element)); - } - } - -} diff --git a/airbyte-config/init/src/main/java/io/airbyte/config/init/SeedType.java b/airbyte-config/init/src/main/java/io/airbyte/config/init/SeedType.java new file mode 100644 index 000000000000..c223a92f7c49 --- /dev/null +++ b/airbyte-config/init/src/main/java/io/airbyte/config/init/SeedType.java @@ -0,0 +1,49 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.config.init; + +public enum SeedType { + + STANDARD_SOURCE_DEFINITION("/seed/source_definitions.yaml", "sourceDefinitionId"), + STANDARD_DESTINATION_DEFINITION("/seed/destination_definitions.yaml", "destinationDefinitionId"); + + final String resourcePath; + // ID field name + final String idName; + + SeedType(String resourcePath, String idName) { + this.resourcePath = resourcePath; + this.idName = idName; + } + + public String getResourcePath() { + return resourcePath; + } + + public String getIdName() { + return idName; + } + +} diff --git a/airbyte-config/init/src/test/java/io/airbyte/config/init/SeedRepositoryTest.java b/airbyte-config/init/src/test/java/io/airbyte/config/init/SeedRepositoryTest.java deleted file mode 100644 index 305d0c0ee455..000000000000 --- a/airbyte-config/init/src/test/java/io/airbyte/config/init/SeedRepositoryTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.config.init; - -import static org.junit.jupiter.api.Assertions.*; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.yaml.Yamls; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.UUID; -import java.util.stream.Collectors; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -class SeedRepositoryTest { - - private static final String CONFIG_ID = "configId"; - private static final JsonNode OBJECT = Jsons.jsonNode(ImmutableMap.builder() - .put(CONFIG_ID, UUID.randomUUID()) - .put("name", "barker") - .put("description", "playwright") - .build()); - - private Path input; - private Path output; - - @BeforeEach - void setup() throws IOException { - input = Files.createTempDirectory("test_input").resolve("input.yaml"); - output = Files.createTempDirectory("test_output"); - - writeSeedList(OBJECT); - } - - @Test - void testWrite() throws IOException { - new SeedRepository().run(CONFIG_ID, input, output); - final JsonNode actual = Jsons.deserialize(IOs.readFile(output, OBJECT.get(CONFIG_ID).asText() + ".json")); - assertEquals(OBJECT, actual); - } - - @Test - void testOverwrites() throws IOException { - new SeedRepository().run(CONFIG_ID, input, output); - final JsonNode actual = Jsons.deserialize(IOs.readFile(output, OBJECT.get(CONFIG_ID).asText() + ".json")); - assertEquals(OBJECT, actual); - - final JsonNode 
clone = Jsons.clone(OBJECT); - ((ObjectNode) clone).put("description", "revolutionary"); - writeSeedList(clone); - - new SeedRepository().run(CONFIG_ID, input, output); - final JsonNode actualAfterOverwrite = Jsons.deserialize(IOs.readFile(output, OBJECT.get(CONFIG_ID).asText() + ".json")); - assertEquals(clone, actualAfterOverwrite); - } - - @Test - void testFailsOnDuplicateId() { - final JsonNode object = Jsons.clone(OBJECT); - ((ObjectNode) object).put("name", "howard"); - - writeSeedList(OBJECT, object); - final SeedRepository seedRepository = new SeedRepository(); - assertThrows(IllegalArgumentException.class, () -> seedRepository.run(CONFIG_ID, input, output)); - } - - @Test - void testFailsOnDuplicateName() { - final JsonNode object = Jsons.clone(OBJECT); - ((ObjectNode) object).put(CONFIG_ID, UUID.randomUUID().toString()); - - writeSeedList(OBJECT, object); - final SeedRepository seedRepository = new SeedRepository(); - assertThrows(IllegalArgumentException.class, () -> seedRepository.run(CONFIG_ID, input, output)); - } - - @Test - void testPristineOutputDir() throws IOException { - IOs.writeFile(output, "blah.json", "{}"); - assertEquals(1, Files.list(output).count()); - - new SeedRepository().run(CONFIG_ID, input, output); - - // verify the file that the file that was already in the directory is gone. - assertEquals(1, Files.list(output).count()); - assertEquals(OBJECT.get(CONFIG_ID).asText() + ".json", Files.list(output).collect(Collectors.toList()).get(0).getFileName().toString()); - } - - private void writeSeedList(JsonNode... 
seeds) { - final JsonNode seedList = Jsons.jsonNode(new ArrayList<>()); - for (JsonNode seed : seeds) { - ((ArrayNode) seedList).add(seed); - } - IOs.writeFile(input, Yamls.serialize(seedList)); - } - -} diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java index 499b41fe5f5a..2118c4929726 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistenceBuilder.java @@ -68,11 +68,13 @@ public static ConfigPersistence getDbPersistence(Configs configs) throws IOExcep * Otherwise, seed the database from the yaml files. */ ConfigPersistence create() throws IOException, InterruptedException { - // Uncomment this branch in a future version when config volume is removed. - // if (configs.getConfigRoot() == null) { - // return getDbPersistenceWithYamlSeed(); - // } - return getDbPersistenceWithFileSeed(); + if (FileSystemConfigPersistence.hasExistingConfigs(configs.getConfigRoot())) { + LOGGER.info("There is existing local config directory"); + return getDbPersistenceWithFileSeed(); + } else { + LOGGER.info("There is no existing local config directory"); + return getDbPersistenceWithYamlSeed(); + } } /** @@ -81,7 +83,7 @@ ConfigPersistence create() throws IOException, InterruptedException { */ ConfigPersistence getDbPersistenceWithYamlSeed() throws IOException { LOGGER.info("Creating db-based config persistence, and loading initial seed from YAML files"); - ConfigPersistence seedConfigPersistence = new YamlSeedConfigPersistence(); + ConfigPersistence seedConfigPersistence = YamlSeedConfigPersistence.get(); return getDbPersistence(seedConfigPersistence); } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java 
b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java index c5b039da60c8..2d9ceeb76d3e 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/FileSystemConfigPersistence.java @@ -25,6 +25,7 @@ package io.airbyte.config.persistence; import com.fasterxml.jackson.databind.JsonNode; +import com.google.api.client.util.Preconditions; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.config.AirbyteConfig; @@ -58,18 +59,25 @@ public class FileSystemConfigPersistence implements ConfigPersistence { // root for where configs are stored private final Path configRoot; - public static ConfigPersistence createWithValidation(final Path storageRoot) throws InterruptedException { - LOGGER.info("Constructing file system config persistence (root: {})", storageRoot); - - Path configRoot = storageRoot.resolve(CONFIG_DIR); - int totalWaitingSeconds = 0; - while (!Files.exists(configRoot)) { - LOGGER.warn("Config volume is not ready yet (waiting time: {} s)", totalWaitingSeconds); - Thread.sleep(INTERVAL_WAITING_SECONDS * 1000); - totalWaitingSeconds += INTERVAL_WAITING_SECONDS; + /** + * Check if there are existing configs under the storage root. Previously the seed container copies + * the configs to the storage root, it may take some time for the operation to complete and for the + * CONFIG_DIR to show up. So we cannot infer anything based on the existence of this directory. Now + * this seed generation step has been removed. So we can tell immediately whether CONFIG_DIR exists + * or not. If CONFIG_DIR exists, it means the user has just migrated Airbyte from an old version + * that uses this file system config persistence. 
+ */ + public static boolean hasExistingConfigs(final Path storageRoot) { + if (!Files.exists(storageRoot)) { + throw new RuntimeException("Storage root does not exist: " + storageRoot); } - LOGGER.info("Config volume is ready (waiting time: {} s)", totalWaitingSeconds); + return Files.exists(storageRoot.resolve(CONFIG_DIR)); + } + public static ConfigPersistence createWithValidation(final Path storageRoot) { + LOGGER.info("Constructing file system config persistence (root: {})", storageRoot); + Path configRoot = storageRoot.resolve(CONFIG_DIR); + Preconditions.checkArgument(Files.exists(configRoot), "CONFIG_DIR does not exist under the storage root: %s", configRoot); return new ValidatingConfigPersistence(new FileSystemConfigPersistence(storageRoot)); } @@ -160,7 +168,7 @@ private List listConfig(String configType) throws IOException { } @Override - public void deleteConfig(AirbyteConfig configType, String configId) throws ConfigNotFoundException, IOException { + public void deleteConfig(AirbyteConfig configType, String configId) throws IOException { synchronized (lock) { deleteConfigInternal(configType, configId); } @@ -242,11 +250,11 @@ private void writeConfigInternal(AirbyteConfig configType, String configId, Files.writeString(configPath, Jsons.serialize(config)); } - private void deleteConfigInternal(AirbyteConfig configType, String configId) throws IOException { + private void deleteConfigInternal(AirbyteConfig configType, String configId) throws IOException { deleteConfigInternal(configType, configId, configRoot); } - private void deleteConfigInternal(AirbyteConfig configType, String configId, Path storageRoot) throws IOException { + private void deleteConfigInternal(AirbyteConfig configType, String configId, Path storageRoot) throws IOException { final Path configPath = buildConfigPath(configType, configId, storageRoot); Files.delete(configPath); } diff --git 
a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java index ec8d9df6585f..581a0e646c30 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/YamlSeedConfigPersistence.java @@ -32,7 +32,7 @@ import io.airbyte.commons.yaml.Yamls; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; -import io.airbyte.config.init.SeedRepository; +import io.airbyte.config.init.SeedType; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.net.URL; @@ -44,47 +44,44 @@ /** * This config persistence contains all seed definitions according to the yaml files. It is - * read-only. This class can eventually replace the generateSeed task and the file system config - * persistence. + * read-only. 
*/ public class YamlSeedConfigPersistence implements ConfigPersistence { - private enum SeedConfigType { + private static final Map CONFIG_SCHEMA_MAP = Map.of( + ConfigSchema.STANDARD_SOURCE_DEFINITION, SeedType.STANDARD_SOURCE_DEFINITION, + ConfigSchema.STANDARD_DESTINATION_DEFINITION, SeedType.STANDARD_DESTINATION_DEFINITION); - STANDARD_SOURCE_DEFINITION("/seed/source_definitions.yaml", "sourceDefinitionId"), - STANDARD_DESTINATION_DEFINITION("/seed/destination_definitions.yaml", "destinationDefinitionId"); - - final String resourcePath; - final String idName; - - SeedConfigType(String resourcePath, String idName) { - this.resourcePath = resourcePath; - this.idName = idName; + private static final YamlSeedConfigPersistence INSTANCE; + static { + try { + INSTANCE = new YamlSeedConfigPersistence(); + } catch (IOException e) { + throw new RuntimeException(e); } - } - private static final Map CONFIG_SCHEMA_MAP = Map.of( - ConfigSchema.STANDARD_SOURCE_DEFINITION, SeedConfigType.STANDARD_SOURCE_DEFINITION, - ConfigSchema.STANDARD_DESTINATION_DEFINITION, SeedConfigType.STANDARD_DESTINATION_DEFINITION); - // A mapping from seed config type to config UUID to config. 
- private final ImmutableMap> allSeedConfigs; + private final ImmutableMap> allSeedConfigs; - public YamlSeedConfigPersistence() throws IOException { - this.allSeedConfigs = ImmutableMap.>builder() - .put(SeedConfigType.STANDARD_SOURCE_DEFINITION, getConfigs(SeedConfigType.STANDARD_SOURCE_DEFINITION)) - .put(SeedConfigType.STANDARD_DESTINATION_DEFINITION, getConfigs(SeedConfigType.STANDARD_DESTINATION_DEFINITION)) + private YamlSeedConfigPersistence() throws IOException { + this.allSeedConfigs = ImmutableMap.>builder() + .put(SeedType.STANDARD_SOURCE_DEFINITION, getConfigs(SeedType.STANDARD_SOURCE_DEFINITION)) + .put(SeedType.STANDARD_DESTINATION_DEFINITION, getConfigs(SeedType.STANDARD_DESTINATION_DEFINITION)) .build(); } + public static YamlSeedConfigPersistence get() { + return INSTANCE; + } + @SuppressWarnings("UnstableApiUsage") - private static Map getConfigs(SeedConfigType seedConfigType) throws IOException { - final URL url = Resources.getResource(SeedRepository.class, seedConfigType.resourcePath); + private static Map getConfigs(SeedType seedType) throws IOException { + final URL url = Resources.getResource(SeedType.class, seedType.getResourcePath()); final String yamlString = Resources.toString(url, StandardCharsets.UTF_8); final JsonNode configList = Yamls.deserialize(yamlString); return MoreIterators.toList(configList.elements()).stream().collect(Collectors.toMap( - json -> json.get(seedConfigType.idName).asText(), + json -> json.get(seedType.getIdName()).asText(), json -> json)); } diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseTest.java index a2ab27c48605..a2d8a6e92c96 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseTest.java @@ -48,7 +48,7 @@ public abstract class BaseTest { static { try { - 
ConfigPersistence seedPersistence = new YamlSeedConfigPersistence(); + ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get(); SOURCE_GITHUB = seedPersistence .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, "ef69ef6e-aa7f-4af1-a01d-ef775033524e", StandardSourceDefinition.class); SOURCE_POSTGRES = seedPersistence diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java index 639afd2dbc72..61521d8a5e75 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistenceBuilderTest.java @@ -103,7 +103,7 @@ void tearDown() throws Exception { @Test public void testCreateDbPersistenceWithYamlSeed() throws IOException { ConfigPersistence dbPersistence = new ConfigPersistenceBuilder(configs, true).getDbPersistenceWithYamlSeed(); - ConfigPersistence seedPersistence = new YamlSeedConfigPersistence(); + ConfigPersistence seedPersistence = YamlSeedConfigPersistence.get(); assertSameConfigDump(seedPersistence.dumpConfigs(), dbPersistence.dumpConfigs()); } @@ -141,7 +141,7 @@ public void testCreateDbPersistenceWithoutSetupDatabase() throws Exception { return null; }); - ConfigPersistence seedPersistence = spy(new YamlSeedConfigPersistence()); + ConfigPersistence seedPersistence = spy(YamlSeedConfigPersistence.get()); // When setupDatabase is false, the createDbPersistence method does not initialize // the database itself, but it expects that the database has already been initialized. 
ConfigPersistence dbPersistence = new ConfigPersistenceBuilder(configs, false).getDbPersistence(seedPersistence); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java index 3386bf8708a4..87fe31181840 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/YamlSeedConfigPersistenceTest.java @@ -34,7 +34,6 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardWorkspace; -import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.stream.Stream; @@ -42,15 +41,7 @@ public class YamlSeedConfigPersistenceTest { - private static final YamlSeedConfigPersistence PERSISTENCE; - - static { - try { - PERSISTENCE = new YamlSeedConfigPersistence(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + private static final YamlSeedConfigPersistence PERSISTENCE = YamlSeedConfigPersistence.get(); @Test public void testGetConfig() throws Exception { @@ -86,8 +77,8 @@ public void testGetInvalidConfig() { public void testDumpConfigs() { Map> allSeedConfigs = PERSISTENCE.dumpConfigs(); assertEquals(2, allSeedConfigs.size()); - assertTrue(allSeedConfigs.get(ConfigSchema.STANDARD_SOURCE_DEFINITION.name()).count() > 0); - assertTrue(allSeedConfigs.get(ConfigSchema.STANDARD_DESTINATION_DEFINITION.name()).count() > 0); + assertTrue(allSeedConfigs.get(ConfigSchema.STANDARD_SOURCE_DEFINITION.name()).findAny().isPresent()); + assertTrue(allSeedConfigs.get(ConfigSchema.STANDARD_DESTINATION_DEFINITION.name()).findAny().isPresent()); } @Test diff --git a/airbyte-db/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java 
b/airbyte-db/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java index b6827462776f..f4e732f80fdc 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java +++ b/airbyte-db/src/main/java/io/airbyte/db/instance/BaseDatabaseInstance.java @@ -71,6 +71,16 @@ protected BaseDatabaseInstance(String username, this.isDatabaseReady = isDatabaseReady; } + @Override + public boolean isInitialized() throws IOException { + Database database = Databases.createPostgresDatabaseWithRetry( + username, + password, + connectionString, + isDatabaseConnected(databaseName)); + return new ExceptionWrappingDatabase(database).transaction(ctx -> tableNames.stream().allMatch(tableName -> hasTable(ctx, tableName))); + } + @Override public Database getInitialized() { // When we don't need to setup the database, it means the database is initialized diff --git a/airbyte-db/src/main/java/io/airbyte/db/instance/DatabaseInstance.java b/airbyte-db/src/main/java/io/airbyte/db/instance/DatabaseInstance.java index 30c25cc15872..d45f20a52d23 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/instance/DatabaseInstance.java +++ b/airbyte-db/src/main/java/io/airbyte/db/instance/DatabaseInstance.java @@ -29,6 +29,11 @@ public interface DatabaseInstance { + /** + * Check is a database has been initialized. + */ + boolean isInitialized() throws IOException; + /** * Get a database that has been initialized and is ready to use. 
*/ diff --git a/airbyte-server/Dockerfile b/airbyte-server/Dockerfile index 00cc657e2ba0..c4ebc3396590 100644 --- a/airbyte-server/Dockerfile +++ b/airbyte-server/Dockerfile @@ -8,9 +8,6 @@ WORKDIR /app COPY build/distributions/${APPLICATION}-0*.tar ${APPLICATION}.tar -RUN mkdir latest_seeds -COPY build/config_init/resources/main/config latest_seeds - RUN tar xf ${APPLICATION}.tar --strip-components=1 # wait for upstream dependencies to become available before starting server diff --git a/airbyte-server/seed.Dockerfile b/airbyte-server/seed.Dockerfile deleted file mode 100644 index 8295c1a251f9..000000000000 --- a/airbyte-server/seed.Dockerfile +++ /dev/null @@ -1,7 +0,0 @@ -FROM alpine:3.4 AS seed - -WORKDIR /app - -# the sole purpose of this image is to seed the data volume with the default data -# that the app should have when it is first installed. -COPY build/config_init/resources/main/config seed/config \ No newline at end of file diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java index 54a975e60821..f32a04540728 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java @@ -76,8 +76,8 @@ public File dump() { final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), ARCHIVE_FILE_NAME); final File dump = Files.createTempFile(ARCHIVE_FILE_NAME, ".tar.gz").toFile(); exportVersionFile(tempFolder); - dumpConfigs(tempFolder); - dumpDatabase(tempFolder); + dumpConfigsDatabase(tempFolder); + dumpJobsDatabase(tempFolder); Archives.createArchive(tempFolder, dump.toPath()); return dump; @@ -92,7 +92,7 @@ private void exportVersionFile(Path tempFolder) throws IOException { FileUtils.writeStringToFile(versionFile, version, Charset.defaultCharset()); } - private void dumpDatabase(Path parentFolder) throws Exception { + private void dumpJobsDatabase(Path 
parentFolder) throws Exception { final Map> tables = jobPersistence.exportDatabase().entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().name(), Entry::getValue)); Files.createDirectories(parentFolder.resolve(DB_FOLDER_NAME)); @@ -116,7 +116,7 @@ protected static Path buildTablePath(final Path storageRoot, final String tableN .resolve(String.format("%s.yaml", tableName.toUpperCase())); } - private void dumpConfigs(Path parentFolder) throws IOException { + private void dumpConfigsDatabase(Path parentFolder) throws IOException { for (Map.Entry> configEntry : configRepository.dumpConfigs().entrySet()) { writeConfigsToArchive(parentFolder, configEntry.getKey(), configEntry.getValue()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java index d3bf46337dba..896f5da44b89 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigDumpImporter.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import io.airbyte.analytics.TrackingClientSingleton; @@ -43,6 +42,7 @@ import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.scheduler.persistence.DefaultJobPersistence; @@ -55,14 +55,15 @@ import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; 
import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.Spliterator; import java.util.Spliterators; import java.util.UUID; @@ -94,9 +95,7 @@ public ConfigDumpImporter(ConfigRepository configRepository, JobPersistence post this.configRepository = configRepository; } - public ImportRead importDataWithSeed(String targetVersion, File archive, Path seedPath) { - Preconditions.checkNotNull(seedPath); - + public ImportRead importDataWithSeed(String targetVersion, File archive, ConfigPersistence seedPersistence) { ImportRead result; try { final Path sourceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive"); @@ -106,7 +105,7 @@ public ImportRead importDataWithSeed(String targetVersion, File archive, Path se // 2. dry run try { - checkImport(targetVersion, sourceRoot, seedPath); + checkImport(targetVersion, sourceRoot, seedPersistence); } catch (Exception e) { LOGGER.error("Dry run failed.", e); throw e; @@ -116,7 +115,7 @@ public ImportRead importDataWithSeed(String targetVersion, File archive, Path se importDatabaseFromArchive(sourceRoot, targetVersion); // 4. Import Configs - importConfigsFromArchive(sourceRoot, seedPath, false); + importConfigsFromArchive(sourceRoot, seedPersistence, false); // 5. 
Set DB version LOGGER.info("Setting the DB Airbyte version to : " + targetVersion); @@ -140,7 +139,7 @@ public ImportRead importDataWithSeed(String targetVersion, File archive, Path se return result; } - private void checkImport(String targetVersion, Path tempFolder, Path seed) throws IOException, JsonValidationException { + private void checkImport(String targetVersion, Path tempFolder, ConfigPersistence seedPersistence) throws IOException, JsonValidationException { final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME); final String importVersion = Files.readString(versionFile, Charset.defaultCharset()) .replace("\n", "").strip(); @@ -151,7 +150,7 @@ private void checkImport(String targetVersion, Path tempFolder, Path seed) throw "Please upgrade your Airbyte Archive, see more at https://docs.airbyte.io/tutorials/upgrading-airbyte\n", importVersion, targetVersion)); } - importConfigsFromArchive(tempFolder, seed, true); + importConfigsFromArchive(tempFolder, seedPersistence, true); } // Config @@ -162,10 +161,10 @@ private List listDirectories(Path sourceRoot) throws IOException { } } - private void importConfigsFromArchive(final Path sourceRoot, Path seedPath, final boolean dryRun) + private void importConfigsFromArchive(final Path sourceRoot, ConfigPersistence seedPersistence, final boolean dryRun) throws IOException, JsonValidationException { - final List sourceDefinitionsToMigrate = new ArrayList<>(); - final List destinationDefinitionsToMigrate = new ArrayList<>(); + final Set sourceDefinitionsInUse = new HashSet<>(); + final Set destinationDefinitionsInUse = new HashSet<>(); final boolean[] sourceProcessed = {false}; final boolean[] destinationProcessed = {false}; final List directories = listDirectories(sourceRoot); @@ -175,7 +174,7 @@ private void importConfigsFromArchive(final Path sourceRoot, Path seedPath, Collections.sort(directories); final Map> data = new LinkedHashMap<>(); - final Map> seed = getSeed(seedPath); + final Map> seeds = 
getSeeds(seedPersistence); for (final String directory : directories) { final Optional configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class); @@ -186,57 +185,47 @@ private void importConfigsFromArchive(final Path sourceRoot, Path seedPath, final ConfigSchema configSchema = configSchemaOptional.get(); Stream configs = readConfigsFromArchive(sourceRoot, configSchema); + + // If there is no source or destination connection, mark them as processed respectively. + if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION && !data.containsKey(ConfigSchema.SOURCE_CONNECTION)) { + sourceProcessed[0] = true; + } else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION && !data.containsKey(ConfigSchema.DESTINATION_CONNECTION)) { + destinationProcessed[0] = true; + } + configs = streamWithAdditionalOperation( - sourceDefinitionsToMigrate, - destinationDefinitionsToMigrate, + sourceDefinitionsInUse, + destinationDefinitionsInUse, sourceProcessed, destinationProcessed, configSchema, configs, - seed); + seeds); data.put(configSchema, configs); } configRepository.replaceAllConfigs(data, dryRun); } - private Map> getSeed(Path seed) throws IOException { - final List configSchemas = Files.list(seed).map(c -> ConfigSchema.valueOf(c.getFileName().toString())).collect(Collectors.toList()); - final Map> allData = new HashMap<>(); - for (ConfigSchema configSchema : configSchemas) { - final Map data = readLatestSeed(seed, configSchema); - allData.put(configSchema, data); + /** + * Convert config dumps from {@link ConfigPersistence#dumpConfigs} to the desired format. 
+ */ + @SuppressWarnings("unchecked") + private static Map> getSeeds(ConfigPersistence configSeedPersistence) throws IOException { + Map> allData = new HashMap<>(2); + for (Map.Entry> configStream : configSeedPersistence.dumpConfigs().entrySet()) { + ConfigSchema configSchema = ConfigSchema.valueOf(configStream.getKey()); + Map configSeeds = configStream.getValue() + .map(node -> Jsons.object(node, configSchema.getClassName())) + .collect(Collectors.toMap( + configSchema::getId, + object -> (T) object)); + allData.put(configSchema, configSeeds); } return allData; } - private Map readLatestSeed(Path latestSeed, ConfigSchema configSchema) throws IOException { - try (Stream files = Files.list(latestSeed.resolve(configSchema.toString()))) { - final List ids = files - .filter(p -> !p.endsWith(".json")) - .map(p -> p.getFileName().toString().replace(".json", "")) - .collect(Collectors.toList()); - - final Map configData = new HashMap<>(); - for (String id : ids) { - try { - final Path configPath = latestSeed.resolve(configSchema.toString()).resolve(String.format("%s.json", id)); - if (!Files.exists(configPath)) { - throw new RuntimeException("Config NotFound"); - } - - T config = Jsons.deserialize(Files.readString(configPath), configSchema.getClassName()); - configData.put(id, config); - } catch (RuntimeException e) { - throw new IOException(e); - } - } - - return configData; - } - } - - private Stream streamWithAdditionalOperation(List sourceDefinitionsToMigrate, - List destinationDefinitionsToMigrate, + private Stream streamWithAdditionalOperation(Set sourceDefinitionsInUse, + Set destinationDefinitionsInUse, boolean[] sourceProcessed, boolean[] destinationProcessed, ConfigSchema configSchema, @@ -244,35 +233,39 @@ private Stream streamWithAdditionalOperation(List sourceDefinitio Map> latestSeeds) { if (configSchema == ConfigSchema.SOURCE_CONNECTION) { sourceProcessed[0] = true; - configs = configs.peek(config -> sourceDefinitionsToMigrate.add(((SourceConnection) 
config).getSourceDefinitionId().toString())); + configs = configs.peek(config -> sourceDefinitionsInUse.add(((SourceConnection) config).getSourceDefinitionId().toString())); } else if (configSchema == ConfigSchema.DESTINATION_CONNECTION) { destinationProcessed[0] = true; - configs = configs.peek(config -> destinationDefinitionsToMigrate.add(((DestinationConnection) config).getDestinationDefinitionId().toString())); + configs = configs.peek(config -> destinationDefinitionsInUse.add(((DestinationConnection) config).getDestinationDefinitionId().toString())); } else if (configSchema == ConfigSchema.STANDARD_SOURCE_DEFINITION) { - configs = getDefinitionStream(sourceDefinitionsToMigrate, sourceProcessed[0], configSchema, configs, latestSeeds); + Map sourceDefinitionSeeds = latestSeeds.get(configSchema); + configs = getDefinitionStream(sourceDefinitionsInUse, sourceProcessed[0], configSchema, configs, sourceDefinitionSeeds); } else if (configSchema == ConfigSchema.STANDARD_DESTINATION_DEFINITION) { - configs = getDefinitionStream(destinationDefinitionsToMigrate, destinationProcessed[0], configSchema, configs, latestSeeds); + Map destinationDefinitionSeeds = latestSeeds.get(configSchema); + configs = getDefinitionStream(destinationDefinitionsInUse, destinationProcessed[0], configSchema, configs, destinationDefinitionSeeds); } return configs; } /** - * This method combines latest definitions, with existing definition. If a connector is being used - * by user, it will continue to be at the same version, otherwise it will be migrated to the latest + * This method combines the latest definitions with existing ones. 
If a connector is being used by + * user, it will continue to be at the same version, otherwise it will be migrated to the latest * version */ - private Stream getDefinitionStream(List definitionsToMigrate, + private Stream getDefinitionStream(Set definitionsInUse, boolean definitionsPopulated, ConfigSchema configSchema, - Stream configs, - Map> latestSeeds) { + Stream currentDefinitions, + Map latestDefinitions) { if (!definitionsPopulated) { - throw new RuntimeException("Trying to process " + configSchema + " without populating the definitions to migrate"); + throw new RuntimeException("Trying to process " + configSchema + " without populating the definitions in use"); } - return Streams.concat(configs.filter(c -> definitionsToMigrate.contains(configSchema.getId(c))), - latestSeeds.getOrDefault(configSchema, new HashMap<>()).entrySet().stream().filter(c -> !definitionsToMigrate.contains(c.getKey())) - .map(Entry::getValue)); + return Streams.concat( + // Keep all the definitions in use + currentDefinitions.filter(c -> definitionsInUse.contains(configSchema.getId(c))), + // Upgrade all the definitions not in use + latestDefinitions.entrySet().stream().filter(c -> !definitionsInUse.contains(c.getKey())).map(Entry::getValue)); } private Stream readConfigsFromArchive(final Path storageRoot, final ConfigSchema schemaType) diff --git a/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java b/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java index 6058bb818612..06f6b5f369d5 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java +++ b/airbyte-server/src/main/java/io/airbyte/server/RunMigration.java @@ -26,6 +26,7 @@ import io.airbyte.api.model.ImportRead; import io.airbyte.api.model.ImportRead.StatusEnum; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.migrate.MigrateConfig; import io.airbyte.migrate.MigrationRunner; @@ -44,7 +45,7 @@ public 
class RunMigration implements Runnable, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(RunMigration.class); private final String targetVersion; - private final Path seedPath; + private final ConfigPersistence seedPersistence; private final ConfigDumpExporter configDumpExporter; private final ConfigDumpImporter configDumpImporter; private final List filesToBeCleanedUp = new ArrayList<>(); @@ -52,9 +53,9 @@ public class RunMigration implements Runnable, AutoCloseable { public RunMigration(JobPersistence jobPersistence, ConfigRepository configRepository, String targetVersion, - Path seedPath) { + ConfigPersistence seedPersistence) { this.targetVersion = targetVersion; - this.seedPath = seedPath; + this.seedPersistence = seedPersistence; this.configDumpExporter = new ConfigDumpExporter(configRepository, jobPersistence); this.configDumpImporter = new ConfigDumpImporter(configRepository, jobPersistence); } @@ -77,7 +78,7 @@ public void run() { MigrationRunner.run(migrateConfig); // Import data - ImportRead importRead = configDumpImporter.importDataWithSeed(targetVersion, output, seedPath); + ImportRead importRead = configDumpImporter.importDataWithSeed(targetVersion, output, seedPersistence); if (importRead.getStatus() == StatusEnum.FAILED) { throw new RuntimeException("Automatic migration failed : " + importRead.getReason()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 5544038ccd55..c868da34ffc2 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -36,6 +36,7 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigPersistenceBuilder; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; import 
io.airbyte.db.instance.jobs.JobsDatabaseInstance; import io.airbyte.scheduler.client.DefaultSchedulerJobClient; @@ -52,7 +53,6 @@ import io.airbyte.server.errors.KnownExceptionMapper; import io.airbyte.server.errors.NotFoundExceptionMapper; import io.airbyte.server.errors.UncaughtExceptionMapper; -import io.airbyte.server.handlers.ArchiveHandler; import io.airbyte.server.version_mismatch.VersionMismatchServer; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.TemporalClient; @@ -249,12 +249,11 @@ private static void runAutomaticMigration(ConfigRepository configRepository, String airbyteVersion, String airbyteDatabaseVersion) { LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion); - LOGGER.info("Last seeds dir: {}", ArchiveHandler.SEEDS_PATH); try (final RunMigration runMigration = new RunMigration( jobPersistence, configRepository, airbyteVersion, - ArchiveHandler.SEEDS_PATH)) { + YamlSeedConfigPersistence.get())) { runMigration.run(); } catch (Exception e) { LOGGER.error("Automatic Migration failed ", e); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java index 6a291ad69fc1..47cbc95595b7 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ArchiveHandler.java @@ -29,6 +29,7 @@ import io.airbyte.api.model.ImportRead.StatusEnum; import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.ConfigDumpExporter; import io.airbyte.server.ConfigDumpImporter; @@ -43,10 +44,7 @@ public class ArchiveHandler { private static final Logger LOGGER = 
LoggerFactory.getLogger(ArchiveHandler.class); - public static final Path SEEDS_PATH = Path.of(System.getProperty("user.dir")).resolve("latest_seeds"); - private final String version; - private final ConfigRepository configRepository; private final ConfigDumpExporter configDumpExporter; private final ConfigDumpImporter configDumpImporter; private final FileTtlManager fileTtlManager; @@ -57,7 +55,6 @@ public ArchiveHandler(final String version, final FileTtlManager fileTtlManager) { this( version, - configRepository, fileTtlManager, new ConfigDumpExporter(configRepository, jobPersistence), new ConfigDumpImporter(configRepository, jobPersistence)); @@ -65,12 +62,10 @@ public ArchiveHandler(final String version, @VisibleForTesting ArchiveHandler(final String version, - final ConfigRepository configRepository, final FileTtlManager fileTtlManager, final ConfigDumpExporter configDumpExporter, final ConfigDumpImporter configDumpImporter) { this.version = version; - this.configRepository = configRepository; this.configDumpExporter = configDumpExporter; this.configDumpImporter = configDumpImporter; this.fileTtlManager = fileTtlManager; @@ -98,7 +93,7 @@ public ImportRead importData(File archive) { try { final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive"); try { - configDumpImporter.importDataWithSeed(version, archive, SEEDS_PATH); + configDumpImporter.importDataWithSeed(version, archive, YamlSeedConfigPersistence.get()); result = new ImportRead().status(StatusEnum.SUCCEEDED); } finally { FileUtils.deleteDirectory(tempFolder.toFile()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index a7a6f9a4d9cc..139a335ce4b1 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -25,69 +25,164 @@ 
package io.airbyte.server.handlers; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.api.model.ImportRead; import io.airbyte.api.model.ImportRead.StatusEnum; import io.airbyte.commons.io.FileTtlManager; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.server.ConfigDumpExporter; -import io.airbyte.server.ConfigDumpImporter; +import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.config.persistence.YamlSeedConfigPersistence; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; +import io.airbyte.db.instance.jobs.JobsDatabaseInstance; +import io.airbyte.scheduler.persistence.DefaultJobPersistence; +import io.airbyte.scheduler.persistence.JobPersistence; import java.io.File; -import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; public class ArchiveHandlerTest { - private static final String VERSION = "test-version"; + private static final String VERSION = "0.6.8"; + 
private static PostgreSQLContainer container; + + private Database database; + private JobPersistence jobPersistence; + private ConfigPersistence configPersistence; + private ConfigPersistence seedPersistence; private ConfigRepository configRepository; private ArchiveHandler archiveHandler; - private FileTtlManager fileTtlManager; - private ConfigDumpExporter configDumpExporter; - private ConfigDumpImporter configDumpImporter; + + private static class NoOpFileTtlManager extends FileTtlManager { + + public NoOpFileTtlManager() { + super(1L, TimeUnit.MINUTES, 1L); + } + + public void register(Path path) {} + + } + + @BeforeAll + public static void dbSetup() { + container = new PostgreSQLContainer<>("postgres:13-alpine") + .withDatabaseName("airbyte") + .withUsername("docker") + .withPassword("docker"); + container.start(); + } + + @AfterAll + public static void dbDown() { + container.close(); + } @BeforeEach - void setUp() { - configRepository = mock(ConfigRepository.class); - fileTtlManager = mock(FileTtlManager.class); - configDumpExporter = mock(ConfigDumpExporter.class); - configDumpImporter = mock(ConfigDumpImporter.class); + public void setup() throws Exception { + database = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); + jobPersistence = new DefaultJobPersistence(database); + database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); + seedPersistence = YamlSeedConfigPersistence.get(); + configPersistence = new DatabaseConfigPersistence(database).loadData(seedPersistence); + configRepository = new ConfigRepository(configPersistence); + + jobPersistence.setVersion(VERSION); + archiveHandler = new ArchiveHandler( VERSION, configRepository, - fileTtlManager, - configDumpExporter, - configDumpImporter); + jobPersistence, + new NoOpFileTtlManager()); + } + + @AfterEach + void tearDown() throws Exception { + 
database.close(); } + /** + * After exporting and importing, the configs should remain the same. + */ @Test - void testExport() throws IOException { - final File file = Files.createTempFile(Path.of("/tmp"), "dump_file", "dump_file").toFile(); - when(configDumpExporter.dump()).thenReturn(file); + void testRoundTrip() throws Exception { + assertSameConfigDump(seedPersistence.dumpConfigs(), configRepository.dumpConfigs()); - assertEquals(file, archiveHandler.exportData()); + // Export the configs. + File archive = archiveHandler.exportData(); - verify(configDumpExporter).dump(); - verify(fileTtlManager).register(file.toPath()); - } + // After deleting the configs, the dump becomes empty. + configPersistence.replaceAllConfigs(Collections.emptyMap(), false); + assertSameConfigDump(Collections.emptyMap(), configRepository.dumpConfigs()); - @Test - void testImport() throws IOException { - final File file = Files.createTempFile(Path.of("/tmp"), "dump_file", "dump_file").toFile(); + // After importing the configs, the dump is restored. + ImportRead importResult = archiveHandler.importData(archive); + assertEquals(StatusEnum.SUCCEEDED, importResult.getStatus()); + assertSameConfigDump(seedPersistence.dumpConfigs(), configRepository.dumpConfigs()); - assertEquals(new ImportRead().status(StatusEnum.SUCCEEDED), archiveHandler.importData(file)); + // When a connector definition is in use, it will not be updated. 
+ UUID sourceS3DefinitionId = UUID.fromString("69589781-7828-43c5-9f63-8925b1c1ccc2"); + String sourceS3DefinitionVersion = "0.0.0"; + StandardSourceDefinition sourceS3Definition = seedPersistence.getConfig( + ConfigSchema.STANDARD_SOURCE_DEFINITION, + sourceS3DefinitionId.toString(), + StandardSourceDefinition.class) + // This source definition is on an old version + .withDockerImageTag(sourceS3DefinitionVersion); + SourceConnection sourceConnection = new SourceConnection() + .withSourceDefinitionId(sourceS3DefinitionId) + .withSourceId(UUID.randomUUID()) + .withWorkspaceId(UUID.randomUUID()) + .withName("Test source") + .withConfiguration(Jsons.deserialize("{}")) + .withTombstone(false); - // make sure it cleans up the file. - assertFalse(Files.exists(file.toPath())); + // Write source connection and an old source definition. + configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceConnection.getSourceId().toString(), sourceConnection); + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceS3DefinitionId.toString(), sourceS3Definition); + + // Export, wipe, and import the configs. + archive = archiveHandler.exportData(); + configPersistence.replaceAllConfigs(Collections.emptyMap(), false); + archiveHandler.importData(archive); + + // The version has not changed. 
+ StandardSourceDefinition actualS3Definition = configPersistence.getConfig( + ConfigSchema.STANDARD_SOURCE_DEFINITION, + sourceS3DefinitionId.toString(), + StandardSourceDefinition.class); + assertEquals(sourceS3DefinitionVersion, actualS3Definition.getDockerImageTag()); + } + + private Map> getSetFromStream(Map> input) { + return input.entrySet().stream().collect(Collectors.toMap( + Entry::getKey, + e -> e.getValue().collect(Collectors.toSet()))); + } - verify(configDumpImporter).importDataWithSeed(VERSION, file, ArchiveHandler.SEEDS_PATH); + // assertEquals cannot correctly check the equality of two maps with stream values, + // so streams are converted to sets before being compared. + private void assertSameConfigDump(Map> expected, Map> actual) { + assertEquals(getSetFromStream(expected), getSetFromStream(actual)); } } diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java index f9a64cb79ee0..f97170b850fd 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java @@ -45,6 +45,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.FileSystemConfigPersistence; +import io.airbyte.config.persistence.YamlSeedConfigPersistence; import io.airbyte.db.Database; import io.airbyte.migrate.Migrations; import io.airbyte.scheduler.persistence.DefaultJobPersistence; @@ -304,7 +305,7 @@ private void runMigration(JobPersistence jobPersistence, Path configRoot) throws jobPersistence, new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot)), TARGET_VERSION, - Path.of(System.getProperty("user.dir")).resolve("build/config_init/resources/main/config"))) { + YamlSeedConfigPersistence.get())) { runMigration.run(); } } diff 
--git a/build.gradle b/build.gradle index 0a580f304f61..3324a6ee78c7 100644 --- a/build.gradle +++ b/build.gradle @@ -247,7 +247,6 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" } task('generate') { - dependsOn subprojects.collect { it.getTasksByName('generateSeed', true) } dependsOn subprojects.collect { it.getTasksByName('generateProtocolClassFiles', true) } dependsOn subprojects.collect { it.getTasksByName('generateJsonSchema2Pojo', true) } } diff --git a/docker-compose.build.yaml b/docker-compose.build.yaml index 646fad68e773..d62075306e97 100644 --- a/docker-compose.build.yaml +++ b/docker-compose.build.yaml @@ -15,13 +15,6 @@ services: context: airbyte-db labels: io.airbyte.git-revision: ${GIT_REVISION} - seed: - image: airbyte/seed:${VERSION} - build: - dockerfile: seed.Dockerfile - context: airbyte-server - labels: - io.airbyte.git-revision: ${GIT_REVISION} scheduler: image: airbyte/scheduler:${VERSION} build: diff --git a/docker-compose.yaml b/docker-compose.yaml index c61f20110cf0..5489724c823c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -28,13 +28,6 @@ services: - POSTGRES_PASSWORD=${DATABASE_PASSWORD} volumes: - db:/var/lib/postgresql/data - seed: - image: airbyte/seed:${VERSION} - container_name: airbyte-data-seed - # Pre-populate the volume if it is empty. - # See: https://docs.docker.com/storage/volumes/#populate-a-volume-using-a-container - volumes: - - data:/app/seed scheduler: image: airbyte/scheduler:${VERSION} logging: *default-logging diff --git a/tools/bin/acceptance_test_kube.sh b/tools/bin/acceptance_test_kube.sh index 29aff3d03237..fdf4112816fa 100755 --- a/tools/bin/acceptance_test_kube.sh +++ b/tools/bin/acceptance_test_kube.sh @@ -12,7 +12,6 @@ echo "Loading images into KIND..." 
kind load docker-image airbyte/server:dev --name chart-testing kind load docker-image airbyte/scheduler:dev --name chart-testing kind load docker-image airbyte/webapp:dev --name chart-testing -kind load docker-image airbyte/seed:dev --name chart-testing kind load docker-image airbyte/db:dev --name chart-testing echo "Starting app..." diff --git a/tools/bin/check_images_exist.sh b/tools/bin/check_images_exist.sh index 36c627b832cc..a8e62751aa02 100755 --- a/tools/bin/check_images_exist.sh +++ b/tools/bin/check_images_exist.sh @@ -19,21 +19,19 @@ checkPlatformImages() { checkConnectorImages() { echo "Checking connector images exist..." - CONFIG_FILES=$(find airbyte-config/init | grep json | grep -v STANDARD_WORKSPACE | grep -v build) - [ -z "$CONFIG_FILES" ] && echo "ERROR: Could not find any config files." && exit 1 - - while IFS= read -r file; do - REPO=$(jq -r .dockerRepository < "$file") - TAG=$(jq -r .dockerImageTag < "$file") - echo "Checking $file..." - printf "\tREPO: %s\n" "$REPO" - printf "\tTAG: %s\n" "$TAG" + # grep -A1 emits each dockerRepository line plus the line after it; this assumes that next line is dockerImageTag (TODO confirm against the seed YAML). + CONNECTOR_DEFINITIONS=$(grep "dockerRepository" -h -A1 airbyte-config/init/src/main/resources/seed/*.yaml | grep -v -- "^--$" | tr -d ' ') + [ -z "$CONNECTOR_DEFINITIONS" ] && echo "ERROR: Could not find any connector definition." && exit 1 + + while IFS=":" read -r _ REPO; do + IFS=":" read -r _ TAG + printf "%s: %s\n" "${REPO}" "${TAG}" if docker_tag_exists "$REPO" "$TAG"; then printf "\tSTATUS: found\n" else printf "\tERROR: not found!\n" && exit 1 fi - done <<< "$CONFIG_FILES" + done <<< "${CONNECTOR_DEFINITIONS}" echo "Success! All connector images exist!" }