ScheduledThreadPoolExecutor源碼分析-你知道定時線程池是如何實現延遲執行和周期執行的嗎?

Java版本:8u261。

1 簡介

ScheduledThreadPoolExecutor即定時線程池,是用來執行延遲任務或周期性任務的。相比於Timer的單線程,定時線程池在遇到任務拋出異常的時候不會關閉整個線程池,更加健壯(需要提一下的是:ScheduledThreadPoolExecutor和ThreadPoolExecutor一樣,如果執行任務的過程中拋異常的話,這個任務是會被丟棄的。所以在任務的執行過程中需要對異常做捕獲處理,有必要的話需要做補償措施)。

傳進來的任務會被包裝為ScheduledFutureTask,其繼承於FutureTask,提供異步執行的能力,並且可以返回執行結果。同時實現了Delayed接口,可以通過getDelay方法來獲取延遲時間。

相比於ThreadPoolExecutor,ScheduledThreadPoolExecutor中使用的隊列是DelayedWorkQueue,是一個無界的隊列。所以在定時線程池中,最大線程數是沒有意義的(最大線程數會固定為int的最大值,且不會作為定時線程池的參數)。在ThreadPoolExecutor中,如果當前線程數小於核心線程數就直接創建核心線程來執行任務,大於等於核心線程數的話才往阻塞隊列中放入任務;而在ScheduledThreadPoolExecutor中卻不是這種邏輯。ScheduledThreadPoolExecutor中上來就會把任務放進延遲隊列中,然後再去等待執行。

1.1 小頂堆

DelayedWorkQueue的實現有些特殊,是基於小頂堆構建的(與DelayQueue和PriorityQueue類似)。因為要保證每次從延遲隊列中拿取到的任務是距現在最近的一個,所以使用小頂堆結構來構建是再適合不過了(堆結構也常常用來解決前N小和前N大的問題)。小頂堆保證每個節點的值不小於其父節點的值,而不大於其孩子節點的值,而對於同級節點來說則沒有什麼限制。這樣在小頂堆中值最小的點永遠保證是在根節點處。如果用數組來構建小頂堆的話,值最小的點就在數組中的第一個位置處。

img

圖中紅色的數字代表節點在數組中的索引位置,由此可以看出堆的另一條性質是:假設當前節點的索引是k,那麼其父節點的索引是:(k-1)/2;左孩子節點的索引是:k2+1;而右孩子節點的索引是k2+2。

構建堆的兩個核心方法是siftUpsiftDown,siftUp方法用於添加節點時的上溯過程;而siftDown方法用於刪除節點時的下溯過程。具體的實現源碼會在下面進行分析,這裡就畫圖來理解一下(下面只會分析經典的小頂堆添加和刪除節點的實現,而在源碼中的實現略有不同,但核心都是一樣的):

1.1.1 添加節點

img

如果在上面的siftUp過程中,發現某一次當前節點的值就已經大於了父節點的值,siftUp過程也就會提前終止了。同時可以看出:在上面的siftUp以及下面將要講的siftDown操作過程中,每次都只會比較並交換當前節點和其父子節點的值,而不是整個堆都發生變動,降低了時間複雜度。

1.1.2 刪除節點

刪除節點分為三種情況,首先來看一下刪除根節點的情況

img

然後是刪除最後一個節點的情況。刪除最後一個節點是最簡單的,只需要進行刪除就行了,因為這並不影響小頂堆的結構,不需要進行調整。這裡就不再展示了(注意:刪除除了最後一個節點的其他葉子節點並不屬於當前這種情況,而是屬於下面第三種情況。也就是說刪除這些葉子節點並不能簡單地刪除它們就完了的,因為堆結構首先得保證是一顆完全二叉樹)。

最後是刪除既不是根節點又不是最後一個節點的情況

img

在刪除既不是根節點又不是最後一個節點的時候,可以看到執行了一次siftDown並伴隨了一次siftUp的過程。但是這個siftUp過程並不是會一定觸發的,只有滿足最後一個節點的值比要刪除節點的父節點的值還要小的時候才會觸發siftUp操作(這個很好推理:在小頂堆中如果最後一個節點值比要刪除節點的父節點值要小的話,那麼要刪除節點的左右孩子節點值也必然是都大於最後一個節點值的(不考慮值相等的情況),那麼此時就不會發生siftDown操作;而如果發生了siftDown操作,就說明最後一個節點值至少要比要刪除節點的左右孩子節點中的一個要大(如果有左右孩子節點的話)。而孫子節點值是肯定要大於爺爺節點值的(不考慮值相等的情況),所以也就是說發生了siftDown操作的時候,最後一個節點值是比要刪除節點的父節點值大的。這個時候孫子節點和最後一個節點siftDown交換後,依然是滿足小頂堆性質的,所以就不需要附加的siftUp操作;還有一種情況是最後一個節點值是介於要刪除節點的父節點值和要刪除節點的左右孩子節點值中的較小者,那麼這個時候既不會發生siftDown,也不會發生siftUp)。

而源碼中的實現和上面的經典實現最大的不同就是不會有節點彼此交換的操作。在siftUp和siftDown的經典實現中,如果需要變動節點時,都會來一次父子節點的互相交換操作(包括刪除節點時首先做的要刪除節點和最後一個節點之間的交換操作也是如此)。如果仔細思考的話,就會發現這其實是多餘的。在需要交換節點的時候,只需要siftUp操作時的父節點或siftDown時的孩子節點重新移到當前需要比較的節點位置上,而比較節點是不需要移動到它們的位置上的。此時直接進入到下一次的判斷中,重複siftUp或siftDown過程,直到最後找到了比較節點的插入位置後,才會將其插入進去。這樣做的好處是可以省去一半的節點賦值的操作,提高了執行的效率。同時這也就意味着,需要將要比較的節點作為參數保存起來,而源碼中也正是這麼實現的。

1.2 Leader-Follower模式

