前言
ScheduledThreadPoolExecutor
可以用來(lái)很方便實(shí)現(xiàn)我們的調(diào)度任務(wù),具體使用可以參考調(diào)度線程池ScheduledThreadPoolExecutor的正確使用姿勢(shì)這篇文章,那大家知道它是怎么實(shí)現(xiàn)的嗎,本文就帶大家來(lái)揭曉謎底。
實(shí)現(xiàn)機(jī)制分析
我們先思考下,如果讓大家去實(shí)現(xiàn)ScheduledThreadPoolExecutor
可以周期性執(zhí)行任務(wù)的功能,需要考慮哪些方面呢?
ScheduledThreadPoolExecutor
的整體實(shí)現(xiàn)思路是什么呢?
答:我們是不是可以繼承線程池類,按照線程池的思路,將任務(wù)先丟到阻塞隊(duì)列中,等到時(shí)間到了,工作線程就從阻塞隊(duì)列獲取任務(wù)執(zhí)行。
- 如何實(shí)現(xiàn)等到了未來(lái)的時(shí)間點(diǎn)就開(kāi)始執(zhí)行呢?
答:我們可以根據(jù)參數(shù)獲取這個(gè)任務(wù)還要多少時(shí)間執(zhí)行,那么我們是不是可以從阻塞隊(duì)列中獲取任務(wù)的時(shí)候,通過(guò)條件隊(duì)列的的awaitNanos(delay)
方法,阻塞一定時(shí)間。
- 如何實(shí)現(xiàn) 任務(wù)的重復(fù)性執(zhí)行呢?
答:這就更加簡(jiǎn)單了,任務(wù)執(zhí)行完成后,把它再次加入到隊(duì)列不就行了嗎。
源碼解析
類結(jié)構(gòu)圖
ScheduledThreadPoolExecutor
的類結(jié)構(gòu)圖如上圖所示,很明顯它是在我們的線程池ThreadPoolExecutor
框架基礎(chǔ)上擴(kuò)展的。
ScheduledExecutorService
:實(shí)現(xiàn)了該接口,封裝了調(diào)度相關(guān)的APIThreadPoolExecutor
:繼承了該類,保留了線程池的能力和整個(gè)實(shí)現(xiàn)的框架DelayedWorkQueue
:內(nèi)部類,延遲阻塞隊(duì)列。ScheduledFutureTask
:延遲任務(wù)對(duì)象,包含了任務(wù)、任務(wù)狀態(tài)、剩余的時(shí)間、結(jié)果等信息。
重要屬性
通過(guò)ScheduledThreadPoolExecutor
類的成員屬性,我們可以了解它的數(shù)據(jù)結(jié)構(gòu)。
shutdown
后是否繼續(xù)執(zhí)行周期任務(wù)(重復(fù)執(zhí)行)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
shutdown
后是否繼續(xù)執(zhí)行延遲任務(wù)(只執(zhí)行一次)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
- 調(diào)用
cancel()
方法后,是否將該任務(wù)從隊(duì)列中移除,默認(rèn)false
private volatile boolean removeOnCancel = false;
- 任務(wù)的序列號(hào),保證FIFO隊(duì)列的順序,用來(lái)比較優(yōu)先級(jí)
private static final AtomicLong sequencer = new AtomicLong()
ScheduledFutureTask
延遲任務(wù)類
ScheduledFutureTask
繼承FutureTask
,實(shí)現(xiàn)RunnableScheduledFuture
接口,無(wú)論是runnable
還是callable
,無(wú)論是否需要延遲和定時(shí),所有的任務(wù)都會(huì)被封裝成ScheduledFutureTask
。- 該類具有延遲執(zhí)行的特點(diǎn), 覆蓋
FutureTask
的run
方法來(lái)實(shí)現(xiàn)對(duì)延時(shí)執(zhí)行、周期執(zhí)行的支持。 - 對(duì)于延時(shí)任務(wù)調(diào)用
FutureTask#run
,而對(duì)于周期性任務(wù)則調(diào)用FutureTask#runAndReset
并且在成功之后根據(jù)fixed-delay/fixed-rate
模式來(lái)設(shè)置下次執(zhí)行時(shí)間并重新將任務(wù)塞到工作隊(duì)列。 - 成員屬性如下:
// 任務(wù)序列號(hào)
private final long sequenceNumber;
// 任務(wù)可以被執(zhí)行的時(shí)間,交付時(shí)間,以納秒表示
private long time;
// 0 表示非周期任務(wù)
// 正數(shù)表示 fixed-rate(兩次開(kāi)始啟動(dòng)的間隔)模式的周期,
// 負(fù)數(shù)表示 fixed-delay(一次執(zhí)行結(jié)束到下一次開(kāi)始啟動(dòng)) 模式
private final long period;
// 執(zhí)行的任務(wù)對(duì)象
RunnableScheduledFuture
DelayedWorkQueue
延遲隊(duì)列
DelayedWorkQueue
是支持延時(shí)獲取元素的阻塞隊(duì)列, 內(nèi)部采用優(yōu)先隊(duì)列 PriorityQueue(小根堆、滿二叉樹(shù))存儲(chǔ)元素。- 內(nèi)部數(shù)據(jù)結(jié)構(gòu)是數(shù)組,所以延遲隊(duì)列出隊(duì)頭元素后需要讓其他元素(尾)替換到頭節(jié)點(diǎn),防止空指針異常。
- 成員屬性如下:
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 節(jié)點(diǎn)數(shù)量
private int size = 0;
// 存放任務(wù)的數(shù)組
private RunnableScheduledFuture?[] queue =
new RunnableScheduledFuture?[INITIAL_CAPACITY];
// 控制并發(fā)用的鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件隊(duì)列
private final Condition available = lock.newCondition();
//指定用于等待隊(duì)列頭節(jié)點(diǎn)任務(wù)的線程
private Thread leader = null;
提交延遲任務(wù)schedule()
原理
延遲執(zhí)行方法,并指定延遲執(zhí)行的時(shí)間,只會(huì)執(zhí)行一次。
schedule()
方法是延遲任務(wù)方法的入口。
public ScheduledFuture? schedule(Runnable command,
long delay,
TimeUnit unit) {
// 判空處理
if (command == null || unit == null)
throw new NullPointerException();
// 將外部傳入的任務(wù)封裝成延遲任務(wù)對(duì)象ScheduledFutureTask
RunnableScheduledFuture? t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 執(zhí)行延遲任務(wù)
delayedExecute(t);
return t;
}
decorateTask(...)
該方法是封裝延遲任務(wù)
- 調(diào)用
triggerTime(delay, unit)
方法計(jì)算延遲的時(shí)間。
// 返回【當(dāng)前時(shí)間 + 延遲時(shí)間】,就是觸發(fā)當(dāng)前任務(wù)執(zhí)行的時(shí)間
private long triggerTime(long delay, TimeUnit unit) {
// 設(shè)置觸發(fā)的時(shí)間
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
// 如果 delay < Long.Max_VALUE/2,則下次執(zhí)行時(shí)間為當(dāng)前時(shí)間 +delay
// 否則為了避免隊(duì)列中出現(xiàn)由于溢出導(dǎo)致的排序紊亂,需要調(diào)用overflowFree來(lái)修正一下delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 下面這種情況很少,大家看不懂可以不用強(qiáng)行理解
// 如果某個(gè)任務(wù)的 delay 為負(fù)數(shù),說(shuō)明當(dāng)前可以執(zhí)行(其實(shí)早該執(zhí)行了)。
// 阻塞隊(duì)列中維護(hù)任務(wù)順序是基于 compareTo 比較的,比較兩個(gè)任務(wù)的順序會(huì)用 time 相減。
// 那么可能出現(xiàn)一個(gè) delay 為正數(shù)減去另一個(gè)為負(fù)數(shù)的 delay,結(jié)果上溢為負(fù)數(shù),則會(huì)導(dǎo)致 compareTo 產(chǎn)生錯(cuò)誤的結(jié)果
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
// 判斷一下隊(duì)首的delay是不是負(fù)數(shù),如果是正數(shù)就不用管,怎么減都不會(huì)溢出
// 否則拿當(dāng)前 delay 減去隊(duì)首的 delay 來(lái)比較看,如果不出現(xiàn)上溢,排序不會(huì)亂
// 不然就把當(dāng)前 delay 值給調(diào)整為 Long.MAX_VALUE + 隊(duì)首 delay
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
- 調(diào)用
RunnableScheduledFuture
的構(gòu)造方法封裝為延遲任務(wù)
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// 任務(wù)的觸發(fā)時(shí)間
this.time = ns;
// 任務(wù)的周期, 延遲任務(wù)的為0,因?yàn)椴恍枰貜?fù)執(zhí)行
this.period = 0;
// 任務(wù)的序號(hào) + 1
this.sequenceNumber = sequencer.getAndIncrement();
}
- 調(diào)用
decorateTask()
方法裝飾延遲任務(wù)
// 沒(méi)有做任何操作,直接將 task 返回,該方法主要目的是用于子類擴(kuò)展
protected
提交周期任務(wù)scheduleAtFixedRate()
原理
按照固定的頻率周期性的執(zhí)行任務(wù),捕手renwu,一次任務(wù)的啟動(dòng)到下一次任務(wù)的啟動(dòng)的間隔
public ScheduledFuture? scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 任務(wù)封裝,【指定初始的延遲時(shí)間和周期時(shí)間】
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(period));
// 默認(rèn)返回本身
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開(kāi)始執(zhí)行這個(gè)任務(wù)
delayedExecute(t);
return t;
}
提交周期任務(wù)scheduleWithFixedDelay()
原理
按照指定的延時(shí)周期性執(zhí)行任務(wù),上一個(gè)任務(wù)執(zhí)行完畢后,延時(shí)一定時(shí)間,再次執(zhí)行任務(wù)。
public ScheduledFuture? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 任務(wù)封裝,【指定初始的延遲時(shí)間和周期時(shí)間】,周期時(shí)間為 - 表示是 fixed-delay 模式
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 開(kāi)始執(zhí)行這個(gè)任務(wù)
delayedExecute(t);
return t;
}
執(zhí)行任務(wù)delayedExecute(t)
原理
上面多種提交任務(wù)的方式,殊途同歸,最終都會(huì)調(diào)用delayedExecute()
方法執(zhí)行延遲或者周期任務(wù)。
delayedExecute()
方法是執(zhí)行延遲任務(wù)的入口
private void delayedExecute(RunnableScheduledFuture? task) {
// 線程池是 SHUTDOWN 狀態(tài),執(zhí)行拒絕策略
if (isShutdown())
// 調(diào)用拒絕策略的方法
reject(task);
else {
// 把當(dāng)前任務(wù)放入阻塞隊(duì)列
super.getQueue().add(task);
// 線程池狀態(tài)為 SHUTDOWN 并且不允許執(zhí)行任務(wù)了,就從隊(duì)列刪除該任務(wù),并設(shè)置任務(wù)的狀態(tài)為取消狀態(tài)
// 非主流程,可以跳過(guò),不重點(diǎn)看了
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false);
else
// 開(kāi)始執(zhí)行了哈
ensurePrestart();
}
}
ensurePrestart()
方法開(kāi)啟線程執(zhí)行
// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker數(shù)目小于corePoolSize,則添加一個(gè)worker。
if (wc < corePoolSize)
// 第二個(gè)參數(shù) true 表示采用核心線程數(shù)量限制,false 表示采用 maximumPoolSize
addWorker(null, true);
// corePoolSize = 0的情況,至少開(kāi)啟一個(gè)線程,【擔(dān)保機(jī)制】
else if (wc == 0)
addWorker(null, false);
}
addWorker()
方法實(shí)際上父類ThreadPoolExecutor
的方法,這個(gè)方法在該文章 Java線程池源碼深度解析中詳細(xì)介紹過(guò),這邊做個(gè)總結(jié):
- 如果線程池中工作線程數(shù)量小于最大線程數(shù),創(chuàng)建工作線程,執(zhí)行任務(wù)。
- 如果線程池中工作線程數(shù)量大于最大線程數(shù),直接返回。
獲取延遲任務(wù)take()原理
目前工作線程已經(jīng)創(chuàng)建好了,工作線程開(kāi)始工作了,它會(huì)從阻塞隊(duì)列中獲取延遲任務(wù)執(zhí)行,這部分也是線程池里面的原理,不做展開(kāi),那我們看下它是如何實(shí)現(xiàn)延遲執(zhí)行的? 主要關(guān)注如何從阻塞隊(duì)列中獲取任務(wù)。
DelayedWorkQueue#take()
方法獲取延遲任務(wù)
- 該方法會(huì)在上面的
addWoker()
方法創(chuàng)建工作線程后,工作線程中循環(huán)持續(xù)調(diào)用workQueue.take()
方法獲取延遲任務(wù)。 - 該方法主要獲取延遲隊(duì)列中任務(wù)延遲時(shí)間小于等于0 的任務(wù)。
- 如果延遲時(shí)間不小于0,那么調(diào)用條件隊(duì)列的
awaitNanos(delay)
阻塞方法等待一段時(shí)間,等時(shí)間到了,延遲時(shí)間自然小于等于0了。 - 獲取到任務(wù)后,工作線程就可以開(kāi)始執(zhí)行調(diào)度任務(wù)了。
// DelayedWorkQueue#take()
public RunnableScheduledFuture? take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加可中斷鎖
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 獲取阻塞隊(duì)列中的頭結(jié)點(diǎn)
RunnableScheduledFuture? first = queue[0];
// 如果阻塞隊(duì)列沒(méi)有數(shù)據(jù),為空
if (first == null)
// 等待隊(duì)列不空,直至有任務(wù)通過(guò) offer 入隊(duì)并喚醒
available.await();
else {
// 獲取頭節(jié)點(diǎn)的的任務(wù)還剩余多少時(shí)間才執(zhí)行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 到達(dá)觸發(fā)時(shí)間,獲取頭節(jié)點(diǎn)并調(diào)整堆,重新選擇延遲時(shí)間最小的節(jié)點(diǎn)放入頭部
return finishPoll(first);
// 邏輯到這說(shuō)明頭節(jié)點(diǎn)的延遲時(shí)間還沒(méi)到
first = null;
// 說(shuō)明有 leader 線程在等待獲取頭節(jié)點(diǎn),當(dāng)前線程直接去阻塞等待
if (leader != null)
// 當(dāng)前線程阻塞
available.await();
else {
// 沒(méi)有 leader 線程,【當(dāng)前線程作為leader線程,并設(shè)置頭結(jié)點(diǎn)的延遲時(shí)間作為阻塞時(shí)間】
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 當(dāng)前線程通過(guò)awaitNanos方法等待delay時(shí)間后,會(huì)自動(dòng)喚醒,往后面繼續(xù)執(zhí)行
available.awaitNanos(delay);
// 到達(dá)阻塞時(shí)間時(shí),當(dāng)前線程會(huì)從這里醒來(lái),進(jìn)入下一輪循環(huán),就有可能執(zhí)行了
} finally {
// t堆頂更新,leader 置為 null,offer 方法釋放鎖后,
// 有其它線程通過(guò) take/poll 拿到鎖,讀到 leader == null,然后將自身更新為leader。
if (leader == thisThread)
// leader 置為 null 用以接下來(lái)判斷是否需要喚醒后繼線程
leader = null;
}
}
}
}
} finally {
// 沒(méi)有 leader 線程并且頭結(jié)點(diǎn)不為 null,喚醒阻塞獲取頭節(jié)點(diǎn)的線程,
// 【如果沒(méi)有這一步,就會(huì)出現(xiàn)有了需要執(zhí)行的任務(wù),但是沒(méi)有線程去執(zhí)行】
if (leader == null && queue[0] != null)
available.signal();
// 解鎖
lock.unlock();
}
}
finishPoll()
方法獲取到任務(wù)后執(zhí)行
該方法主要做兩個(gè)事情, 獲取頭節(jié)點(diǎn)并調(diào)整堆,重新選擇延遲時(shí)間最小的節(jié)點(diǎn)放入頭部。
private RunnableScheduledFuture?</span> finishPoll(RunnableScheduledFuture?span> f) {
// 獲取尾索引
int s = --size;
// 獲取尾節(jié)點(diǎn)
RunnableScheduledFuture? x = queue[s];
// 將堆結(jié)構(gòu)最后一個(gè)節(jié)點(diǎn)占用的 slot 設(shè)置為 null,因?yàn)樵摴?jié)點(diǎn)要嘗試升級(jí)成堆頂,會(huì)根據(jù)特性下調(diào)
queue[s] = null;
// s == 0 說(shuō)明 當(dāng)前堆結(jié)構(gòu)只有堆頂一個(gè)節(jié)點(diǎn),此時(shí)不需要做任何的事情
if (s != 0)
// 從索引處 0 開(kāi)始向下調(diào)整
siftDown(0, x);
// 出隊(duì)的元素索引設(shè)置為 -1
setIndex(f, -1);
return f;
}
延遲任務(wù)運(yùn)行的原理
從延遲隊(duì)列中獲取任務(wù)后,工作線程會(huì)調(diào)用延遲任務(wù)的run()方法執(zhí)行任務(wù)。
ScheduledFutureTask#run()
方法運(yùn)行任務(wù)
- 調(diào)用
isPeriodic()
方法判斷任務(wù)是否是周期性任務(wù)還是非周期性任務(wù) - 如果任務(wù)是非周期任務(wù),就調(diào)用父類的
FutureTask#run()
執(zhí)行一次 - 如果任務(wù)是非周期任務(wù),就調(diào)用父類的
FutureTask#runAndReset()
, 返回true會(huì)設(shè)置下一次的執(zhí)行時(shí)間,重新放入線程池的阻塞隊(duì)列中,等待下次獲取執(zhí)行
public void run() {
// 是否周期性,就是判斷 period 是否為 0
boolean periodic = isPeriodic();
// 根據(jù)是否是周期任務(wù)檢查當(dāng)前狀態(tài)能否執(zhí)行任務(wù),不能執(zhí)行就取消任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 非周期任務(wù),直接調(diào)用 FutureTask#run 執(zhí)行一次
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任務(wù)的執(zhí)行,返回 true 表示執(zhí)行成功
else if (ScheduledFutureTask.super.runAndReset()) {
// 設(shè)置周期任務(wù)的下一次執(zhí)行時(shí)間
setNextRunTime();
// 任務(wù)的下一次執(zhí)行安排,如果當(dāng)前線程池狀態(tài)可以執(zhí)行周期任務(wù),加入隊(duì)列,并開(kāi)啟新線程
reExecutePeriodic(outerTask);
}
}
FutureTask#runAndReset()
執(zhí)行周期性任務(wù)
- 周期任務(wù)正常完成后任務(wù)的狀態(tài)不會(huì)變化,依舊是 NEW,不會(huì)設(shè)置 outcome 屬性。
- 但是如果本次任務(wù)執(zhí)行出現(xiàn)異常,會(huì)進(jìn)入 setException 方法將任務(wù)狀態(tài)置為異常,把異常保存在 outcome 中。
- 方法返回 false,后續(xù)的該任務(wù)將不會(huì)再周期的執(zhí)行
protected boolean runAndReset() {
// 任務(wù)不是新建的狀態(tài)了,或者被別的線程執(zhí)行了,直接返回 false
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable
ScheduledFutureTask#setNextRunTime()
設(shè)置下次執(zhí)行時(shí)間
- 如果屬性period大于0,表示
fixed-rate
模式,直接加上period時(shí)間即可。 - 如果屬性period小于等于0, 表示是
fixed-delay
模式, 調(diào)用triggerTime重新計(jì)算下次時(shí)間。
// 任務(wù)下一次的觸發(fā)時(shí)間
private void setNextRunTime() {
long p = period;
if (p > 0)
// fixed-rate 模式,【時(shí)間設(shè)置為上一次執(zhí)行任務(wù)的時(shí)間 + p】,兩次任務(wù)執(zhí)行的時(shí)間差
time += p;
else
// fixed-delay 模式,下一次執(zhí)行時(shí)間是【當(dāng)前這次任務(wù)結(jié)束的時(shí)間(就是現(xiàn)在) + delay 值】
time = triggerTime(-p);
}
ScheduledFutureTask#reExecutePeriodic()
,重新放入阻塞任務(wù)隊(duì)列,等待獲取,進(jìn)行下一輪執(zhí)行
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture? task) {
if (canRunInCurrentRunState(true)) {
// 【放入任務(wù)隊(duì)列】
super.getQueue().add(task);
// 如果提交完任務(wù)之后,線程池狀態(tài)變?yōu)榱?shutdown 狀態(tài),需要再次檢查是否可以執(zhí)行,
// 如果不能執(zhí)行且任務(wù)還在隊(duì)列中未被取走,則取消任務(wù)
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 當(dāng)前線程池狀態(tài)可以執(zhí)行周期任務(wù),加入隊(duì)列,并【根據(jù)線程數(shù)量是否大于核心線程數(shù)確定是否開(kāi)啟新線程】
ensurePrestart();
}
}
-
線程池
+關(guān)注
關(guān)注
0文章
57瀏覽量
7062
發(fā)布評(píng)論請(qǐng)先 登錄
java自帶的線程池方法
原理解析:線程池中多余的線程是如何回收的?
多線程之線程池

Java線程池核心原理
如何用C++實(shí)現(xiàn)一個(gè)線程池呢?

線程池的線程怎么釋放

線程池的兩個(gè)思考

Spring 的線程池應(yīng)用

評(píng)論