Skip to content

Commit

Permalink
Prevent sync & reset queueing from the UI (airbytehq#1434)
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Jan 5, 2021
1 parent 0856be5 commit b2f5f81
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void scheduleSyncJobs() throws IOException {
final List<StandardSync> activeConnections = getAllActiveConnections();

for (StandardSync connection : activeConnections) {
final Optional<Job> previousJobOptional = jobPersistence.getLastSyncJob(connection.getConnectionId());
final Optional<Job> previousJobOptional = jobPersistence.getLastSyncScope(connection.getConnectionId());
final StandardSyncSchedule standardSyncSchedule = getStandardSyncSchedule(connection);

if (scheduleJobPredicate.test(previousJobOptional, standardSyncSchedule)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobPersistence;
import java.io.IOException;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,26 +74,37 @@ public Job createGetSpecJob(String dockerImage) throws IOException {
}

/**
 * Requests a sync job from the job creator and waits on the resulting job.
 *
 * <p>The job creator returns an empty Optional when it did not create a job; in that case the id of
 * the job returned by {@code jobPersistence.getLastSyncScope} for the connection is used instead.
 *
 * @param source source connection passed through to the job creator
 * @param destination destination connection passed through to the job creator
 * @param standardSync sync configuration; its connection id is used for the fallback lookup
 * @param sourceDockerImage docker image passed through to the job creator for the source
 * @param destinationDockerImage docker image passed through to the job creator for the destination
 * @return the value returned by {@code waitUntilJobIsTerminalOrTimeout} for the resolved job id
 * @throws IOException propagated from the job creator or the job persistence
 * @throws RuntimeException with message "No job available" when no job was created and the fallback
 *         lookup returns nothing
 */
@Override
public Job createSyncJob(
SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage)
public Job createOrGetActiveSyncJob(SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage)
throws IOException {
final long jobId = jobCreator.createSyncJob(
final Optional<Long> jobIdOptional = jobCreator.createSyncJob(
source,
destination,
standardSync,
sourceDockerImage,
destinationDockerImage);

// No new job was created: reuse the job that getLastSyncScope reports for this connection.
// NOTE(review): presumably that is the job that blocked the creation — confirm against getLastSyncScope.
long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastSyncScope(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
: jobIdOptional.get();

// NOTE(review): by its name this waits until the job is terminal or a timeout elapses — confirm.
return waitUntilJobIsTerminalOrTimeout(jobId);
}

/**
 * Requests a reset-connection job from the job creator and waits on the resulting job.
 *
 * <p>The job creator returns an empty Optional when it did not create a job; in that case the id of
 * the job returned by {@code jobPersistence.getLastSyncScope} for the connection is used instead.
 *
 * @param destination destination connection passed through to the job creator
 * @param standardSync sync configuration; its connection id is used for the fallback lookup
 * @param destinationDockerImage docker image passed through to the job creator for the destination
 * @return the value returned by {@code waitUntilJobIsTerminalOrTimeout} for the resolved job id
 * @throws IOException propagated from the job creator or the job persistence
 * @throws RuntimeException with message "No job available" when no job was created and the fallback
 *         lookup returns nothing
 */
@Override
public Job createResetConnectionJob(DestinationConnection destination, StandardSync standardSync, String destinationDockerImage)
public Job createOrGetActiveResetConnectionJob(DestinationConnection destination,
StandardSync standardSync,
String destinationDockerImage)
throws IOException {
final long jobId = jobCreator.createResetConnectionJob(destination, standardSync, destinationDockerImage);
final Optional<Long> jobIdOptional = jobCreator.createResetConnectionJob(destination, standardSync, destinationDockerImage);

// No new job was created: reuse the job that getLastSyncScope reports for this connection.
// NOTE(review): presumably that is the job that blocked the creation — confirm against getLastSyncScope.
long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastSyncScope(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
: jobIdOptional.get();

// NOTE(review): by its name this waits until the job is terminal or a timeout elapses — confirm.
return waitUntilJobIsTerminalOrTimeout(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ public interface SchedulerJobClient {

Job createGetSpecJob(String dockerImage) throws IOException;

Job createSyncJob(SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage)
throws IOException;

Job createResetConnectionJob(
Job createOrGetActiveSyncJob(SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage)
throws IOException;

Job createOrGetActiveResetConnectionJob(DestinationConnection destination,
StandardSync standardSync,
String destinationDockerImage)
throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,8 @@ public DefaultSyncJobFactory(final DefaultJobCreator jobCreator,
public Long create(final UUID connectionId) {
try {
final StandardSync standardSync = configRepository.getStandardSync(connectionId);
final SourceConnection sourceConnection =
configRepository.getSourceConnection(standardSync.getSourceId());
final DestinationConnection destinationConnection =
configRepository.getDestinationConnection(standardSync.getDestinationId());
final SourceConnection sourceConnection = configRepository.getSourceConnection(standardSync.getSourceId());
final DestinationConnection destinationConnection = configRepository.getDestinationConnection(standardSync.getDestinationId());

final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
final StandardDestinationDefinition destinationDefinition =
Expand All @@ -70,7 +68,8 @@ public Long create(final UUID connectionId) {
destinationConnection,
standardSync,
sourceImageName,
destinationImageName);
destinationImageName)
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));

} catch (IOException | JsonValidationException | ConfigNotFoundException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.scheduler.ScopeHelper;
import java.io.IOException;
import java.util.Optional;

public class DefaultJobCreator implements JobCreator {

Expand All @@ -63,7 +64,7 @@ public long createSourceCheckConnectionJob(SourceConnection source, String docke
.withConfigType(JobConfig.ConfigType.CHECK_CONNECTION_SOURCE)
.withCheckConnection(jobCheckConnectionConfig);

return jobPersistence.createJob(scope, jobConfig);
return jobPersistence.enqueueJob(scope, jobConfig);
}

@Override
Expand All @@ -81,7 +82,7 @@ public long createDestinationCheckConnectionJob(DestinationConnection destinatio
.withConfigType(JobConfig.ConfigType.CHECK_CONNECTION_DESTINATION)
.withCheckConnection(jobCheckConnectionConfig);

return jobPersistence.createJob(scope, jobConfig);
return jobPersistence.enqueueJob(scope, jobConfig);
}

@Override
Expand All @@ -98,7 +99,7 @@ public long createDiscoverSchemaJob(SourceConnection source, String dockerImageN
.withConfigType(JobConfig.ConfigType.DISCOVER_SCHEMA)
.withDiscoverCatalog(jobDiscoverCatalogConfig);

return jobPersistence.createJob(scope, jobConfig);
return jobPersistence.enqueueJob(scope, jobConfig);
}

@Override
Expand All @@ -111,15 +112,15 @@ public long createGetSpecJob(String integrationImage) throws IOException {
.withConfigType(JobConfig.ConfigType.GET_SPEC)
.withGetSpec(new JobGetSpecConfig().withDockerImage(integrationImage));

return jobPersistence.createJob(scope, jobConfig);
return jobPersistence.enqueueJob(scope, jobConfig);
}

@Override
public long createSyncJob(SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImageName,
String destinationDockerImageName)
public Optional<Long> createSyncJob(SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImageName,
String destinationDockerImageName)
throws IOException {
final String scope = ScopeHelper.createScope(JobConfig.ConfigType.SYNC, standardSync.getConnectionId().toString());

Expand All @@ -137,7 +138,7 @@ public long createSyncJob(SourceConnection source,
final JobConfig jobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
.withSync(jobSyncConfig);
return jobPersistence.createJob(scope, jobConfig);
return jobPersistence.enqueueSingletonJob(scope, jobConfig);
}

// Strategy:
Expand All @@ -148,9 +149,9 @@ public long createSyncJob(SourceConnection source,
// 4. The Empty source emits no state message, so state will start at null (i.e. start from the
// beginning on the next sync).
@Override
public long createResetConnectionJob(DestinationConnection destination, StandardSync standardSync, String destinationDockerImage)
public Optional<Long> createResetConnectionJob(DestinationConnection destination, StandardSync standardSync, String destinationDockerImage)
throws IOException {
final String scope = ScopeHelper.createScope(JobConfig.ConfigType.SYNC, standardSync.getConnectionId().toString());
final String scope = ScopeHelper.createScope(ConfigType.SYNC, standardSync.getConnectionId().toString());

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = AirbyteProtocolConverters.toConfiguredCatalog(standardSync.getSchema());
configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH));
Expand All @@ -163,7 +164,7 @@ public long createResetConnectionJob(DestinationConnection destination, Standard
final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(resetConnectionConfig);
return jobPersistence.createJob(scope, jobConfig);
return jobPersistence.enqueueSingletonJob(scope, jobConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,42 @@ public DefaultJobPersistence(Database database) {
this(database, Instant::now);
}

// configJson is a oneOf checkConnection, discoverSchema, sync
@Override
public long createJob(String scope, JobConfig jobConfig) throws IOException {
public long enqueueJob(String scope, JobConfig jobConfig) throws IOException {
  // Queueing is allowed on this path, so the insert carries no guard clause and an id is expected back.
  LOGGER.info("enqueuing pending job for scope: " + scope);

  final Optional<Long> createdJobId = internalCreateJob(scope, jobConfig, true);
  if (createdJobId.isEmpty()) {
    throw new RuntimeException("This should not happen");
  }
  return createdJobId.get();
}

/**
 * Tries to create a pending job for the scope with queueing disabled.
 *
 * @param scope scope the job is created under
 * @param jobConfig configuration stored with the job
 * @return whatever {@code internalCreateJob} yields with queueing disabled: the new job id, or empty
 *         when no job was created
 * @throws IOException propagated from the underlying job creation
 */
@Override
public Optional<Long> enqueueSingletonJob(String scope, JobConfig jobConfig) throws IOException {
  LOGGER.info("attempt creating pending job for scope: " + scope);

  final boolean allowQueueing = false;
  return internalCreateJob(scope, jobConfig, allowQueueing);
}

private Optional<Long> internalCreateJob(String scope, JobConfig jobConfig, boolean allowQueueing) throws IOException {
LOGGER.info("creating pending job for scope: " + scope);
final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);

final Record record = database.query(
String queueingRequest = !allowQueueing
? String.format("WHERE NOT EXISTS (SELECT 1 FROM jobs WHERE scope = '%s' and status NOT IN (%s)) ",
scope,
String.join(", ", getSQLTerminalStates()))
: "";

return database.query(
ctx -> ctx.fetch(
"INSERT INTO jobs(scope, created_at, updated_at, status, config) VALUES(?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB)) RETURNING id",
"INSERT INTO jobs(scope, created_at, updated_at, status, config) " +
"SELECT ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB) " +
queueingRequest +
"RETURNING id ",
scope,
now,
now,
JobStatus.PENDING.toString().toLowerCase(),
Jsons.serialize(jobConfig)))
.stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("This should not happen"));
return record.getValue("id", Long.class);
.map(r -> r.getValue("id", Long.class));
}

@Override
Expand Down Expand Up @@ -272,7 +290,7 @@ public List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus s
}

@Override
public Optional<Job> getLastSyncJob(UUID connectionId) throws IOException {
public Optional<Job> getLastSyncScope(UUID connectionId) throws IOException {
return database.query(ctx -> ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE scope = ? AND CAST(jobs.status AS VARCHAR) <> ? ORDER BY jobs.created_at DESC LIMIT 1",
ScopeHelper.createScope(ConfigType.SYNC, connectionId.toString()),
Expand Down Expand Up @@ -360,4 +378,12 @@ private static long getEpoch(Record record, String fieldName) {
return record.get(fieldName, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC);
}

/**
 * Renders every status in {@code JobStatus.TERMINAL_STATUSES} as a lower-case, single-quoted SQL
 * string literal.
 *
 * @return one quoted literal per terminal status, in the iteration order of the status collection
 */
private static List<String> getSQLTerminalStates() {
  return JobStatus.TERMINAL_STATUSES.stream()
      // Lower-case with Locale.ROOT: under a Turkish default locale an upper-case 'I' would become a
      // dotless 'ı' and the literal would no longer match the database value.
      .map(status -> status.toString().toLowerCase(java.util.Locale.ROOT))
      .map(lowered -> String.format("'%s'", lowered))
      .collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import java.io.IOException;
import java.util.Optional;

public interface JobCreator {

Expand All @@ -39,13 +40,32 @@ public interface JobCreator {

long createGetSpecJob(String integrationImage) throws IOException;

long createSyncJob(SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage)
/**
* Creates a sync job for the given connection unless a conflicting job is already running.
*
* @param source db model representing where data comes from
* @param destination db model representing where data goes
* @param standardSync sync options
* @param sourceDockerImage docker image to use for the source
* @param destinationDockerImage docker image to use for the destination
* @return the id of the new job if no other conflicting job was running, otherwise empty
* @throws IOException if something goes wrong while creating the job
*/
Optional<Long> createSyncJob(SourceConnection source,
DestinationConnection destination,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage)
throws IOException;

long createResetConnectionJob(DestinationConnection destination, StandardSync standardSync, String destinationDockerImage) throws IOException;
/**
* Creates a reset-connection job for the given connection unless a conflicting job is already running.
*
* @param destination db model representing where data goes
* @param standardSync sync options
* @param destinationDockerImage docker image to use for the destination
* @return the id of the new job if no other conflicting job was running, otherwise empty
* @throws IOException if something goes wrong while creating the job
*/
Optional<Long> createResetConnectionJob(DestinationConnection destination, StandardSync standardSync, String destinationDockerImage)
throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,24 @@ public interface JobPersistence {
//

/**
* Creates a new job. Its initial status will be pending.
* Enqueue a new job. Its initial status will be pending.
*
* @param scope key that will be used to determine if two jobs should not be run at the same time.
* @param jobConfig configuration for the job
* @return job id
* @throws IOException exception due to interaction with persistence
*/
long createJob(String scope, JobConfig jobConfig) throws IOException;
long enqueueJob(String scope, JobConfig jobConfig) throws IOException;

/**
* Attempt to enqueue a new active job. If there is already one present, we don't create it.
*
* @param scope key that will be used to determine if two jobs should not be run at the same time.
* @param jobConfig configuration for the job
* @return the id of the enqueued job, or empty if a job was already present for the scope and none was created
* @throws IOException exception due to interaction with persistence
*/
Optional<Long> enqueueSingletonJob(String scope, JobConfig jobConfig) throws IOException;

/**
* Set job status from current status to PENDING. Throws {@link IllegalStateException} if the job is
Expand Down Expand Up @@ -128,7 +138,7 @@ public interface JobPersistence {

List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus status) throws IOException;

Optional<Job> getLastSyncJob(UUID connectionId) throws IOException;
Optional<Job> getLastSyncScope(UUID connectionId) throws IOException;

/**
* if a job does not succeed, we assume that it synced nothing. that is the most conservative
Expand Down
Loading

0 comments on commit b2f5f81

Please sign in to comment.