ScheduledThreadPoolExecutor中使用了Leader-Follower模式。這是一種設計思想,假如說現在有一堆等待執行的任務(一般是存放在一個隊列中排好序),而所有的工作線程中只會有一個是leader線程,其他的線程都是follower線程。只有leader線程能執行任務,而剩下的follower線程則不會執行任務,它們會處在休眠中的狀態。當leader線程拿到任務後執行任務前,自己會變成follower線程,同時會選出一個新的leader線程,然後才去執行任務。如果此時有下一個任務,就是這個新的leader線程來執行了,並以此往複這個過程。當之前那個執行任務的線程執行完畢再回來時,會判斷如果此時已經沒任務了,又或者有任務但是有其他的線程作為leader線程,那麼自己就休眠了;如果此時有任務但是沒有leader線程,那麼自己就會重新成為leader線程來執行任務。

不像ThreadPoolExecutor是需要立即執行任務的,ScheduledThreadPoolExecutor中的任務是延遲執行的,而拿取任務也是延遲拿取的。所以並不需要所有的線程都處於運行狀態延時等待獲取任務。而如果這麼做的話,最後也只會有一個線程能執行當前任務,其他的線程還是會被再次休眠的(這裡只是在說單任務多線程的情況,但對於多任務來說也是一樣的,總結來說就是Leader-Follower模式只會喚醒真正需要「幹事」的線程)。這是很沒有必要的,而且浪費資源。所以使用Leader-Follower模式的好處是:避免沒必要的喚醒和阻塞的操作,這樣會更加有效,且節省資源。

2 構造器

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public ScheduledThreadPoolExecutor(int corePoolSize) {
 5     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 6             new DelayedWorkQueue());
 7 }
 8
 9 /**
10  * ThreadPoolExecutor:
11  */
12 public ThreadPoolExecutor(int corePoolSize,
13                           int maximumPoolSize,
14                           long keepAliveTime,
15                           TimeUnit unit,
16                           BlockingQueue<Runnable> workQueue) {
17     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
18             Executors.defaultThreadFactory(), defaultHandler);
19 }

可以看到:ScheduledThreadPoolExecutor的構造器是調用了父類ThreadPoolExecutor的構造器來實現的,而父類的構造器以及之中的所有參數我在之前分析ThreadPoolExecutor的源碼文章中講過,這裡就不再贅述了。

3 schedule方法

execute方法和submit方法內部都是調用的schedule方法,所以來看一下其實現:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public ScheduledFuture<?> schedule(Runnable command,
 5                                    long delay,
 6                                    TimeUnit unit) {
 7     //非空校驗
 8     if (command == null || unit == null)
 9         throw new NullPointerException();
10     //包裝任務
11     RunnableScheduledFuture<?> t = decorateTask(command,
12             new ScheduledFutureTask<Void>(command, null,
13                     triggerTime(delay, unit)));
14     //延遲執行
15     delayedExecute(t);
16     return t;
17 }
18
19 /**
20  * 第13行代碼處:
21  * 延遲操作的觸發時間
22  */
23 private long triggerTime(long delay, TimeUnit unit) {
24     //delay非負處理
25     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
26 }
27
28 long triggerTime(long delay) {
29     /*
30     now方法內部就一句話:「System.nanoTime();」,也就是獲取當前時間。這裡也就是獲取
31     當前時間加上延遲時間後的結果。如果延遲時間超過了上限,會在overflowFree方法中處理
32      */
33     return now() +
34             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
35 }
36
37 private long overflowFree(long delay) {
38     //獲取隊頭節點(不移除)
39     Delayed head = (Delayed) super.getQueue().peek();
40     if (head != null) {
41         //獲取隊頭的剩餘延遲時間
42         long headDelay = head.getDelay(NANOSECONDS);
43         /*
44         能走進本方法中,就說明delay是一個接近long最大值的數。此時判斷如果headDelay小於0
45         就說明延遲時間已經到了或過期了但是還沒有執行,並且delay和headDelay的差值小於0,說明headDelay
46         和delay的差值已經超過了long的範圍
47          */
48         if (headDelay < 0 && (delay - headDelay < 0))
49             //此時更新一下delay的值,確保其和headDelay的差值在long的範圍內,同時delay也會重新變成一個正數
50             delay = Long.MAX_VALUE + headDelay;
51     }
52     return delay;
53 }
54
55 /**
56  * 第39行代碼處:
57  * 調用DelayedWorkQueue中覆寫的peek方法來獲取隊頭節點
58  */
59 public RunnableScheduledFuture<?> peek() {
60     final ReentrantLock lock = this.lock;
61     lock.lock();
62     try {
63         return queue[0];
64     } finally {
65         lock.unlock();
66     }
67 }
68
69 /**
70  * 第42行代碼處:
71  * 可以看到本方法就是獲取延遲時間和當前時間的差值
72  */
73 public long getDelay(TimeUnit unit) {
74     return unit.convert(time - now(), NANOSECONDS);
75 }

4 包裝任務

上面第11行和第12行代碼處會進行任務的包裝:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 ScheduledFutureTask(Runnable r, V result, long ns) {
 5     //調用父類FutureTask的構造器
 6     super(r, result);
 7     //這裡會將延遲時間賦值給this.time
 8     this.time = ns;
 9     //period用來表示任務的類型,為0表示延遲任務,否則表示周期性任務
10     this.period = 0;
11     //這裡會給每一個任務賦值一個唯一的序列號。當延遲時間相同時,會以該序列號來進行判斷。序列號小的會出隊
12     this.sequenceNumber = sequencer.getAndIncrement();
13 }
14
15 /**
16  * schedule方法第11行代碼處:
17  * 包裝任務,這裡只是返回task而已,子類可以覆寫本方法中的邏輯
18  */
19 protected <V> RunnableScheduledFuture<V> decorateTask(
20         Runnable runnable, RunnableScheduledFuture<V> task) {
21     return task;
22 }

