Skip to content

Commit

Permalink
Remove cancelled work from the threadpool queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Oct 6, 2016
1 parent ae725f5 commit 324a9ab
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ public Subscription schedule(final Action0 action) {

// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);

subscription.add(sa);
sa.addParent(subscription);

Future<?> f = threadPool.getExecutor().submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread));
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

return sa;
}
Expand All @@ -178,23 +179,25 @@ public Subscription schedule(final Action0 action) {
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
throw new IllegalStateException("Hystrix does not support delayed scheduling");
}

}

/**
* Very similar to rx.internal.schedulers.ScheduledAction.FutureCompleter, but with configurable interrupt behavior
*/
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final Future<?> f;
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;

private FutureCompleterWithConfigurableInterrupt(Future<?> f, Func0<Boolean> shouldInterruptThread) {
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}

@Override
public void unsubscribe() {
executor.remove(f);
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2930,6 +2930,32 @@ public void run() {
}
}

@Test
public void testCancelledTasksInQueueGetRemoved() throws Exception {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cancellation-A");
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
SingleThreadedPoolWithQueue pool = new SingleThreadedPoolWithQueue(10, 1);
TestCommandRejection command1 = new TestCommandRejection(key, circuitBreaker, pool, 500, 600, TestCommandRejection.FALLBACK_NOT_IMPLEMENTED);
TestCommandRejection command2 = new TestCommandRejection(key, circuitBreaker, pool, 500, 600, TestCommandRejection.FALLBACK_NOT_IMPLEMENTED);

// this should go through the queue and into the thread pool
Future<Boolean> poolFiller = command1.queue();
// this command will stay in the queue until the thread pool is empty
Observable<Boolean> cmdInQueue = command2.observe();
Subscription s = cmdInQueue.subscribe();
assertEquals(1, pool.queue.size());
s.unsubscribe();
assertEquals(0, pool.queue.size());
//make sure we wait for the command to finish so the state is clean for next test
poolFiller.get();

assertCommandExecutionEvents(command1, HystrixEventType.SUCCESS);
assertCommandExecutionEvents(command2, HystrixEventType.CANCELLED);
assertEquals(0, circuitBreaker.metrics.getCurrentConcurrentExecutionCount());
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertSaneHystrixRequestLog(2);
}

@Test
public void testOnRunStartHookThrowsSemaphoreIsolated() {
final AtomicBoolean exceptionEncountered = new AtomicBoolean(false);
Expand Down

0 comments on commit 324a9ab

Please sign in to comment.