Skip to content

Commit

Permalink
Add RetryStates table migration. (#7452)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Jun 23, 2023
1 parent 11304e1 commit a1c4493
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class BootloaderTest {
// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.4.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.44.5.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.50.4.001";
private static final String CDK_VERSION = "1.2.3";

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.constraint;
import static org.jooq.impl.DSL.currentOffsetDateTime;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;

import java.time.OffsetDateTime;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Adds the table to store retry state to be accessed by the connection manager workflow.
*/
public class V0_50_4_001__CreateRetryStatesTable extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_50_4_001__CreateRetryStatesTable.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
LOGGER.info("Creating table");
createRetryStatesTable(ctx);

LOGGER.info("Creating indices");
createIndices(ctx);

LOGGER.info("Completed migration: {}", this.getClass().getSimpleName());
}

private static void createRetryStatesTable(final DSLContext ctx) {
// metadata
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.notNull());
final Field<UUID> connectionId = DSL.field("connection_id", SQLDataType.UUID.notNull());
final Field<Long> jobId = DSL.field("job_id", SQLDataType.BIGINT.notNull());

// row timestamps
final Field<OffsetDateTime> createdAt = DSL
.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.notNull().defaultValue(currentOffsetDateTime()));
final Field<OffsetDateTime> updatedAt = DSL
.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.notNull().defaultValue(currentOffsetDateTime()));

// values
final Field<Integer> successiveCompleteFailures = DSL.field("successive_complete_failures", SQLDataType.INTEGER.notNull());
final Field<Integer> totalCompleteFailures = DSL.field("total_complete_failures", SQLDataType.INTEGER.notNull());
final Field<Integer> successivePartialFailures = DSL.field("successive_partial_failures", SQLDataType.INTEGER.notNull());
final Field<Integer> totalPartialFailures = DSL.field("total_partial_failures", SQLDataType.INTEGER.notNull());

ctx.createTableIfNotExists("retry_states")
.columns(id, connectionId, jobId, createdAt, updatedAt, successiveCompleteFailures, totalCompleteFailures, successivePartialFailures,
totalPartialFailures)
.constraints(
primaryKey(id),
foreignKey(jobId).references("jobs", "id").onDeleteCascade(),
constraint("uniq_job_id").unique("job_id"))
.execute();
}

private static void createIndices(final DSLContext ctx) {
ctx.createIndexIfNotExists("retry_state_connection_id_idx").on("retry_states", "connection_id").execute();
ctx.createIndexIfNotExists("retry_state_job_id_idx").on("retry_states", "job_id").execute();
}

}
21 changes: 21 additions & 0 deletions airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ create table "public"."normalization_summaries" (
constraint "normalization_summaries_pkey"
primary key ("id")
);
create table "public"."retry_states" (
"id" uuid not null,
"connection_id" uuid not null,
"job_id" bigint not null,
"created_at" timestamp(6) with time zone not null default current_timestamp,
"updated_at" timestamp(6) with time zone not null default current_timestamp,
"successive_complete_failures" int not null,
"total_complete_failures" int not null,
"successive_partial_failures" int not null,
"total_partial_failures" int not null,
constraint "retry_states_pkey"
primary key ("id"),
constraint "uniq_job_id"
unique ("job_id")
);
create table "public"."stream_stats" (
"id" uuid not null,
"attempt_id" bigint not null,
Expand Down Expand Up @@ -121,6 +136,10 @@ alter table "public"."normalization_summaries"
add constraint "normalization_summaries_attempt_id_fkey"
foreign key ("attempt_id")
references "public"."attempts" ("id");
alter table "public"."retry_states"
add constraint "retry_states_job_id_fkey"
foreign key ("job_id")
references "public"."jobs" ("id");
alter table "public"."stream_stats"
add constraint "stream_stats_attempt_id_fkey"
foreign key ("attempt_id")
Expand All @@ -140,6 +159,8 @@ create index "jobs_config_type_idx" on "public"."jobs"("config_type" asc);
create index "jobs_scope_idx" on "public"."jobs"("scope" asc);
create index "jobs_status_idx" on "public"."jobs"("status" asc);
create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc);
create index "retry_state_connection_id_idx" on "public"."retry_states"("connection_id" asc);
create index "retry_state_job_id_idx" on "public"."retry_states"("job_id" asc);
create index "index" on "public"."stream_stats"("attempt_id" asc);
create index "stream_status_connection_id_idx" on "public"."stream_statuses"("connection_id" asc);
create index "stream_status_job_id_idx" on "public"."stream_statuses"("job_id" asc);
Expand Down

0 comments on commit a1c4493

Please sign in to comment.