5 delayedExecute方法

在schedule方法的第15行代碼處會執行延遲任務,添加任務和補充工作線程:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  */
  4 private void delayedExecute(RunnableScheduledFuture<?> task) {
  5     if (isShutdown())
  6         /*
  7         這裡會調用父類ThreadPoolExecutor的isShutdown方法來判斷當前線程池是否處於關閉或正在關閉的狀態,
  8         如果是的話就執行具體的拒絕策略
  9          */
 10         reject(task);
 11     else {
 12         //否則就往延遲隊列中添加當前任務
 13         super.getQueue().add(task);
 14         /*
 15         添加後繼續判斷當前線程池是否處於關閉或正在關閉的狀態,如果是的話就判斷此時是否還能繼續執行任務,
 16         如果不能的話就刪除上面添加的任務
 17          */
 18         if (isShutdown() &&
 19                 !canRunInCurrentRunState(task.isPeriodic()) &&
 20                 remove(task))
 21             //同時會取消此任務的執行
 22             task.cancel(false);
 23         else
 24             //否則,說明線程池是可以繼續執行任務的,就去判斷此時是否需要補充工作線程
 25             ensurePrestart();
 26     }
 27 }
 28
 29 /**
 30  * 第19行代碼處:
 31  * 傳進來的periodic表示任務是否是周期性任務,如果是的話就是true(通過「period != 0」進行判斷)
 32  */
 33 boolean canRunInCurrentRunState(boolean periodic) {
 34     return isRunningOrShutdown(periodic ?
 35             //關閉線程池時判斷是否需要繼續執行周期性任務
 36             continueExistingPeriodicTasksAfterShutdown :
 37             //關閉線程池時判斷是否需要繼續執行延遲任務
 38             executeExistingDelayedTasksAfterShutdown);
 39 }
 40
 41 /**
 42  * ThreadPoolExecutor:
 43  */
 44 final boolean isRunningOrShutdown(boolean shutdownOK) {
 45     //獲取當前線程池的運行狀態
 46     int rs = runStateOf(ctl.get());
 47     //如果是RUNNING狀態的,或者是SHUTDOWN狀態並且是能繼續執行任務的,就返回true
 48     return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
 49 }
 50
 51 /**
 52  * ScheduledThreadPoolExecutor:
 53  * 上面第20行代碼處的remove方法會調用ThreadPoolExecutor的remove方法,而該方法我在之前的
 54  * ThreadPoolExecutor的源碼分析文章中已經分析過了。但是其中會調用延遲隊列覆寫的remove邏輯,
 55  * 也就是本方法(同時第130行代碼處也會調用到這裡)
 56  */
 57 public boolean remove(Object x) {
 58     final ReentrantLock lock = this.lock;
 59     //加鎖
 60     lock.lock();
 61     try {
 62         //獲取當前節點的堆索引位
 63         int i = indexOf(x);
 64         if (i < 0)
 65             //如果找不到的話,就直接返回false
 66             return false;
 67
 68         //將當前節點的索引位設置為-1,因為下面要進行刪除了
 69         setIndex(queue[i], -1);
 70         //size-1
 71         int s = --size;
 72         //獲取小頂堆的最後一個節點,用於替換
 73         RunnableScheduledFuture<?> replacement = queue[s];
 74         //將最後一個節點置為null
 75         queue[s] = null;
 76         //如果要刪除的節點本身就是最後一個節點的話,就可以直接返回true了,因為不影響小頂堆的結構
 77         if (s != i) {
 78             /*
 79             否則執行一次siftDown下溯過程,將最後一個節點的值重新插入到小頂堆中
 80             這其中會刪除i位置處的節點(siftDown方法後面會再次調用,到時候再來詳細分析該方法的實現)
 81              */
 82             siftDown(i, replacement);
 83             /*
 84             經過上面的siftDown的操作後,如果最後一個節點的延遲時間本身就比要刪除的節點的小的話,
 85             那麼就會直接將最後一個節點放在要刪除節點的位置上。此時從刪除節點到其下面的節點都是滿足
 86             小頂堆結構的,但是不能保證replacement也就是當前刪除後的替換節點和其父節點之間滿足小頂堆
 87             結構,也就是說可能出現replacement節點的延遲時間比其父節點的還小的情況
 88              */
 89             if (queue[i] == replacement)
 90                 //那麼此時就調用一次siftUp上溯操作,再次調整replacement節點其上的小頂堆的結構即可
 91                 siftUp(i, replacement);
 92         }
 93         return true;
 94     } finally {
 95         //釋放鎖
 96         lock.unlock();
 97     }
 98 }
 99
100 /**
101  * 第63行代碼處:
102  */
103 private int indexOf(Object x) {
104     if (x != null) {
105         if (x instanceof ScheduledFutureTask) {
106             //如果當前節點是ScheduledFutureTask類型的,就獲取它的堆索引位
107             int i = ((ScheduledFutureTask) x).heapIndex;
108             //大於等於0和小於size說明當前節點還在小頂堆中,並且當前節點還在延遲隊列中的話,就直接返回該索引位
109             if (i >= 0 && i < size && queue[i] == x)
110                 return i;
111         } else {
112             //否則就按照普通遍歷的方式查找是否有相等的節點,如果有的話就返回索引位
113             for (int i = 0; i < size; i++)
114                 if (x.equals(queue[i]))
115                     return i;
116         }
117     }
118     //找不到的話就返回-1
119     return -1;
120 }
121
122 /**
123  * 第22行代碼處:
124  */
125 public boolean cancel(boolean mayInterruptIfRunning) {
126     //調用FutureTask的cancel方法來嘗試取消此任務的執行
127     boolean cancelled = super.cancel(mayInterruptIfRunning);
128     //如果取消成功了,並且允許刪除節點,並且當前節點存在於小頂堆中的話,就刪除它
129     if (cancelled && removeOnCancel && heapIndex >= 0)
130         remove(this);
131     return cancelled;
132 }
133
134 /**
135  * ThreadPoolExecutor:
136  * 第25行代碼處:
137  */
138 void ensurePrestart() {
139     //獲取當前線程池的工作線程數
140     int wc = workerCountOf(ctl.get());
141     if (wc < corePoolSize)
142         /*
143         如果小於核心線程數,就添加一個核心線程,之前我在分析ThreadPoolExecutor的源碼文章中講過,
144         addWorker方法的執行中會同時啟動運行線程。這裡傳入的firstTask參數為null,因為不需要立即執行任務,
145         而是從延遲隊列中拿取任務
146          */
147         addWorker(null, true);
148     else if (wc == 0)
149         //如果當前沒有工作線程,就去添加一個非核心線程,然後運行它。保證至少要有一個線程
150         addWorker(null, false);
151     /*
152     從這裡可以看出,如果當前的工作線程數已經達到了核心線程數後,就不會再創建工作線程了
153     定時線程池最多只有「核心線程數」個線程,也就是通過構造器傳進來的參數大小
154      */
155 }

