Skip to content

Commit

Permalink
Merge pull request Netflix#242 from tbak/master
Browse files Browse the repository at this point in the history
Add exponential backoff to heartbeat/cache refresh tasks.
  • Loading branch information
tbak committed Nov 7, 2014
2 parents a221472 + 3fcfd72 commit 91b1bc8
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,12 @@ public int getHeartbeatExecutorThreadPoolSize() {
namespace + "client.heartbeat.threadPoolSize", 2).get();
}

/**
 * Looks up the heartbeat executor's exponential back off bound from the
 * {@code client.heartbeat.exponentialBackOffBound} property, prefixed with
 * this configuration's namespace; 10 is passed as the fallback argument
 * of the property lookup.
 *
 * @return the configured exponential back off bound for the heartbeat executor
 */
@Override
public int getHeartbeatExecutorExponentialBackOffBound() {
    final String propertyKey = namespace + "client.heartbeat.exponentialBackOffBound";
    return configInstance.getIntProperty(propertyKey, 10).get();
}

/**
* (non-Javadoc)
*
Expand All @@ -456,4 +462,10 @@ public int getCacheRefreshExecutorThreadPoolSize() {
return configInstance.getIntProperty(
namespace + "client.cacheRefresh.threadPoolSize", 2).get();
}

/**
 * Looks up the cache refresh executor's exponential back off bound from the
 * {@code client.cacheRefresh.exponentialBackOffBound} property, prefixed with
 * this configuration's namespace; 10 is passed as the fallback argument
 * of the property lookup.
 *
 * @return the configured exponential back off bound for the cache refresh executor
 */
