Skip to content

Commit

Permalink
refactor import / export endpoints to use the same code path as auto …
Browse files Browse the repository at this point in the history
…migration (airbytehq#4976)
  • Loading branch information
cgardens authored Jul 26, 2021
1 parent 979545b commit 7fbf15c
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 461 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.ConfigSchema;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -75,7 +76,7 @@ public static ConfigPersistence createWithValidation(final Path storageRoot) thr

public FileSystemConfigPersistence(final Path storageRoot) {
this.storageRoot = storageRoot;
this.configRoot = storageRoot.resolve(CONFIG_DIR);
this.configRoot = Exceptions.toRuntime(() -> Files.createDirectories(storageRoot.resolve(CONFIG_DIR)));
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion airbyte-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ task copySeed(type: Copy, dependsOn: [project(':airbyte-config:init').processRes
}

// need to make sure that the files are in the resource directory before copying.
//project.tasks.copySeed.mustRunAfter(project(':airbyte-config:init').tasks.processResources)
// tests require the seed to exist.
test.dependsOn(project.tasks.copySeed)
assemble.dependsOn(project.tasks.copySeed)

mainClassName = 'io.airbyte.server.ServerApp'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,18 @@
* would fail. 2. Unlike ArchiveHandler, this doesn't take the dump of specific files but looks at
* the config directory and takes the full dump of whatever is available
*/
public class ConfigDumpExport {
public class ConfigDumpExporter {

private static final String ARCHIVE_FILE_NAME = "airbyte_config_dump";
private static final String CONFIG_FOLDER_NAME = "airbyte_config";
private static final String DB_FOLDER_NAME = "airbyte_db";
private static final String VERSION_FILE_NAME = "VERSION";
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final String version;

public ConfigDumpExport(ConfigRepository configRepository, JobPersistence jobPersistence, String version) {
public ConfigDumpExporter(ConfigRepository configRepository, JobPersistence jobPersistence) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.version = version;
}

public File dump() {
Expand All @@ -89,6 +87,7 @@ public File dump() {
}

private void exportVersionFile(Path tempFolder) throws IOException {
final String version = jobPersistence.getVersion().orElseThrow();
final File versionFile = Files.createFile(tempFolder.resolve(VERSION_FILE_NAME)).toFile();
FileUtils.writeStringToFile(versionFile, version, Charset.defaultCharset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.api.model.ImportRead;
import io.airbyte.api.model.ImportRead.StatusEnum;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.Archives;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -71,28 +73,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigDumpImport {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class ConfigDumpImporter {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImport.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDumpImporter.class);
private static final String CONFIG_FOLDER_NAME = "airbyte_config";
private static final String DB_FOLDER_NAME = "airbyte_db";
private static final String VERSION_FILE_NAME = "VERSION";
private final ConfigRepository configRepository;
private final JsonSchemaValidator jsonSchemaValidator;
private final JobPersistence postgresPersistence;
private final String targetVersion;
private final String initialVersion;
private final Path latestSeed;

public ConfigDumpImport(String initialVersion,
String targetVersion,
Path latestSeed,
JobPersistence postgresPersistence,
ConfigRepository configRepository) {
this.targetVersion = targetVersion;
this.initialVersion = initialVersion;
this.latestSeed = latestSeed;
this.jsonSchemaValidator = new JsonSchemaValidator();

public ConfigDumpImporter(ConfigRepository configRepository, JobPersistence postgresPersistence) {
this(configRepository, postgresPersistence, new JsonSchemaValidator());
}

@VisibleForTesting
public ConfigDumpImporter(ConfigRepository configRepository, JobPersistence postgresPersistence, JsonSchemaValidator jsonSchemaValidator) {
this.jsonSchemaValidator = jsonSchemaValidator;
this.postgresPersistence = postgresPersistence;
this.configRepository = configRepository;
}
Expand All @@ -109,7 +107,17 @@ public Optional<UUID> getCurrentCustomerId() {
}
}

public ImportRead importData(File archive) {
public ImportRead importData(String targetVersion, File archive) {
return importDataInternal(targetVersion, archive, Optional.empty());
}

public ImportRead importDataWithSeed(String targetVersion, File archive, Path seedPath) {
return importDataInternal(targetVersion, archive, Optional.of(seedPath));
}

// seedPath - if present, merge with the import. otherwise just use the data in the import.
private ImportRead importDataInternal(String targetVersion, File archive, Optional<Path> seedPath) {
Preconditions.checkNotNull(seedPath);

final Optional<UUID> previousCustomerIdOptional = getCurrentCustomerId();
ImportRead result;
Expand All @@ -121,18 +129,17 @@ public ImportRead importData(File archive) {

// 2. dry run
try {
checkImport(sourceRoot);
checkImport(targetVersion, sourceRoot, seedPath);
} catch (Exception e) {
LOGGER.warn("Dry run failed, setting DB version back to initial version");
postgresPersistence.setVersion(initialVersion);
LOGGER.error("Dry run failed.", e);
throw e;
}

// 3. Import Postgres content
importDatabaseFromArchive(sourceRoot, targetVersion);

// 4. Import Configs
importConfigsFromArchive(sourceRoot, false);
importConfigsFromArchive(sourceRoot, seedPath, false);

// 5. Set DB version
LOGGER.info("Setting the DB Airbyte version to : " + targetVersion);
Expand All @@ -158,7 +165,7 @@ public ImportRead importData(File archive) {
return result;
}

private void checkImport(Path tempFolder) throws IOException, JsonValidationException {
private void checkImport(String targetVersion, Path tempFolder, Optional<Path> seed) throws IOException, JsonValidationException {
final Path versionFile = tempFolder.resolve(VERSION_FILE_NAME);
final String importVersion = Files.readString(versionFile, Charset.defaultCharset())
.replace("\n", "").strip();
Expand All @@ -169,7 +176,7 @@ private void checkImport(Path tempFolder) throws IOException, JsonValidationExce
"Please upgrade your Airbyte Archive, see more at https://docs.airbyte.io/tutorials/upgrading-airbyte\n",
importVersion, targetVersion));
}
importConfigsFromArchive(tempFolder, true);
importConfigsFromArchive(tempFolder, seed, true);
}

// Config
Expand All @@ -180,39 +187,58 @@ private List<String> listDirectories(Path sourceRoot) throws IOException {
}
}

private <T> void importConfigsFromArchive(final Path sourceRoot, final boolean dryRun) throws IOException, JsonValidationException {
List<String> sourceDefinitionsToMigrate = new ArrayList<>();
List<String> destinationDefinitionsToMigrate = new ArrayList<>();
private <T> void importConfigsFromArchive(final Path sourceRoot, Optional<Path> seedPath, final boolean dryRun)
throws IOException, JsonValidationException {
final List<String> sourceDefinitionsToMigrate = new ArrayList<>();
final List<String> destinationDefinitionsToMigrate = new ArrayList<>();
final boolean[] sourceProcessed = {false};
final boolean[] destinationProcessed = {false};
List<String> directories = listDirectories(sourceRoot);
final List<String> directories = listDirectories(sourceRoot);
// We sort the directories cause we want to process SOURCE_CONNECTION before
// STANDARD_SOURCE_DEFINITION and DESTINATION_CONNECTION before STANDARD_DESTINATION_DEFINITION
// so that we can identify which definitions should not be upgraded to the latest version
Collections.sort(directories);
Map<ConfigSchema, Stream<T>> data = new LinkedHashMap<>();
Map<ConfigSchema, Map<String, T>> latestSeeds = latestSeeds();
final Map<ConfigSchema, Stream<T>> data = new LinkedHashMap<>();

final Map<ConfigSchema, Map<String, T>> seed;
if (seedPath.isPresent()) {
seed = getSeed(seedPath.get());
} else {
seed = new HashMap<>();
}
for (String directory : directories) {
ConfigSchema configSchema = ConfigSchema.valueOf(directory.replace(".yaml", ""));
final Optional<ConfigSchema> configSchemaOptional = Enums.toEnum(directory.replace(".yaml", ""), ConfigSchema.class);

if (configSchemaOptional.isEmpty()) {
continue;
}

final ConfigSchema configSchema = configSchemaOptional.get();
Stream<T> configs = readConfigsFromArchive(sourceRoot, configSchema);
configs = streamWithAdditionalOperation(sourceDefinitionsToMigrate, destinationDefinitionsToMigrate, sourceProcessed, destinationProcessed,
configSchema, configs, latestSeeds);
configs = streamWithAdditionalOperation(
sourceDefinitionsToMigrate,
destinationDefinitionsToMigrate,
sourceProcessed,
destinationProcessed,
configSchema,
configs,
seed);
data.put(configSchema, configs);
}
configRepository.replaceAllConfigs(data, dryRun);
}

private <T> Map<ConfigSchema, Map<String, T>> latestSeeds() throws IOException {
List<ConfigSchema> configSchemas = Files.list(latestSeed).map(c -> ConfigSchema.valueOf(c.getFileName().toString())).collect(Collectors.toList());
Map<ConfigSchema, Map<String, T>> allData = new HashMap<>();
private <T> Map<ConfigSchema, Map<String, T>> getSeed(Path seed) throws IOException {
final List<ConfigSchema> configSchemas = Files.list(seed).map(c -> ConfigSchema.valueOf(c.getFileName().toString())).collect(Collectors.toList());
final Map<ConfigSchema, Map<String, T>> allData = new HashMap<>();
for (ConfigSchema configSchema : configSchemas) {
Map<String, T> data = readLatestSeed(configSchema);
final Map<String, T> data = readLatestSeed(seed, configSchema);
allData.put(configSchema, data);
}
return allData;
}

private <T> Map<String, T> readLatestSeed(ConfigSchema configSchema) throws IOException {
private <T> Map<String, T> readLatestSeed(Path latestSeed, ConfigSchema configSchema) throws IOException {
try (Stream<Path> files = Files.list(latestSeed.resolve(configSchema.toString()))) {
final List<String> ids = files
.filter(p -> !p.endsWith(".json"))
Expand Down Expand Up @@ -274,7 +300,7 @@ private <T> Stream<T> getDefinitionStream(List<String> definitionsToMigrate,
}

return Streams.concat(configs.filter(c -> definitionsToMigrate.contains(configSchema.getId(c))),
latestSeeds.get(configSchema).entrySet().stream().filter(c -> !definitionsToMigrate.contains(c.getKey()))
latestSeeds.getOrDefault(configSchema, new HashMap<>()).entrySet().stream().filter(c -> !definitionsToMigrate.contains(c.getKey()))
.map(Entry::getValue));
}

Expand Down Expand Up @@ -314,8 +340,7 @@ protected static Path buildConfigPath(final Path storageRoot, final ConfigSchema
}

// Postgres Portion
public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion)
throws IOException {
public void importDatabaseFromArchive(final Path storageRoot, final String airbyteVersion) throws IOException {
try {
final Map<DatabaseSchema, Stream<JsonNode>> data = new HashMap<>();
for (DatabaseSchema tableType : DatabaseSchema.values()) {
Expand All @@ -331,8 +356,7 @@ public void importDatabaseFromArchive(final Path storageRoot, final String airby
postgresPersistence.importDatabase(airbyteVersion, data);
LOGGER.info("Successful upgrade of airbyte postgres database from archive");
} catch (Exception e) {
LOGGER.warn("Postgres database version upgrade failed, setting DB version back to initial version");
postgresPersistence.setVersion(initialVersion);
LOGGER.warn("Postgres database version upgrade failed, reverting to state previous to migration.");
throw e;
}
}
Expand Down
25 changes: 12 additions & 13 deletions airbyte-server/src/main/java/io/airbyte/server/RunMigration.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,40 @@ public class RunMigration implements Runnable, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(RunMigration.class);
private final String targetVersion;
private final ConfigDumpExport configDumpExport;
private final ConfigDumpImport configDumpImport;
private final Path seedPath;
private final ConfigDumpExporter configDumpExporter;
private final ConfigDumpImporter configDumpImporter;
private final List<File> filesToBeCleanedUp = new ArrayList<>();

public RunMigration(String initialVersion,
JobPersistence jobPersistence,
public RunMigration(JobPersistence jobPersistence,
ConfigRepository configRepository,
String targetVersion,
Path latestSeeds) {
Path seedPath) {
this.targetVersion = targetVersion;
this.configDumpExport = new ConfigDumpExport(configRepository, jobPersistence, initialVersion);
this.configDumpImport = new ConfigDumpImport(initialVersion, targetVersion, latestSeeds, jobPersistence, configRepository);
this.seedPath = seedPath;
this.configDumpExporter = new ConfigDumpExporter(configRepository, jobPersistence);
this.configDumpImporter = new ConfigDumpImporter(configRepository, jobPersistence);
}

@Override
public void run() {
try {
// Export data
File exportData = configDumpExport.dump();
File exportData = configDumpExporter.dump();
filesToBeCleanedUp.add(exportData);

// Define output target
final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "airbyte_archive_output");
final File output = Files.createTempFile(tempFolder, "airbyte_archive_output", ".tar.gz")
.toFile();
final File output = Files.createTempFile(tempFolder, "airbyte_archive_output", ".tar.gz").toFile();
filesToBeCleanedUp.add(output);
filesToBeCleanedUp.add(tempFolder.toFile());

// Run Migration
MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(),
targetVersion);
MigrateConfig migrateConfig = new MigrateConfig(exportData.toPath(), output.toPath(), targetVersion);
MigrationRunner.run(migrateConfig);

// Import data
ImportRead importRead = configDumpImport.importData(output);
ImportRead importRead = configDumpImporter.importDataWithSeed(targetVersion, output, seedPath);
if (importRead.getStatus() == StatusEnum.FAILED) {
throw new RuntimeException("Automatic migration failed : " + importRead.getReason());
}
Expand Down
7 changes: 5 additions & 2 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,11 @@ private static void runAutomaticMigration(ConfigRepository configRepository,
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion);
final Path latestSeedsPath = Path.of(System.getProperty("user.dir")).resolve("latest_seeds");
LOGGER.info("Last seeds dir: {}", latestSeedsPath);
try (RunMigration runMigration = new RunMigration(airbyteDatabaseVersion,
jobPersistence, configRepository, airbyteVersion, latestSeedsPath)) {
try (final RunMigration runMigration = new RunMigration(
jobPersistence,
configRepository,
airbyteVersion,
latestSeedsPath)) {
runMigration.run();
} catch (Exception e) {
LOGGER.error("Automatic Migration failed ", e);
Expand Down
Loading

0 comments on commit 7fbf15c

Please sign in to comment.