6 添加任務

因為延遲隊列是用小頂堆構建的,所以添加的時候會涉及到小頂堆的調整:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  * 這裡會調用DelayedWorkQueue的add方法
  4  */
  5 public boolean add(Runnable e) {
  6     return offer(e);
  7 }
  8
  9 public boolean offer(Runnable x) {
 10     //非空校驗
 11     if (x == null)
 12         throw new NullPointerException();
 13     //強轉類型
 14     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
 15     final ReentrantLock lock = this.lock;
 16     //加鎖
 17     lock.lock();
 18     try {
 19         //獲取當前的任務數量
 20         int i = size;
 21         //判斷是否需要擴容(初始容量為16)
 22         if (i >= queue.length)
 23             grow();
 24         //size+1
 25         size = i + 1;
 26         if (i == 0) {
 27             //如果當前是第一個任務的話,就直接放在小頂堆的根節點位置處就行了(隊列第一個位置)
 28             queue[0] = e;
 29             //同時設置一下當前節點的堆索引位為0
 30             setIndex(e, 0);
 31         } else {
 32             //否則就用siftUp的方式來插入到應該插入的位置
 33             siftUp(i, e);
 34         }
 35         //經過上面的插入過程之後,如果小頂堆的根節點還是當前新添加節點的話,說明新添加節點的延遲時間是最短的
 36         if (queue[0] == e) {
 37             //那麼此時不管有沒有leader線程,都得將其置為null
 38             leader = null;
 39             /*
 40             並且重新將條件隊列上的一個節點轉移到CLH隊列中(如果當前只有一個節點的時候也會進入到signal方法中
 41             但無妨,因為此時條件隊列中還沒有節點,所以並不會做什麼)需要提一點的是:如果真的看過signal方法內部實現
 42             的話就會知道,signal方法在常規情況下並不是在做喚醒線程的工作,喚醒是在下面的unlock方法中實現的
 43              */
 44             available.signal();
 45         }
 46     } finally {
 47         /*
 48         釋放鎖(注意,這裡只會喚醒CLH隊列中的head節點的下一個節點,可能是上面被鎖住的添加任務的其他線程、
 49         也可能是上次執行完任務後準備再次拿取任務的線程,還有可能是等待被喚醒的follower線程,又或者有其他的
 50         情況。但不管是哪個,只要能保證喚醒動作是一直能被傳播下去的就行。ReentrantLock和阻塞隊列的執行細節
 51         詳見我之前對AQS源碼進行分析的文章)
 52          */
 53         lock.unlock();
 54     }
 55     return true;
 56 }
 57
 58 /**
 59  * 第23行代碼處:
 60  */
 61 private void grow() {
 62     int oldCapacity = queue.length;
 63     //可以看到這裡的擴容策略是*1.5的方式
 64     int newCapacity = oldCapacity + (oldCapacity >> 1);
 65     //如果擴容後的新容量溢出了,就將其恢復為int的最大值
 66     if (newCapacity < 0)
 67         newCapacity = Integer.MAX_VALUE;
 68     //使用Arrays.copyOf(System.arraycopy)的方式來進行數組的拷貝
 69     queue = Arrays.copyOf(queue, newCapacity);
 70 }
 71
 72 /**
 73  * 第30行、第99行和第109行代碼處:
 74  * 設置f節點在小頂堆中的索引位為idx,這樣在最後的刪除節點時可以通過index是否大於0來判斷當前節點是否仍在小頂堆中
 75  */
 76 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
 77     if (f instanceof ScheduledFutureTask)
 78         ((ScheduledFutureTask) f).heapIndex = idx;
 79 }
 80
 81 /**
 82  * 第33行代碼處:
 83  * 堆排序的精髓就在於siftUp和siftDown方法,但本實現與常規的實現略有不同,多了一個入參key
 84  * key代表當前要插入節點中的任務
 85  */
 86 private void siftUp(int k, RunnableScheduledFuture<?> key) {
 87     //當k<=0的時候說明已經上溯到根節點了
 88     while (k > 0) {
 89         //獲取父節點的索引((當前節點索引位-1)/2的方式)
 90         int parent = (k - 1) >>> 1;
 91         //獲取父節點的任務
 92         RunnableScheduledFuture<?> e = queue[parent];
 93         //如果當前要插入節點中的任務延遲時間大於父節點的延遲時間的話,就停止上溯過程,說明找到了插入的位置
 94         if (key.compareTo(e) >= 0)
 95             break;
 96         //否則就需要將父節點的內容賦值給當前節點
 97         queue[k] = e;
 98         //同時設置一下父節點的堆索引位為當前節點處
 99         setIndex(e, k);
100         //然後將父節點賦值給當前節點,繼續下一次的上溯過程
101         k = parent;
102     }
103     /*
104     走到這裡說明有兩種情況:<1>已經結束了上溯的過程,但最後一次的父節點還沒有賦值,這裡就是進行賦值的操作;
105     <2>如果本方法進來的時候要添加的最後一個節點本身就滿足小頂堆條件的話,那麼該處就是在給最後一個節點進行賦值
106      */
107     queue[k] = key;
108     //同時設置一下要插入節點的堆索引位
109     setIndex(key, k);
110 }
111
112 /**
113  * 第94行代碼處:
114  */
115 public int compareTo(Delayed other) {
116     //如果比較的就是當前對象,就直接返回0相等
117     if (other == this)
118         return 0;
119     if (other instanceof ScheduledFutureTask) {
120         //如果需要比較的任務也是ScheduledFutureTask類型的話,就首先強轉一下類型
121         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
122         //計算當前任務和需要比較的任務之間的延遲時間差
123         long diff = time - x.time;
124         if (diff < 0)
125             //小於0說明當前任務的延遲時間更短,就返回-1
126             return -1;
127         else if (diff > 0)
128             //大於0說明需要比較的任務的延遲時間更短,就返回1
129             return 1;
130         //如果兩者相等的話,就比較序列號,誰的序列號更小(序列號是唯一的),就應該先被執行
131         else if (sequenceNumber < x.sequenceNumber)
132             return -1;
133         else
134             return 1;
135     }
136     //如果需要比較的任務不是ScheduledFutureTask類型的話,就通過getDelay的方式來進行比較
137     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
138     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
139 }

