Skip to content

Commit

Permalink
fix where version bump check takes place (airbytehq#8142)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Nov 19, 2021
1 parent f447183 commit d618ff9
Showing 1 changed file with 36 additions and 30 deletions.
66 changes: 36 additions & 30 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,34 +222,38 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
new BucketSpecCacheSchedulerClient(syncSchedulerClient, configs.getSpecCacheBucket());
final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);

// todo (cgardens) - this method is deprecated. new migrations are not run using this code path. it
// is scheduled to be removed.
final Optional<AirbyteVersion> airbyteDatabaseVersion = runFileMigration(
airbyteVersion,
configRepository,
seed,
specFetcher,
jobPersistence,
configs);

final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();

if (!isLegalUpgrade(airbyteDatabaseVersion.orElse(null), airbyteVersion)) {
// version in the database when the server main method is called. may be empty if this is the first
// time the server is started.
final Optional<AirbyteVersion> initialAirbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (!isLegalUpgrade(initialAirbyteDatabaseVersion.orElse(null), airbyteVersion)) {
final String attentionBanner = MoreResources.readResource("banner/attention-banner.txt");
LOGGER.error(attentionBanner);
final String message = String.format(
"Cannot upgrade from version %s to version %s directly. First you must upgrade to version %s. After that upgrade is complete, you may upgrade to version %s",
airbyteDatabaseVersion.get(),
airbyteVersion,
VERSION_BREAK,
airbyteVersion);
initialAirbyteDatabaseVersion.get().serialize(),
airbyteVersion.serialize(),
VERSION_BREAK.serialize(),
airbyteVersion.serialize());

LOGGER.error(message);
throw new RuntimeException(message);
}

if (airbyteDatabaseVersion.isPresent() && AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) {
// todo (cgardens) - this method is deprecated. new migrations are not run using this code path. it
// is scheduled to be removed.
// version in the database after migrations are run. cannot be null.
final AirbyteVersion airbyteDatabaseVersion = runFileMigration(
airbyteVersion,
initialAirbyteDatabaseVersion.orElse(null),
configRepository,
seed,
specFetcher,
jobPersistence,
configs);

if (AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion)) {
LOGGER.info("Starting server...");

runFlywayMigration(configs, configDatabase, jobDatabase);
Expand Down Expand Up @@ -287,7 +291,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
httpClient);
} else {
LOGGER.info("Start serving version mismatch errors. Automatic migration either failed or didn't run");
return new VersionMismatchServer(airbyteVersion, airbyteDatabaseVersion.orElseThrow(), PORT);
return new VersionMismatchServer(airbyteVersion, airbyteDatabaseVersion, PORT);
}
}

Expand Down Expand Up @@ -315,31 +319,33 @@ private static boolean isUpgradingThroughVersionBreak(final AirbyteVersion airby

@Deprecated
@SuppressWarnings({"DeprecatedIsStillUsed"})
private static Optional<AirbyteVersion> runFileMigration(final AirbyteVersion airbyteVersion,
final ConfigRepository configRepository,
final ConfigPersistence seed,
final SpecFetcher specFetcher,
final JobPersistence jobPersistence,
final Configs configs)
private static AirbyteVersion runFileMigration(final AirbyteVersion airbyteVersion,
final AirbyteVersion initialAirbyteDatabaseVersion,
final ConfigRepository configRepository,
final ConfigPersistence seed,
final SpecFetcher specFetcher,
final JobPersistence jobPersistence,
final Configs configs)
throws IOException {
// required before migration
// TODO: remove this specFetcherFn logic once file migrations are deprecated
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage)));

Optional<AirbyteVersion> airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
// version in the database after migration is run.
AirbyteVersion airbyteDatabaseVersion = null;
if (initialAirbyteDatabaseVersion != null && isDatabaseVersionBehindAppVersion(airbyteVersion, initialAirbyteDatabaseVersion)) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;
final boolean versionSupportsAutoMigrate = airbyteDatabaseVersion.get().greaterThanOrEqualTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION);
final boolean versionSupportsAutoMigrate = initialAirbyteDatabaseVersion.greaterThanOrEqualTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION);
if (!isKubernetes || versionSupportsAutoMigrate) {
runAutomaticMigration(configRepository, jobPersistence, seed, specFetcher, airbyteVersion, airbyteDatabaseVersion.get());
runAutomaticMigration(configRepository, jobPersistence, seed, specFetcher, airbyteVersion, initialAirbyteDatabaseVersion);
// After migration, upgrade the DB version
airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new);
airbyteDatabaseVersion = jobPersistence.getVersion().map(AirbyteVersion::new).orElseThrow();
} else {
LOGGER.info("Can not run automatic migration for Airbyte on KUBERNETES before version " + KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION.serialize());
}
}

return airbyteDatabaseVersion;
return airbyteDatabaseVersion != null ? airbyteDatabaseVersion : initialAirbyteDatabaseVersion;
}

public static void main(final String[] args) throws Exception {
Expand Down

0 comments on commit d618ff9

Please sign in to comment.