Skip to content

Commit

Permalink
WorkerTaskMonitor: stop() waits for mainLoop to exit.
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm committed Mar 11, 2016
1 parent f4ab1c2 commit 79a95f7
Showing 1 changed file with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
Expand All @@ -57,6 +59,7 @@
public class WorkerTaskMonitor
{
private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
private static final int STOP_WARNING_SECONDS = 10;

private final ObjectMapper jsonMapper;
private final PathChildrenCache pathChildrenCache;
Expand All @@ -68,6 +71,7 @@ public class WorkerTaskMonitor
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
private final Map<String, TaskDetails> running = new ConcurrentHashMap<>();

private final CountDownLatch doneStopping = new CountDownLatch(1);
private final Object lifecycleLock = new Object();
private volatile boolean started = false;

Expand Down Expand Up @@ -149,6 +153,9 @@ private void mainLoop()
catch (InterruptedException e) {
log.info("WorkerTaskMonitor interrupted, exiting.");
}
finally {
doneStopping.countDown();
}
}

private void restoreRestorableTasks()
Expand Down Expand Up @@ -237,19 +244,27 @@ public void onFailure(Throwable t)
}

@LifecycleStop
public void stop()
public void stop() throws InterruptedException
{
synchronized (lifecycleLock) {
Preconditions.checkState(started, "not started");

try {
started = false;
exec.shutdownNow();
pathChildrenCache.close();
taskRunner.stop();

started = false;
if (!doneStopping.await(STOP_WARNING_SECONDS, TimeUnit.SECONDS)) {
log.warn("WorkerTaskMonitor taking longer than %s seconds to exit. Still waiting...", STOP_WARNING_SECONDS);
doneStopping.await();
}

log.info("Stopped WorkerTaskMonitor.");
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
log.makeAlert(e, "Exception stopping WorkerTaskMonitor")
.emit();
Expand Down

0 comments on commit 79a95f7

Please sign in to comment.