7 拿取任務

在上面的ensurePrestart方法中會調用到addWorker方法,以此來補充工作線程。之前我對ThreadPoolExecutor源碼進行分析的文章中說到過,addWorker方法會調用到getTask方法來從隊列中拿取任務:

  1 /**
  2  * ThreadPoolExecutor:
  3  */
  4 private Runnable getTask() {
  5     //...
  6     /*
  7     這裡的allowCoreThreadTimeOut默認為false(為true表示空閑的核心線程也是要超時銷毀的),
  8     而上面說過定時線程池最多只有「核心線程數」個線程,所以timed為false
  9      */
 10     boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 11     //...
 12     //因為timed為false,所以這裡會走take方法中的邏輯
 13     Runnable r = timed ?
 14             workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 15             workQueue.take();
 16     //...
 17 }
 18
 19 /**
 20  * ScheduledThreadPoolExecutor:
 21  * 第15行代碼處:
 22  * 上面的take方法會調用到DelayedWorkQueue的take方法,而該方法也就是用來實現延遲拿取任務的
 23  */
 24 public RunnableScheduledFuture<?> take() throws InterruptedException {
 25     final ReentrantLock lock = this.lock;
 26     //加鎖(響應中斷模式)
 27     lock.lockInterruptibly();
 28     try {
 29         for (; ; ) {
 30             //獲取隊頭節點
 31             RunnableScheduledFuture<?> first = queue[0];
 32             if (first == null)
 33                 /*
 34                 如果當前延遲隊列中沒有延遲任務,就在這裡阻塞當前線程(通過AQS中條件隊列的方式),等待有任務時被喚醒
 35                 另外,當線程執行完任務後也會再次走到getTask方法中的本方法中。如果此時沒任務了,就會在此被阻塞休眠住
 36                 (我在之前AQS源碼分析的文章中說過:await方法中會釋放掉所有的ReentrantLock鎖資源,然後才會被阻塞住)
 37                  */
 38                 available.await();
 39             else {
 40                 //否則就獲取隊頭的剩餘延遲時間
 41                 long delay = first.getDelay(NANOSECONDS);
 42                 //如果延遲時間已經到了的話,就刪除並返回隊頭,表示拿取到了任務
 43                 if (delay <= 0)
 44                     return finishPoll(first);
 45                 /*
 46                 這裡將隊頭節點的引用置為null,如果不置為null的話,可能有多個等待着的線程同時持有着隊頭節點的
 47                 first引用,這樣如果要刪除隊頭節點的話,因為其還有其他線程的引用,所以不能被及時回收,造成內存泄漏
 48                  */
 49                 first = null;
 50                 /*
 51                 如果leader不為null,說明有其他的線程已經成為了leader線程,正在延遲等待着
 52                 同時此時沒有新的延遲時間最短的節點進入到延遲隊列中
 53                  */
 54                 if (leader != null)
 55                     /*
 56                     那麼當前線程就變成了follower線程,需要被阻塞住,等待被喚醒(同上,其中會釋放掉所有的鎖資源)
 57                     線程執行完任務後也會再次走到本方法中拿取任務,如果走到這裡發現已經有別的leader線程了,
 58                     那麼當前線程也會被阻塞休眠住;否則就會在下面的else分支中再次成為leader線程
 59                      */
 60                     available.await();
 61                 else {
 62                     /*
 63                     leader為null,可能是上一個leader線程拿取到任務後喚醒的下一個線程,也有可能
 64                     是一個新的延遲時間最短的節點進入到延遲隊列中,從而將leader置為null
 65 
 66                     此時獲取當前線程
 67                      */
 68                     Thread thisThread = Thread.currentThread();
 69                     //並將leader置為當前線程,也就是當前線程成為了leader線程
 70                     leader = thisThread;
 71                     try {
 72                         /*
 73                         這裡也就是在做具體的延時等待delay納秒的操作了,具體涉及到AQS中條件隊列的相關操作
 74                         如果被喚醒的話可能是因為到達了延遲時間從而醒來;也有可能是被別的線程signal喚醒了;
 75                         還有可能是中斷被喚醒。正常情況下是等到達了延遲時間後,這裡會醒來並進入到下一次循環中的
 76                         finishPoll方法中,剔除隊頭節點並最終返回(awaitNanos方法和await方法類似,其中會釋放掉
 77                         所有的鎖資源;不一樣的是在被喚醒時會把當前節點從條件隊列中「轉移」到CLH隊列中。這裡可以認為
 78                         是轉移,因為在條件隊列中的該節點狀態已經改為了0,相當於是個垃圾節點,後續會進行刪除)
 79                          */
 80                         available.awaitNanos(delay);
 81                     } finally {
 82                         /*
 83                         不管awaitNanos是如何被喚醒的,此時會判斷當前的leader線程是否還是當前線程
 84                         如果是的話就將leader置為null,也就是當前線程不再是leader線程了
 85                          */
 86                         if (leader == thisThread)
 87                             leader = null;
 88                     }
 89                 }
 90             }
 91         }
 92     } finally {
 93         //在退出本方法之前,判斷如果leader線程為null並且刪除隊頭後的延遲隊列仍然不為空的話(說明此時有其他的延遲任務)
 94         if (leader == null && queue[0] != null)
 95             //就將條件隊列上的一個節點轉移到CLH隊列中(同時會剔除上面的垃圾條件節點)
 96             available.signal();
 97         /*
 98         釋放鎖(同offer方法中的邏輯,這裡只會喚醒CLH隊列中的head節點的下一個節點。這裡就體現了
 99         Leader-Follower模式:當leader線程拿取到任務後準備要執行時,會首先喚醒剩下線程中的一個,
100         它將會成為新的leader線程,並以此往複。保證在任何時間都只有一個leader線程,避免不必要的喚醒與睡眠)
101          */
102         lock.unlock();
103     }
104 }
105
106 /**
107  * 第44行代碼處:
108  */
109 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
110     //size-1
111     int s = --size;
112     //獲取隊列中的最後一個節點
113     RunnableScheduledFuture<?> x = queue[s];
114     //並置空它,便於GC,這裡也就是在刪除最後一個節點
115     queue[s] = null;
116     //如果刪除前延遲隊列中有不止一個節點的話,就進入到siftDown方法中,將小頂堆中的根節點刪除,並且重新維護小頂堆
117     if (s != 0)
118         siftDown(0, x);
119     //同時設置一下刪除前的根節點的堆索引位為-1,表示其不存在於小頂堆中了
120     setIndex(f, -1);
121     //最後將其返回出去
122     return f;
123 }
124
125 /**
126  * 第118行代碼處:
127  * 方法參數中的key代表刪除的最後一個節點中的任務
128  */
129 private void siftDown(int k, RunnableScheduledFuture<?> key) {
130     /*
131     這裡會取數組長度的一半half(注意這裡的size是已經刪除最後一個節點後的size),
132     而half也就是在指向最後一個非葉子節點的下一個節點
133      */
134     int half = size >>> 1;
135     //從這裡可以看出下溯的終止條件是k大於等於half,也就是此時遍歷到已經沒有了非葉子節點,自然不需要進行調整
136     while (k < half) {
137         //獲取左孩子節點的索引位
138         int child = (k << 1) + 1;
139         //獲取左孩子節點的任務
140         RunnableScheduledFuture<?> c = queue[child];
141         //獲取右孩子節點的索引位
142         int right = child + 1;
143         //如果右孩子節點的索引位小於size,也就是在說當前節點含有右子樹。並且左孩子節點的任務延遲時間大於右孩子節點的話
144         if (right < size && c.compareTo(queue[right]) > 0)
145             //就將c重新指向為右孩子節點
146             c = queue[child = right];
147         /*
148         走到這裡說明c指向的是左右子節點中、任務延遲時間較小的那個節點。此時判斷如果最後一個節點的
149         任務延遲時間小於等於這個較小節點的話,就可以停止下溯了,說明找到了插入的位置
150          */
151         if (key.compareTo(c) <= 0)
152             break;
153         //否則就把較小的那個節點賦值給當前節點處
154         queue[k] = c;
155         //同時設置一下延遲時間較小的那個節點的堆索引位為當前節點處
156         setIndex(c, k);
157         //然後將當前節點指向那個較小的節點,繼續下一次循環
158         k = child;
159     }
160     /*
161     同siftUp方法一樣,走到這裡說明有兩種情況:<1>已經結束了下溯的過程,但最後一次的子節點還沒有賦值,
162     這裡會把其賦值為之前刪除的最後一個節點;
163     <2>如果根節點的左右子節點中、任務延遲時間較小的那個節點本身的延遲時間就比之前刪除節點大的話,
164     就會把根節點替換為之前刪除的最後一個節點
165     所以本方法加上finishPoll方法,實際上並沒有將最後一個節點刪除,最後一個節點中的任務一直都是保留着的
166     (也就是key),而是變相地將堆的根節點刪除了(在第一種情況中根節點在第一次賦值為左右子節點中、
167     任務延遲時間較小的那個節點時,就已經被覆蓋了)
168      */
169     queue[k] = key;
170     //同時設置一下最後一個節點現在新的堆索引位
171     setIndex(key, k);
172 }

