From 72a5aa72c6f8cd6e330d3726b85401a10a92b01b Mon Sep 17 00:00:00 2001 From: skywalker Date: Sat, 27 May 2017 20:53:06 +0800 Subject: [PATCH] =?UTF-8?q?ScheduledThreadPoolExecutor:=20=E9=87=8D?= =?UTF-8?q?=E7=94=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _config.yml | 1 - .../scheduledthreadpool.md | 124 +++++++++++++++++- 2 files changed, 123 insertions(+), 2 deletions(-) delete mode 100644 _config.yml diff --git a/_config.yml b/_config.yml deleted file mode 100644 index ddeb671..0000000 --- a/_config.yml +++ /dev/null @@ -1 +0,0 @@ -theme: jekyll-theme-time-machine \ No newline at end of file diff --git a/note/ScheduledThreadPool/scheduledthreadpool.md b/note/ScheduledThreadPool/scheduledthreadpool.md index 9d2c887..7c8a9bb 100644 --- a/note/ScheduledThreadPool/scheduledthreadpool.md +++ b/note/ScheduledThreadPool/scheduledthreadpool.md @@ -264,7 +264,129 @@ public boolean cancel(boolean mayInterruptIfRunning) { 父类FutureTask的cancel方法已经见过了,removeOnCancel为ScheduledThreadPoolExecutor的属性,默认为false,其实这里调用remove是不必要的,因为已经被调用过了。 -# 任务等待 +### Worker启动 + +ThreadPoolExecutor.ensurePrestart: + +```java +void ensurePrestart() { + int wc = workerCountOf(ctl.get()); + if (wc < corePoolSize) + addWorker(null, true); + else if (wc == 0) + addWorker(null, false); +} +``` + +即使corePoolSize为0,也要保证有一个Worker线程。 + +# 任务获取 + +在ThreadPoolExecutor我们已经见过了,Worker线程通过调用任务队列的take方法进行获取: + +```java +public RunnableScheduledFuture take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + RunnableScheduledFuture first = queue[0]; + //堆为空 + if (first == null) + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + //getDelay返回的是延时执行时间和当前时间的差,非正值说明此任务可以执行了 + if (delay <= 0) + return finishPoll(first); + first = null; + if (leader != null) + //已存在leader,所以当前线程为follower,永久等待 + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + //当前线程成为leader,等待至下一次任务执行时间 + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && queue[0] != null) + //当前线程接下来要去执行定时任务逻辑,所以唤醒一个follower(如果有),使之成为新的leader + available.signal(); + lock.unlock(); + } +} +``` + +这里其实应用了Leader/Follower模式,参考: + +[Leader/Follower多线程网络模型介绍 ](http://blog.csdn.net/goldlevi/article/details/7705180) + +使用这种模式的原因猜想应该是这样: 由于定时任务的特殊性,在某一时刻应该只有一个任务等开始时间最短,这样的话只让一个线程阻塞至既定时间即可,其它线程及时醒来也不能立即执行任务,从而造成了性能的浪费。 + +如果堆为空,那么等待的Worker何时被唤醒呢?玄机就在offer方法,相关源码: + +```java +if (queue[0] == e) { + leader = null; + available.signal(); +} +``` + +为什么新任务被至于堆顶时需要唤醒Worker呢,因为这就意味着之前堆为空或最近需要执行任务的时间已经改变,需要重新调整leader的睡眠时间。 + +finishPoll方法很容易猜到,就是填补堆顶的空缺: + +```java +private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { + int s = --size; + RunnableScheduledFuture x = queue[s]; + queue[s] = null; + if (s != 0) + siftDown(0, x); + setIndex(f, -1); + return f; +} +``` + +将最后 一个元素从堆顶使其"沉沦"。 + +# 重生 + +对于持续执行的任务,在一次执行完成后应该将其再次放入到堆中,以待下次执行,这一步是在ScheduledFutureTask的run方法中完成: + +```java +public void run() { + boolean periodic = isPeriodic(); + if (!canRunInCurrentRunState(periodic)) + cancel(false); + //单次任务 + else if (!periodic) + ScheduledFutureTask.super.run(); + //持续任务 + else if (ScheduledFutureTask.super.runAndReset()) { + //设置下次执行的时间 + setNextRunTime(); + //重新加入到堆中 + reExecutePeriodic(outerTask); + } +} +``` + +FutureTask.runAndReset方法便是调用任务逻辑的地方,不同于我们已经见过的run方法,这里**不会设置任务执行的结果(即outcome属性),也不会改变Future的状态**,所以即使一次执行完毕,Future看到的状态仍是未完成。 + +# shutdown + + + +