周期性執行緒池與主要源碼解析
- 2019 年 10 月 3 日
- 筆記
之前學習ThreadPool的使用以及源碼剖析,並且從面試的角度去介紹知識點的解答。今天小強帶來周期性執行緒池的使用和重點源碼剖析。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor:用來處理延時任務或定時任務
定時執行緒池類的類結構圖
ScheduledThreadPoolExecutor接收ScheduleFutureTask類型的任務,是執行緒池調度任務的最小單位。
它採用DelayQueue存儲等待的任務:
1、DelayQueue內部封裝成一個PriorityQueue,它會根據time的先後時間順序,如果time相同則根絕sequenceNumber排序;
2、DelayQueue是無界隊列;
ScheduleFutureTask
接收的參數:
private final long sequenceNumber;//任務的序號 private long time;//任務開始的時間 private final long period;//任務執行的時間間隔
工作執行緒的的執行過程:
工作執行緒會從DelayQueue取出已經到期的任務去執行;
執行結束後重新設置任務的到期時間,再次放回DelayQueue;
ScheduledThreadPoolExecutor會把待執行的任務放到工作隊列DelayQueue中,DelayQueue封裝了一個PriorityQueue,PriorityQueue會對隊列中的ScheduledFutureTask進行排序,具體的排序演算法實現如下:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; //首先按照time排序,time小的排到前面,time大的排到後面 long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; //time相同,按照sequenceNumber排序; //sequenceNumber小的排在前面,sequenceNumber大的排在後面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
接下來看看ScheduledFutureTask的run方法實現, run方法是調度task的核心,task的執行實際上是run方法的執行。
public void run() { //是否是周期性的 boolean periodic = isPeriodic(); //執行緒池是shundown狀態不支援處理新任務,直接取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); //如果不需要執行執行周期性任務,直接執行run方法結束 else if (!periodic) ScheduledFutureTask.super.run(); //如果需要周期性執行,則在執行任務完成後,設置下一次執行時間 else if (ScheduledFutureTask.super.runAndReset()) { //設置下一次執行該任務的時間 setNextRunTime(); //重複執行該任務 reExecutePeriodic(outerTask); } }
run方法的執行步驟:
- 1、如果執行緒池是shundown狀態不支援處理新任務,直接取消任務,否則步驟2;
- 2、如果不是周期性任務,直接調用ScheduledFutureTask的run方法執行,會設置執行結果,然後直接返回,否則步驟3;
- 3、如果是周期性任務,調用ScheduledFutureTask的runAndset方法執行,不會設置執行結果,然後直接返回,否則執行步驟4和步驟5;
- 4、計算下一次執行該任務的時間;
- 5、重複執行該任務;
接下來看下reExecutePeriodic方法的執行步驟:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
由於已經執行過一次周期性任務,所以不會reject當前任務,同時傳入的任務一定是周期性任務。
周期性執行緒池任務的提交方式
周期性有三種提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面從使用和源碼兩個方面進行說明,首先是如果提交任務:
pool.schedule(new Runnable() { @Override public void run() { System.out.println("延遲執行"); } },1, TimeUnit.SECONDS); /** * 這個執行周期是固定,不管任務執行多長時間,每過3秒中就會產生一個新的任務 */ pool.scheduleAtFixedRate(new Runnable() { @Override public void run() { //這個業務邏輯需要很長的時間,超過了3秒 System.out.println("重複執行"); } },1,3,TimeUnit.SECONDS); pool.shutdown(); /** * 假如run方法30min後執行完成,然後間隔3秒,再周期性執行下一個任務 */ pool.scheduleWithFixedDelay(new Runnable() { @Override public void run() { //30min System.out.println("重複執行"); } },1,3,TimeUnit.SECONDS);
知道了如何提交周期性任務,接下來源碼是如何執行的,首先是schedule方法,該方法是指任務在指定延遲時間到達後觸發,只會執行一次。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //把任務封裝成ScheduledFutureTask,之後調用decorateTask進行包裝; //decorateTask方法是空方法,留給用戶去實現的; RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //包裝好任務之後,進行任務的提交 delayedExecute(t); return t; }
任務提交方法:
private void delayedExecute(RunnableScheduledFuture<?> task) { //如果執行緒池不是RUNNING狀態,則使用拒絕策略把提交任務拒絕掉 if (isShutdown()) reject(task); else { //與ThreadPoolExecutor不同,這裡直接把任務加入延遲隊列 super.getQueue().add(task); //如果當前狀態無法執行任務,則取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //和ThreadPoolExecutor不一樣,corePoolSize沒有達到會增加Worker; //增加Worker,確保提交的任務能夠被執行 ensurePrestart(); } }
還沒關注我的公眾號?
- 掃文末二維碼關注公眾號【小強的進階之路】可領取如下:
- 學習資料: 1T影片教程:涵蓋Javaweb前後端教學影片、機器學習/人工智慧教學影片、Linux系統教程影片、雅思考試影片教程;
- 100多本書:包含C/C++、Java、Python三門程式語言的經典必看圖書、LeetCode題解大全;
- 軟體工具:幾乎包括你在編程道路上的可能會用到的大部分軟體;
- 項目源碼:20個JavaWeb項目源碼。