8 執行延遲任務

拿取到任務之後,就是具體的執行任務了。addWorker方法具體的執行邏輯我在之前ThreadPoolExecutor的源碼分析文章中已經講過了,其中執行任務的時候會調用task的run方法,也就是這裡包裝為ScheduledFutureTask的run方法:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public void run() {
 5     //判斷是否是周期性任務
 6     boolean periodic = isPeriodic();
 7     if (!canRunInCurrentRunState(periodic)) {
 8         //如果此時不能繼續執行任務的話,就嘗試取消此任務的執行
 9         cancel(false);
10     } else if (!periodic)
11         /*
12         如果是延遲任務,就調用ScheduledFutureTask父類FutureTask的run方法,
13         其中會通過call方法來最終調用到使用者具體寫的任務
14          */
15         ScheduledFutureTask.super.run();
16     else if (ScheduledFutureTask.super.runAndReset()) {
17         //周期性任務的執行放在下一節中進行分析
18         setNextRunTime();
19         reExecutePeriodic(outerTask);
20     }
21 }

9 scheduleAtFixedRate & scheduleWithFixedDelay方法

scheduleAtFixedRate方法是以上次的延遲時間點開始,延遲指定時間後再次執行當前任務;而scheduleWithFixedDelay方法是以上個周期任務執行完畢後的時間點開始,延遲指定時間後再次執行當前任務。因為這兩個方法的實現絕大部分都是一樣的,所以合在一起來進行分析:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  * scheduleAtFixedRate方法
  4  */
  5 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  6                                               long initialDelay,
  7                                               long period,
  8                                               TimeUnit unit) {
  9     //非空校驗
 10     if (command == null || unit == null)
 11         throw new NullPointerException();
 12     //非負校驗
 13     if (period <= 0)
 14         throw new IllegalArgumentException();
 15     //包裝任務
 16     ScheduledFutureTask<Void> sft =
 17             new ScheduledFutureTask<Void>(command,
 18                     null,
 19                     triggerTime(initialDelay, unit),
 20                     unit.toNanos(period));
 21     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 22     //把任務賦值給ScheduledFutureTask的outerTask屬性
 23     sft.outerTask = t;
 24     //延遲執行
 25     delayedExecute(t);
 26     return t;
 27 }
 28
 29 /**
 30  * scheduleWithFixedDelay方法
 31  */
 32 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 33                                                  long initialDelay,
 34                                                  long delay,
 35                                                  TimeUnit unit) {
 36     //非空校驗
 37     if (command == null || unit == null)
 38         throw new NullPointerException();
 39     //非負校驗
 40     if (delay <= 0)
 41         throw new IllegalArgumentException();
 42     //包裝任務
 43     ScheduledFutureTask<Void> sft =
 44             new ScheduledFutureTask<Void>(command,
 45                     null,
 46                     triggerTime(initialDelay, unit),
 47                     unit.toNanos(-delay));
 48     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 49     //把任務賦值給ScheduledFutureTask的outerTask屬性
 50     sft.outerTask = t;
 51     //延遲執行
 52     delayedExecute(t);
 53     return t;
 54 }
 55
 56 /**
 57  * 第17行和第44行代碼處:
 58  */
 59 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 60     super(r, result);
 61     this.time = ns;
 62     /*
 63     可以看到這裡與schedule方法中調用ScheduledFutureTask構造器的區別是多了一個period入參
 64     在schedule方法中this.period賦值為0,而這裡會賦值為周期時間。其他的代碼都是一樣的
 65     如果細心的話可以看出:在上面scheduleAtFixedRate方法傳入的period是一個大於0的數,而
 66     scheduleWithFixedDelay方法傳入的period是一個小於0的數,以此來進行區分
 67      */
 68     this.period = period;
 69     this.sequenceNumber = sequencer.getAndIncrement();
 70 }

