Skip to content

Commit

Permalink
eager thradpool refine (apache#9608)
Browse files Browse the repository at this point in the history
* set super class of AvailableCluster to AbstractCluster

* delete useless imports

* eager threapool refine

* remove useless import
  • Loading branch information
HetaoWang authored Jan 23, 2022
1 parent 3339e88 commit 5672670
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,12 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* EagerThreadPoolExecutor
*/
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
Expand All @@ -43,43 +37,24 @@ public EagerThreadPoolExecutor(int corePoolSize,
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();

try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public boolean offer(Runnable runnable) {

int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
if (executor.getActiveCount() < currentPoolThreadSize) {
return super.offer(runnable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class EagerThreadPoolExecutorTest {
Expand Down Expand Up @@ -99,4 +100,39 @@ public void testSPI() {
Assertions.assertEquals("EagerThreadPoolExecutor", executorService.getClass()
.getSimpleName(), "test spi fail!");
}

@Test
public void testEagerThreadPool_rejectExecution() throws Exception {
String name = "eager-tf";
int cores = 1;
int threads = 3;
int queues = 2;
long alive = 1000;

// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<>(queues);
final EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive, TimeUnit.MILLISECONDS,
taskQueue,
new NamedThreadFactory(name, true),
new AbortPolicyWithReport(name, URL));
taskQueue.setExecutor(executor);

Runnable runnable = () -> {
System.out.println("thread number in current pool: " + executor.getPoolSize() + ", task number is task queue: " + executor.getQueue().size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 5; i++) {
Thread.sleep(50);
executor.execute(runnable);
}
Assertions.assertThrows(RejectedExecutionException.class, () -> executor.execute(runnable));

Thread.sleep(10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testOffer2() throws Exception {
TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
Mockito.when(executor.getPoolSize()).thenReturn(2);
Mockito.when(executor.getSubmittedTaskCount()).thenReturn(1);
Mockito.when(executor.getActiveCount()).thenReturn(1);
queue.setExecutor(executor);
assertThat(queue.offer(mock(Runnable.class)), is(true));
}
Expand All @@ -53,7 +53,7 @@ public void testOffer3() throws Exception {
TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
Mockito.when(executor.getPoolSize()).thenReturn(2);
Mockito.when(executor.getSubmittedTaskCount()).thenReturn(2);
Mockito.when(executor.getActiveCount()).thenReturn(2);
Mockito.when(executor.getMaximumPoolSize()).thenReturn(4);
queue.setExecutor(executor);
assertThat(queue.offer(mock(Runnable.class)), is(false));
Expand All @@ -64,7 +64,7 @@ public void testOffer4() throws Exception {
TaskQueue<Runnable> queue = new TaskQueue<Runnable>(1);
EagerThreadPoolExecutor executor = mock(EagerThreadPoolExecutor.class);
Mockito.when(executor.getPoolSize()).thenReturn(4);
Mockito.when(executor.getSubmittedTaskCount()).thenReturn(4);
Mockito.when(executor.getActiveCount()).thenReturn(4);
Mockito.when(executor.getMaximumPoolSize()).thenReturn(4);
queue.setExecutor(executor);
assertThat(queue.offer(mock(Runnable.class)), is(true));
Expand Down

0 comments on commit 5672670

Please sign in to comment.