Skip to content

Commit

Permalink
Cleanup zombie jobs that occur after restart (airbytehq#1792)
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Jan 23, 2021
1 parent 05072d9 commit 1132b92
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public SchedulerApp(Path workspaceRoot,
this.configRepository = configRepository;
}

public void start() {
public void start() throws IOException {
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final WorkerRunFactory workerRunFactory = new WorkerRunFactory(workspaceRoot, pbf);
Expand All @@ -94,6 +94,10 @@ public void start() {
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
final JobSubmitter jobSubmitter = new JobSubmitter(workerThreadPool, jobPersistence, configRepository, workerRunFactory);

// We cancel jobs that where running before the restart. They are not being monitored by the worker
// anymore.
cleanupZombies(jobPersistence);

scheduledPool.scheduleWithFixedDelay(
() -> {
jobRetrier.run();
Expand All @@ -107,6 +111,12 @@ public void start() {
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool, scheduledPool));
}

private void cleanupZombies(JobPersistence jobPersistence) throws IOException {
for (Job zombieJob : jobPersistence.listJobsWithStatus(JobStatus.RUNNING)) {
jobPersistence.cancelJob(zombieJob.getId());
}
}

private static ProcessBuilderFactory getProcessBuilderFactory(Configs configs) {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
return new KubeProcessBuilderFactory(configs.getWorkspaceRoot());
Expand Down

0 comments on commit 1132b92

Please sign in to comment.