10 執行周期性任務

周期性任務和延遲任務的拿取任務邏輯都是一樣的,而在下面具體運行任務時有所不同,下面就來看一下其實現的差異:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public void run() {
 5     boolean periodic = isPeriodic();
 6     if (!canRunInCurrentRunState(periodic))
 7         cancel(false);
 8     else if (!periodic)
 9         ScheduledFutureTask.super.run();
10     /*
11     前面都是之前分析過的,而周期性任務會走下面的分支中
12
13     FutureTask的runAndReset方法相比於run方法來說,區別在於可以重複計算(run方法不能復用)
14     因為runAndReset方法在計算完成後不會修改狀態,狀態一直都是NEW
15      */
16     else if (ScheduledFutureTask.super.runAndReset()) {
17         //設置下次的運行時間點
18         setNextRunTime();
19         //重新添加任務
20         reExecutePeriodic(outerTask);
21     }
22 }
23
24 /**
25  * 第18行代碼處:
26  */
27 private void setNextRunTime() {
28     /*
29     這裡會獲取period,也就是之前設置的周期時間。上面說過,通過period的正負就可以區分出到底調用的是
30     scheduleAtFixedRate方法還是scheduleWithFixedDelay方法
31      */
32     long p = period;
33     if (p > 0)
34         /*
35         如果調用的是scheduleAtFixedRate方法,下一次的周期任務時間點就是起始的延遲時間加上周期時間,需要注意的是:
36         如果任務執行的時間大於周期時間period的話,那麼定時線程池就不會按照原先設計的延遲時間進行執行,而是會按照近似於
37         任務執行的時間來作為延遲的間隔(不管核心線程有多少個都是如此,因為任務是放在延遲隊列中的、是線性執行的)
38          */
39         time += p;
40     else
41         /*
42         triggerTime方法之前分析過是獲取當前時間+延遲時間後的結果,而此時是在執行完任務後,也就是說:
43         如果調用的是scheduleWithFixedDelay方法,下一次的周期任務時間點就是執行完上次任務後的時間點加上周期時間
44         由此可以看出,scheduleAtFixedRate方法和scheduleWithFixedDelay方法的區別就在於下一次time設置的不同而已
45          */
46         time = triggerTime(-p);
47     //time屬性會記錄到節點中,在小頂堆中通過compareTo方法來進行排序
48 }
49
50 /**
51  * 第20行代碼處:
52  */
53 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
54     //判斷此時是否還能繼續執行任務
55     if (canRunInCurrentRunState(true)) {
56         /*
57         這裡也就是重新往延遲隊列中添加任務,以此達到周期執行的效果。添加之後在getTask方法中的take方法中
58         就又可以拿到這個任務。設置下次的執行時間,然後再添加任務...周而復始
59          */
60         super.getQueue().add(task);
61         //添加後繼續判斷此時是否還能繼續執行任務,如果不能的話就刪除上面添加的任務
62         if (!canRunInCurrentRunState(true) && remove(task))
63             //同時會取消此任務的執行
64             task.cancel(false);
65         else
66             //否則,說明線程池是可以繼續執行任務的,就去判斷此時是否需要補充工作線程
67             ensurePrestart();
68     }
69 }

