Skip to content

Commit

Permalink
Improvements to EurekaNotificationServerListUpdater
Browse files Browse the repository at this point in the history
- add additional checks for starting condition
- bound the default executor
  • Loading branch information
qiangdavidliu committed Oct 7, 2016
1 parent 5e6066d commit 82cddcb
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.netflix.niws.loadbalancer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.config.DynamicIntProperty;
import com.netflix.discovery.CacheRefreshedEvent;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEvent;
Expand All @@ -11,9 +12,10 @@

import javax.inject.Provider;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -31,29 +33,58 @@ public class EurekaNotificationServerListUpdater implements ServerListUpdater {
private static final Logger logger = LoggerFactory.getLogger(EurekaNotificationServerListUpdater.class);

private static class LazyHolder {
private static final ExecutorService DEFAULT_SERVER_LIST_UPDATE_EXECUTOR = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("EurekaNotificationServerListUpdater-%d")
.setDaemon(true)
.build()
);

private static final Thread SHUTDOWN_THREAD = new Thread(new Runnable() {
@Override
public void run() {
logger.info("Shutting down the Executor for EurekaNotificationServerListUpdater");
try {
DEFAULT_SERVER_LIST_UPDATE_EXECUTOR.shutdown();
Runtime.getRuntime().removeShutdownHook(SHUTDOWN_THREAD);
} catch (Exception e) {
// this can happen in the middle of a real shutdown, and that's ok.
}
}
});
private final static String CORE_THREAD = "EurekaNotificationServerListUpdater.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);

private static ThreadPoolExecutor DEFAULT_SERVER_LIST_UPDATE_EXECUTOR;
private static Thread SHUTDOWN_THREAD;

static {
int corePoolSize = getCorePoolSize();
DEFAULT_SERVER_LIST_UPDATE_EXECUTOR = new ThreadPoolExecutor(
corePoolSize,
corePoolSize * 5,
0,
TimeUnit.NANOSECONDS,
new ArrayBlockingQueue<Runnable>(1000),
new ThreadFactoryBuilder()
.setNameFormat("EurekaNotificationServerListUpdater-%d")
.setDaemon(true)
.build()
);

poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
int corePoolSize = getCorePoolSize();
DEFAULT_SERVER_LIST_UPDATE_EXECUTOR.setCorePoolSize(corePoolSize);
DEFAULT_SERVER_LIST_UPDATE_EXECUTOR.setMaximumPoolSize(corePoolSize * 5);
}
});

SHUTDOWN_THREAD = new Thread(new Runnable() {
@Override
public void run() {
logger.info("Shutting down the Executor for EurekaNotificationServerListUpdater");
try {
DEFAULT_SERVER_LIST_UPDATE_EXECUTOR.shutdown();
Runtime.getRuntime().removeShutdownHook(SHUTDOWN_THREAD);
} catch (Exception e) {
// this can happen in the middle of a real shutdown, and that's ok.
}
}
});

Runtime.getRuntime().addShutdownHook(SHUTDOWN_THREAD);
}

private static int getCorePoolSize() {
int propSize = poolSizeProp.get();
if (propSize > 0) {
return propSize;
}
return 2; // default
}
}

public static ExecutorService getDefaultRefreshExecutor() {
Expand Down Expand Up @@ -88,17 +119,21 @@ public synchronized void start(final UpdateAction updateAction) {
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
try {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
}
}
}
}); // fire and forget
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
}
}
}
};
Expand All @@ -107,6 +142,9 @@ public void run() {
}
if (eurekaClient != null) {
eurekaClient.registerEventListener(updateListener);
} else {
logger.error("Failed to register an updateListener to eureka client, eureka client is null");
throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
}
} else {
logger.info("Update listener already registered, no-op");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,25 @@ public void doUpdate() {
EasyMock.verify(eurekaClientMock2);
}

@Test(expected = IllegalStateException.class)
public void testFailIfDiscoveryIsNotAvailable() {
EurekaNotificationServerListUpdater serverListUpdater = new EurekaNotificationServerListUpdater(
new Provider<EurekaClient>() {
@Override
public EurekaClient get() {
return null;
}
}
);

serverListUpdater.start(new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
Assert.fail("Should not reach here");
}
});
}

private EurekaClient setUpEurekaClientMock() {
final EurekaClient eurekaClientMock = EasyMock.createMock(EurekaClient.class);

Expand Down

0 comments on commit 82cddcb

Please sign in to comment.