Skip to content

Commit

Permalink
Add better messages around LookupCoordinatorManager failures (apache#…
Browse files Browse the repository at this point in the history
…3027)

* Add better messages around LookupCoordinatorManager failures
* Catches apache#3026

* A few more little tests

* Add more forceful shutdown
  • Loading branch information
drcrallen authored and b-slim committed May 26, 2016
1 parent e2653a8 commit 847501a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
Expand All @@ -31,8 +32,10 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
Expand Down Expand Up @@ -71,6 +74,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -121,6 +125,8 @@ public URL apply(HostAndPort input)
private AtomicReference<Map<String, Map<String, Map<String, Object>>>> lookupMapConfigRef;
private volatile Map<String, Map<String, Map<String, Object>>> prior_update = ImmutableMap.of();
private volatile boolean started = false;
private volatile ListenableScheduledFuture<?> backgroundManagerFuture = null;
private final CountDownLatch backgroundManagerExitedLatch = new CountDownLatch(1);


@Inject
Expand Down Expand Up @@ -535,7 +541,7 @@ public void start()
},
null
);
executorService.scheduleWithFixedDelay(
final ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture = executorService.scheduleWithFixedDelay(
new Runnable()
{
@Override
Expand Down Expand Up @@ -588,6 +594,27 @@ public void run()
lookupCoordinatorManagerConfig.getPeriod(),
TimeUnit.MILLISECONDS
);
Futures.addCallback(backgroundManagerFuture, new FutureCallback<Object>()
{
@Override
public void onSuccess(@Nullable Object result)
{
backgroundManagerExitedLatch.countDown();
LOG.debug("Exited background lookup manager");
}

@Override
public void onFailure(Throwable t)
{
backgroundManagerExitedLatch.countDown();
if (backgroundManagerFuture.isCancelled()) {
LOG.info("Background lookup manager exited");
LOG.trace(t, "Background lookup manager exited with throwable");
} else {
LOG.makeAlert(t, "Background lookup manager exited with error!").emit();
}
}
});
started = true;
LOG.debug("Started");
}
Expand All @@ -603,6 +630,11 @@ public void stop()
}
started = false;
executorService.shutdownNow();
final ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture;
this.backgroundManagerFuture = null;
if (backgroundManagerFuture != null && !backgroundManagerFuture.cancel(true)) {
LOG.warn("Background lookup manager thread could not be cancelled");
}
// NOTE: we can't un-watch the configuration key
LOG.debug("Stopped");
}
Expand All @@ -627,4 +659,17 @@ private static boolean httpStatusIsNotFound(int statusCode)
{
return statusCode == 404;
}

@VisibleForTesting
boolean backgroundManagerIsRunning()
{
ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture;
return backgroundManagerFuture != null && !backgroundManagerFuture.isDone();
}

@VisibleForTesting
boolean waitForBackgroundTermination(long timeout) throws InterruptedException
{
return backgroundManagerExitedLatch.await(timeout, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1190,11 +1190,18 @@ public void testStart() throws Exception
discoverer,
mapper,
configManager,
lookupCoordinatorManagerConfig
new LookupCoordinatorManagerConfig(){
@Override
public long getPeriod(){
return 1;
}
}
);
manager.start();
manager.start();
Assert.assertTrue(manager.backgroundManagerIsRunning());
Assert.assertNull(manager.getKnownLookups());
Assert.assertFalse(manager.waitForBackgroundTermination(10));
EasyMock.verify(configManager);
}

Expand All @@ -1219,8 +1226,12 @@ public void testStop() throws Exception
lookupCoordinatorManagerConfig
);
manager.start();
Assert.assertTrue(manager.backgroundManagerIsRunning());
Assert.assertFalse(manager.waitForBackgroundTermination(10));
manager.stop();
manager.stop();
Assert.assertTrue(manager.waitForBackgroundTermination(10));
Assert.assertFalse(manager.backgroundManagerIsRunning());
EasyMock.verify(configManager);
}

Expand All @@ -1245,6 +1256,8 @@ public void testStartTooMuch() throws Exception
lookupCoordinatorManagerConfig
);
manager.start();
Assert.assertTrue(manager.backgroundManagerIsRunning());
Assert.assertFalse(manager.waitForBackgroundTermination(10));
manager.stop();
expectedException.expect(new BaseMatcher<Throwable>()
{
Expand Down

0 comments on commit 847501a

Please sign in to comment.