注意:網上的一種說法是:scheduleAtFixedRate方法是以上一個任務開始的時間計時,period時間過去後,檢測上一個任務是否執行完畢。如果上一個任務執行完畢,則當前任務立即執行;如果上一個任務沒有執行完畢,則需要等上一個任務執行完畢後立即執行。實際上這種說法是錯誤的,儘管它的表象是對的。正確的說法是:如果任務的執行時間小於周期時間的話,則會以上次任務執行開始時間加上周期時間後,再去執行下一次任務;而如果任務的執行時間大於周期時間的話,則會等到上次任務執行完畢後立即(近似於)執行下次任務。這兩種說法的區別就在於任務的執行時間大於周期時間的時候,檢測上一個任務是否完畢的時機不同。實際上在period時間過去後,根本不會有任何的檢測機制。因為只有等上次任務執行完畢後才會往延遲隊列中添加下一次任務,從而觸發各種後續的動作。所以在period時間點時,當前線程還在執行任務中,而其他的線程因為延遲隊列中為空會處於休眠的狀態(假如就只有一個周期任務的話)。所以根本不會有所謂的「檢測」的說法,這種說法也只能說是想當然了。還是那句話:「Talk is cheap. Show me the code.」

既然都說到這裡了,那麼現在就想來嘗試分析一下如果任務的執行時間大於周期時間的話,具體是怎樣的一個執行流程?

為了便於分析,假設現在是只有一個周期任務的場景,那麼延遲隊列中的任務數量最多就只會有1個:拿取到任務,延遲隊列中就變為空。執行完任務的時候,就又會往隊列中放一個任務。這樣其他搶不到任務的線程就會被休眠住。而添加任務的時候因為每次重新添加的任務都是小頂堆的根節點(從無到有),即添加的這個任務就是此時延遲時間最短的任務,所以同時會觸發嘗試喚醒線程的動作。

同時在添加下一個任務前會修改下一次的時間點。在setNextRunTime方法中,scheduleAtFixedRate方法是以上一次的延遲時間點加上周期時間來作為下一次的延遲時間點的,並不是scheduleWithFixedDelay方法獲取當前時間加上周期時間的方式。在當前這種情況下周期時間是要小於任務的執行時間的,也就是說會造成下一次的延遲時間點會賦值為一個已經過期的時間。且隨着周期的增加,下一次的延遲時間點會離當前時間點越來越遠。既然下一次的延遲時間點已經過期了,那麼就會去立馬執行任務。

所以總結一下:需要被喚醒的線程和上次執行完任務的線程就會去爭搶鎖資源(喚醒線程會把當前節點放進CLH隊列中,上次執行完任務的線程也會再次走到lockInterruptibly方法中(在它重新放任務的時候也會經歷一次lock),同時因為是ReentrantLock非公平鎖,這樣在調用unlock解鎖時就會出現在CLH隊列上的搶資源現象了),搶到的就會立馬去執行下一次的周期任務,而不會有任何的延時,造成的表象就是會以一個近似於任務執行時間為間隔的周期來執行任務。

11 shutdown方法

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  * 可以看到,定時線程池的shutdown方法是使用的父類ThreadPoolExecutor的shutdown方法,
 4  * 而該方法我在之前的ThreadPoolExecutor的源碼分析文章中已經分析過了。但是其中會調用
 5  * onShutdown的鉤子方法,也就是在ScheduledThreadPoolExecutor中的實現
 6  */
 7 public void shutdown() {
 8     super.shutdown();
 9 }
10
11 @Override
12 void onShutdown() {
13     //獲取延遲隊列
14     BlockingQueue<Runnable> q = super.getQueue();
15     //關閉線程池時判斷是否需要繼續執行延遲任務
16     boolean keepDelayed =
17             getExecuteExistingDelayedTasksAfterShutdownPolicy();
18     //關閉線程池時判斷是否需要繼續執行周期性任務
19     boolean keepPeriodic =
20             getContinueExistingPeriodicTasksAfterShutdownPolicy();
21     if (!keepDelayed && !keepPeriodic) {
22         //如果都不需要的話,就將延遲隊列中的任務逐個取消(並刪除)
23         for (Object e : q.toArray())
24             if (e instanceof RunnableScheduledFuture<?>)
25                 ((RunnableScheduledFuture<?>) e).cancel(false);
26         //最後做清理工作
27         q.clear();
28     } else {
29         for (Object e : q.toArray()) {
30             if (e instanceof RunnableScheduledFuture) {
31                 //否則就判斷如果任務是RunnableScheduledFuture類型的,就強轉一下類型
32                 RunnableScheduledFuture<?> t =
33                         (RunnableScheduledFuture<?>) e;
34                 //如果關閉線程池時不需要繼續執行任務,又或者需要繼續執行但是任務已經取消了
35                 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
36                         t.isCancelled()) {
37                     //就刪除當前節點
38                     if (q.remove(t))
39                         //同時取消任務
40                         t.cancel(false);
41                 }
42             }
43         }
44     }
45     //根據線程池狀態來判斷是否應該結束線程池
46     tryTerminate();
47 }
48
49 /**
50  * 第27行代碼處:
51  */
52 public void clear() {
53     final ReentrantLock lock = this.lock;
54     //加鎖
55     lock.lock();
56     try {
57         for (int i = 0; i < size; i++) {
58             //遍歷獲得延遲隊列中的每一個節點
59             RunnableScheduledFuture<?> t = queue[i];
60             if (t != null) {
61                 //將節點置為null
62                 queue[i] = null;
63                 //同時將索引位置為-1(recheck)
64                 setIndex(t, -1);
65             }
66         }
67         //size賦為初始值0
68         size = 0;
69     } finally {
70         //釋放鎖
71         lock.unlock();
72     }
73 }

更多內容請關注微信公眾號:奇客時間

Tags: