Skip to content

Commit

Permalink
Introduce a new field in Job to properly store the type of job (airby…
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Jan 5, 2021
1 parent 0f7b1ee commit d24af87
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 14 deletions.
29 changes: 29 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/enums/Enums.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
package io.airbyte.commons.enums;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class Enums {
Expand All @@ -50,4 +53,30 @@ public static <T1 extends Enum<T1>, T2 extends Enum<T2>> boolean isCompatible(Cl
.isEmpty();
}

private static final Map<Class<?>, Map<String, ?>> NORMALIZED_ENUMS = Maps.newConcurrentMap();

@SuppressWarnings("unchecked")
public static <T extends Enum<T>> Optional<T> toEnum(final String value, Class<T> enumClass) {
Preconditions.checkArgument(enumClass.isEnum());

if (!NORMALIZED_ENUMS.containsKey(enumClass)) {
T[] values = enumClass.getEnumConstants();
final Map<String, T> mappings = Maps.newHashMapWithExpectedSize(values.length);
for (T t : values) {
mappings.put(normalizeName(t.name()), t);
}
NORMALIZED_ENUMS.put(enumClass, mappings);
}

return Optional.ofNullable((T) NORMALIZED_ENUMS.get(enumClass).get(normalizeName(value)));
}

public static <T extends Enum<T>> String toSqlName(final T value) {
return value.name().toLowerCase();
}

private static String normalizeName(final String name) {
return name.toLowerCase().replaceAll("[^a-zA-Z0-9]", "");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@

import static io.airbyte.commons.enums.Enums.convertTo;
import static io.airbyte.commons.enums.Enums.isCompatible;
import static io.airbyte.commons.enums.Enums.toEnum;
import static io.airbyte.commons.enums.Enums.toSqlName;

import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -86,4 +89,33 @@ void testNotCompatibleDifferentLength2() {
Assertions.assertFalse(isCompatible(E4.class, E1.class));
}

enum E5 {
VALUE_1,
VALUE_TWO,
value_three,
value_4
}

@Test
void testToEnum() {
Assertions.assertEquals(Optional.of(E1.TEST), toEnum("test", E1.class));
Assertions.assertEquals(Optional.of(E5.VALUE_1), toEnum("VALUE_1", E5.class));
Assertions.assertEquals(Optional.of(E5.VALUE_1), toEnum("value_1", E5.class));
Assertions.assertEquals(Optional.of(E5.VALUE_TWO), toEnum("VALUE_TWO", E5.class));
Assertions.assertEquals(Optional.of(E5.VALUE_TWO), toEnum("valuetwo", E5.class));
Assertions.assertEquals(Optional.of(E5.VALUE_TWO), toEnum("valueTWO", E5.class));
Assertions.assertEquals(Optional.of(E5.VALUE_TWO), toEnum("valueTWO$", E5.class));
Assertions.assertEquals(Optional.of(E5.VALUE_TWO), toEnum("___valueTWO___", E5.class));
Assertions.assertEquals(Optional.of(E5.value_three), toEnum("VALUE_THREE", E5.class));
Assertions.assertEquals(Optional.of(E5.value_4), toEnum("VALUE_4", E5.class));
Assertions.assertEquals(Optional.empty(), toEnum("VALUE_5", E5.class));
}

@Test
void testToSqlName() {
Assertions.assertEquals("value_1", toSqlName(E5.VALUE_1));
Assertions.assertEquals("value_two", toSqlName(E5.VALUE_TWO));
Assertions.assertEquals("value_three", toSqlName(E5.value_three));
}

}
11 changes: 11 additions & 0 deletions airbyte-db/src/main/resources/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ CREATE
'succeeded'
);

CREATE
TYPE JOB_CONFIG_TYPE AS ENUM(
'check_connection_source',
'check_connection_destination',
'discover_schema',
'get_spec',
'sync',
'reset_connection'
);

-- tables
CREATE
TABLE
Expand All @@ -38,6 +48,7 @@ CREATE
TABLE
JOBS(
id BIGSERIAL PRIMARY KEY,
config_type JOB_CONFIG_TYPE,
SCOPE VARCHAR(255),
config JSONB,
status JOB_STATUS,
Expand Down
7 changes: 7 additions & 0 deletions airbyte-scheduler/src/main/java/io/airbyte/scheduler/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class Job {

private final long id;
private JobConfig.ConfigType configType;
private final String scope;
private final JobConfig config;
private final JobStatus status;
Expand All @@ -45,6 +46,7 @@ public class Job {
private final List<Attempt> attempts;

public Job(final long id,
final JobConfig.ConfigType configType,
final String scope,
final JobConfig config,
final List<Attempt> attempts,
Expand All @@ -53,6 +55,7 @@ public Job(final long id,
final long createdAtInSecond,
final long updatedAtInSecond) {
this.id = id;
this.configType = configType;
this.scope = scope;
this.config = config;
this.attempts = attempts;
Expand All @@ -66,6 +69,10 @@ public long getId() {
return id;
}

public JobConfig.ConfigType getConfigType() {
return configType;
}

public String getScope() {
return scope;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.scheduler.persistence;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class DefaultJobPersistence implements JobPersistence {
static final String BASE_JOB_SELECT_AND_JOIN =
"SELECT\n"
+ "jobs.id AS job_id,\n"
+ "jobs.config_type AS config_type,\n"
+ "jobs.scope AS scope,\n"
+ "jobs.config AS config,\n"
+ "jobs.status AS job_status,\n"
Expand Down Expand Up @@ -116,14 +118,15 @@ private Optional<Long> internalCreateJob(String scope, JobConfig jobConfig, bool

return database.query(
ctx -> ctx.fetch(
"INSERT INTO jobs(scope, created_at, updated_at, status, config) " +
"SELECT ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB) " +
"INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) " +
"SELECT CAST(? AS JOB_CONFIG_TYPE), ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB) " +
queueingRequest +
"RETURNING id ",
Enums.toSqlName(jobConfig.getConfigType()),
scope,
now,
now,
JobStatus.PENDING.toString().toLowerCase(),
Enums.toSqlName(JobStatus.PENDING),
Jsons.serialize(jobConfig)))
.stream()
.findFirst()
Expand Down Expand Up @@ -174,7 +177,7 @@ private void updateJobStatusIfNotInTerminalState(DSLContext ctx, long jobId, Job
private void updateJobStatus(DSLContext ctx, long jobId, JobStatus newStatus, LocalDateTime now) {
ctx.execute(
"UPDATE jobs SET status = CAST(? as JOB_STATUS), updated_at = ? WHERE id = ?",
newStatus.toString().toLowerCase(),
Enums.toSqlName(newStatus),
now,
jobId);
}
Expand All @@ -201,7 +204,7 @@ public int createAttempt(long jobId, Path logPath) throws IOException {
jobId,
job.getAttemptsCount(),
logPath.toString(),
AttemptStatus.RUNNING.toString().toLowerCase(),
Enums.toSqlName(AttemptStatus.RUNNING),
now,
now)
.stream()
Expand All @@ -221,7 +224,7 @@ public void failAttempt(long jobId, int attemptNumber) throws IOException {

ctx.execute(
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? WHERE job_id = ? AND attempt_number = ?",
AttemptStatus.FAILED.toString().toLowerCase(),
Enums.toSqlName(AttemptStatus.FAILED),
now,
jobId,
attemptNumber);
Expand All @@ -238,7 +241,7 @@ public void succeedAttempt(long jobId, int attemptNumber) throws IOException {

ctx.execute(
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? WHERE job_id = ? AND attempt_number = ?",
AttemptStatus.SUCCEEDED.toString().toLowerCase(),
Enums.toSqlName(AttemptStatus.SUCCEEDED),
now,
jobId,
attemptNumber);
Expand Down Expand Up @@ -286,15 +289,15 @@ public List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus s
return database.query(ctx -> getJobsFromResult(ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE jobs.scope LIKE " + likeStatement
+ " AND CAST(jobs.status AS VARCHAR) = ? ORDER BY jobs.created_at DESC",
status.toString().toLowerCase())));
Enums.toSqlName(status))));
}

@Override
public Optional<Job> getLastSyncScope(UUID connectionId) throws IOException {
return database.query(ctx -> ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE scope = ? AND CAST(jobs.status AS VARCHAR) <> ? ORDER BY jobs.created_at DESC LIMIT 1",
ScopeHelper.createScope(ConfigType.SYNC, connectionId.toString()),
JobStatus.CANCELLED.toString().toLowerCase())
Enums.toSqlName(JobStatus.CANCELLED))
.stream()
.findFirst()
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class))));
Expand All @@ -305,7 +308,7 @@ public Optional<State> getCurrentState(UUID connectionId) throws IOException {
return database.query(ctx -> ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE scope = ? AND CAST(jobs.status AS VARCHAR) = ? ORDER BY attempts.created_at DESC LIMIT 1",
ScopeHelper.createScope(ConfigType.SYNC, connectionId.toString()),
JobStatus.SUCCEEDED.toString().toLowerCase())
Enums.toSqlName(JobStatus.SUCCEEDED))
.stream()
.findFirst()
.flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class)))
Expand Down Expand Up @@ -345,7 +348,7 @@ private static List<Job> getJobsFromResult(Result<Record> result) {
attemptRecord.get("job_id", Long.class),
Path.of(attemptRecord.get("log_path", String.class)),
output,
AttemptStatus.valueOf(attemptRecord.get("attempt_status", String.class).toUpperCase()),
Enums.toEnum(attemptRecord.get("attempt_status", String.class), AttemptStatus.class).orElseThrow(),
getEpoch(attemptRecord, "attempt_created_at"),
getEpoch(attemptRecord, "attempt_updated_at"),
Optional.ofNullable(attemptRecord.get("attempt_ended_at"))
Expand All @@ -358,6 +361,7 @@ private static List<Job> getJobsFromResult(Result<Record> result) {
final JobConfig jobConfig = Jsons.deserialize(jobEntry.get("config", String.class), JobConfig.class);
return new Job(
jobEntry.get("job_id", Long.class),
Enums.toEnum(jobEntry.get("config_type", String.class), JobConfig.ConfigType.class).orElseThrow(),
jobEntry.get("scope", String.class),
jobConfig,
attempts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void testIsJobInTerminalState() {
}

private static Job jobWithStatus(JobStatus jobStatus) {
return new Job(1L, null, null, null, jobStatus, 0L, 0L, 0L);
return new Job(1L, null, null, null, null, jobStatus, 0L, 0L, 0L);
}

@Test
Expand All @@ -61,7 +61,7 @@ private static Job jobWithAttemptWithStatus(AttemptStatus... attemptStatuses) {
final List<Attempt> attempts = Arrays.stream(attemptStatuses)
.map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, 0L, 0L, null))
.collect(Collectors.toList());
return new Job(1L, null, null, attempts, null, 0L, 0L, 0L);
return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class DefaultJobPersistenceTest {
private static final Instant NOW = Instant.now();
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final String SCOPE = ScopeHelper.createScope(ConfigType.SYNC, CONNECTION_ID.toString());
private static final JobConfig JOB_CONFIG = new JobConfig().withSync(new JobSyncConfig());
private static final JobConfig JOB_CONFIG = new JobConfig()
.withConfigType(ConfigType.SYNC)
.withSync(new JobSyncConfig());
private static final Path LOG_PATH = Path.of("/tmp/logs/all/the/way/down");

private JobPersistence jobPersistence;
Expand Down Expand Up @@ -654,6 +656,7 @@ private Job getExpectedJob(long jobId, JobStatus jobStatus, List<Attempt> attemp
private Job getExpectedJob(long jobId, JobStatus jobStatus, List<Attempt> attempts, long time) {
return new Job(
jobId,
JOB_CONFIG.getConfigType(),
SCOPE,
JOB_CONFIG,
attempts,
Expand Down

0 comments on commit d24af87

Please sign in to comment.