@Override
public int getCacheRefreshExecutorExponentialBackOffBound() {
    final String propertyKey = namespace + "client.cacheRefresh.exponentialBackOffBound";
    return configInstance.getIntProperty(propertyKey, 10).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,13 @@
import com.netflix.discovery.shared.LookupService;
import com.netflix.eventbus.spi.EventBus;
import com.netflix.governator.guice.lazy.FineGrainedLazySingleton;
import com.netflix.http4.MonitoredConnectionManager;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
import com.sun.jersey.client.apache4.ApacheHttpClient4;
import com.sun.jersey.client.apache4.config.ApacheHttpClient4Config;
import com.sun.jersey.client.apache4.config.DefaultApacheHttpClient4Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1206,22 +1202,36 @@ private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
scheduler.scheduleWithFixedDelay(
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh", cacheRefreshExecutor, registryFetchIntervalSeconds, new CacheRefreshThread()),
registryFetchIntervalSeconds,
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

if (shouldRegister(instanceInfo)) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

// Heartbeat timer
scheduler.scheduleWithFixedDelay(
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat", heartbeatExecutor, renewalIntervalInSecs, new HeartbeatThread()),
renewalIntervalInSecs,
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);

// InstanceInfo replication timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,28 @@ public interface EurekaClientConfig {
*/
int getHeartbeatExecutorThreadPoolSize();

/**
 * Heartbeat executor exponential back off related property.
 * It is the maximum multiplier applied to the retry delay when a sequence
 * of consecutive timeouts occurs.
 *
 * @return the maximum multiplier value for the retry delay
 */
int getHeartbeatExecutorExponentialBackOffBound();

/**
 * The thread pool size that the cacheRefreshExecutor is initialised with.
 *
 * @return the cacheRefreshExecutor thread pool size
 */
int getCacheRefreshExecutorThreadPoolSize();

/**
 * Cache refresh executor exponential back off related property.
 * It is the maximum multiplier applied to the retry delay when a sequence
 * of consecutive timeouts occurs.
 *
 * @return the maximum multiplier value for the retry delay
 */
int getCacheRefreshExecutorExponentialBackOffBound();
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package com.netflix.discovery;

import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A supervisor task that schedules subtasks while enforcing a timeout.
Expand All @@ -27,14 +29,22 @@ public class TimedSupervisorTask extends TimerTask {
private final Counter rejectedCounter;
private final Counter throwableCounter;

private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor executor;
private final int timeoutSecs;
private final long timeoutMillis;
private final Runnable task;

public TimedSupervisorTask(String name, ThreadPoolExecutor executor, int timeoutSecs, Runnable task) {
private final AtomicLong delay;
private final long maxDelay;

public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutSecs = timeoutSecs;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;

// Initialize the counters and register.
timeoutCounter = Monitors.newCounter(PREFIX + '_' + name + "_timeouts");
Expand All @@ -47,10 +57,16 @@ public void run() {
Future future = null;
try {
future = executor.submit(task);
future.get(timeoutSecs, TimeUnit.SECONDS); // block until done or timeout
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
delay.set(timeoutMillis);
} catch (TimeoutException e) {
logger.error("task supervisor timed out", e);
timeoutCounter.increment();

long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);

} catch (RejectedExecutionException e) {
logger.error("task supervisor rejected the task", e);
rejectedCounter.increment();
Expand All @@ -61,6 +77,7 @@ public void run() {
if (future != null) {
future.cancel(true);
}
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package com.netflix.discovery;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -10,15 +15,10 @@
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TimedSupervisorTaskTest {

private static final int EXP_BACK_OFF_BOUND = 10;

private ScheduledExecutorService scheduler;
private ListeningExecutorService helperExecutor;

Expand Down Expand Up @@ -73,8 +73,8 @@ public void tearDown() {
@Test
public void testSupervisorTaskDefaultSingleTestTaskHappyCase() throws Exception {
// testTask should never timeout
TestTask testTask = new TestTask(1);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", executor, 5, testTask);
TestTask testTask = new TestTask(1, false);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", scheduler, executor, 5, TimeUnit.SECONDS, EXP_BACK_OFF_BOUND, testTask);

helperExecutor.submit(supervisorTask).get();

Expand All @@ -89,8 +89,8 @@ public void testSupervisorTaskDefaultSingleTestTaskHappyCase() throws Exception
@Test
public void testSupervisorTaskCancelsTimedOutTask() throws Exception {
// testTask will always timeout
TestTask testTask = new TestTask(5);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", executor, 1, testTask);
TestTask testTask = new TestTask(5, false);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", scheduler, executor, 1, TimeUnit.SECONDS, EXP_BACK_OFF_BOUND, testTask);

helperExecutor.submit(supervisorTask).get();
Thread.sleep(500); // wait a little bit for the subtask interrupt handler
Expand All @@ -106,31 +106,28 @@ public void testSupervisorTaskCancelsTimedOutTask() throws Exception {
@Test
public void testSupervisorRejectNewTasksIfThreadPoolIsFullForIncompleteTasks() throws Exception {
// testTask should always timeout
TestTask testTask = new TestTask(4);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", executor, 1, testTask);

ListenableFuture a = helperExecutor.submit(supervisorTask);
ListenableFuture b = helperExecutor.submit(supervisorTask);
ListenableFuture c = helperExecutor.submit(supervisorTask);
ListenableFuture d = helperExecutor.submit(supervisorTask);
Futures.successfulAsList(a, b, c, d).get();
TestTask testTask = new TestTask(4, true);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", scheduler, executor, 1, TimeUnit.MILLISECONDS, EXP_BACK_OFF_BOUND, testTask);

scheduler.schedule(supervisorTask, 0, TimeUnit.SECONDS);

Thread.sleep(500); // wait a little bit for the subtask interrupt handlers

Assert.assertEquals(3, maxConcurrentTestTasks.get());
Assert.assertEquals(3, testTaskCounter.get());

Assert.assertEquals(3, testTaskStartCounter.get());
Assert.assertEquals(0, testTaskSuccessfulCounter.get());
Assert.assertEquals(3, testTaskInterruptedCounter.get());
Assert.assertEquals(0, testTaskInterruptedCounter.get());
}

@Test
public void testSupervisorTaskAsPeriodicScheduledJobHappyCase() throws Exception {
// testTask should never timeout
TestTask testTask = new TestTask(1);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", executor, 4, testTask);
TestTask testTask = new TestTask(1, false);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", scheduler, executor, 4, TimeUnit.SECONDS, EXP_BACK_OFF_BOUND, testTask);

scheduler.scheduleWithFixedDelay(supervisorTask, 0, 2, TimeUnit.SECONDS);
scheduler.schedule(supervisorTask, 0, TimeUnit.SECONDS);
Thread.sleep(5000); // let the scheduler run for long enough for some results

Assert.assertEquals(1, maxConcurrentTestTasks.get());
Expand All @@ -142,10 +139,10 @@ public void testSupervisorTaskAsPeriodicScheduledJobHappyCase() throws Exception
@Test
public void testSupervisorTaskAsPeriodicScheduledJobTestTaskTimingOut() throws Exception {
// testTask should always timeout
TestTask testTask = new TestTask(5);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", executor, 1, testTask);
TestTask testTask = new TestTask(5, false);
TimedSupervisorTask supervisorTask = new TimedSupervisorTask("test", scheduler, executor, 2, TimeUnit.SECONDS, EXP_BACK_OFF_BOUND, testTask);

scheduler.scheduleWithFixedDelay(supervisorTask, 0, 2, TimeUnit.SECONDS);
scheduler.schedule(supervisorTask, 0, TimeUnit.SECONDS);
Thread.sleep(5000); // let the scheduler run for long enough for some results

Assert.assertEquals(1, maxConcurrentTestTasks.get());
Expand All @@ -157,9 +154,11 @@ public void testSupervisorTaskAsPeriodicScheduledJobTestTaskTimingOut() throws E

private class TestTask implements Runnable {
private final int runTimeSecs;
private final boolean blockInterrupt;

public TestTask(int runTimeSecs) {
public TestTask(int runTimeSecs, boolean blockInterrupt) {
this.runTimeSecs = runTimeSecs;
this.blockInterrupt = blockInterrupt;
}

public void run() {
Expand All @@ -174,7 +173,16 @@ public void run() {
}
}

Thread.sleep(runTimeSecs * 1000);
long endTime = System.currentTimeMillis() + runTimeSecs * 1000;
while (endTime >= System.currentTimeMillis()) {
try {
Thread.sleep(runTimeSecs * 1000);
} catch (InterruptedException e) {
if (!blockInterrupt) {
throw e;
}
}
}

testTaskCounter.decrementAndGet();
testTaskSuccessfulCounter.incrementAndGet();
Expand Down

0 comments on commit 91b1bc8

Please sign in to comment.