quartz框架(六)-ThreadPool
- 2022 年 3 月 12 日
- 筆記
ThreadPool
本篇博文,部落客將介紹Quartz框架中ThreadPool執行緒池相關的內容。執行緒池顧名思義,就是一個可以幫助我們來進行執行緒資源管理的對象。在web開發中,常見的就有資料庫連接池,http連接池,redis連接池等。在看這篇文章之前,讀者需要先具備一定的多執行緒和鎖的知識,如使用wait和notify方法,實現生產者和消費者功能。
為什麼要用執行緒池?
- 執行緒池可以復用執行緒,減少執行緒的創建和銷毀次數。
- 可以提高程式的響應速度。
- 可以對執行緒資源進行統一管理,比如監控。
- …
介面定義
quartz框架中的ThreadPool介面定義如下,部落客在相應的介面方法上進行了注釋。
public interface ThreadPool {
//將runnable介面放入到Thread中執行
boolean runInThread(Runnable runnable);
//阻塞獲取可用執行緒數
int blockForAvailableThreads();
//執行緒初始化方法
void initialize() throws SchedulerConfigException;
//關閉執行緒
void shutdown(boolean waitForJobsToComplete);
//獲取執行緒池大小
int getPoolSize();
//設置實例id
void setInstanceId(String schedInstId);
//設置實例名稱
void setInstanceName(String schedName);
}
WorkThread
在介紹SimpleThreadPool之前,部落客先講解一下WorkThread。WorkThread繼承了Thread類,因此它是一個可以被運行的執行緒。它會阻塞式的接收執行緒池分配的任務,然後執行對應的任務。
工作者執行緒屬性
- lock,任務執行鎖。
- tp,工作者執行緒所在的執行緒池。
- runnable,可以運行的任務。
- runOnce,是否只執行一次。通過構造函數傳runnable實例,此時runOnce為true。
- run,是否需要循環執行。
Runnable的run方法
工作者執行緒啟動之後,在一個while循環裡面執行業務邏輯。while循環的退出條件是執行緒是否關閉(run == false)。先去獲取任務鎖(lock對象),如果沒有關閉且當前runnale方法為空,說明此時沒有需要執行的任務。執行緒會在這裡等待500毫秒(wait 500)。當有任務投遞給當前執行緒時,才會喚醒工作者執行緒,繼續執行當前方法。
如果任務不為空的話,執行runnable介面。執行完之後獲取lock鎖,防止並發衝突。獲取到鎖之後,設置runnable介面為null。如果只是只執行一次的任務,通過持有的threadPool引用,去獲取下一次任務允許的鎖,獲取到下一次任務允許的鎖之後,那麼就將自己從busyWorks中移除。否則就從busyWorkers中移除,然後添加到avaliableWorkers。
自定義的run方法
內部的run(Runnable newRunnable)方法:先去獲取內部對象lock的鎖,獲取成功後先判斷內部的runnable是否為空。如果不為空的話說明該執行緒還是處於繁忙狀態,拋出異常。如果為空的話,則設置為傳遞進來的newRunnable,並且喚醒所有lock等待隊列中的對象(提前讓這些對象結束等待,提高工作執行緒的響應速度)。
shutdown方法
設置內部成員變數run為false,也就是任務執行完之後不再循環等待下一個可以運行的任務,結束執行緒的run方法後,執行緒會進入terminate狀態。
SimpleThreadPool
在quartz默認的配置文件中,使用的是SimpleThreadPool這個執行緒池,並且指定了執行緒池的個數為10。從源碼中,我們也可以看到SimpleThreadPool是一個執行緒大小固定的執行緒池。程式碼如下所示:
public SimpleThreadPool(int threadCount, int threadPriority) {
setThreadCount(threadCount);
setThreadPriority(threadPriority);
}
執行緒池屬性
- count,執行緒個數
- isShutdown,是否處於關閉狀態
- handoffPending,是否處於切換狀態
- makeThreadsDaemons,創建的執行緒是否為後台執行緒
- threadGroup,執行緒組
- nextRunnableLock,下一個可以運行的任務鎖
- workers,總共執行緒集合
- availWorkers,空閑執行緒集合
- busyWorkers,繁忙執行緒集合
- threadNamePrefix,執行緒名稱前綴
- schedulerInstanceName,調度器實例名稱
initialize方法
在QuartzScheduler對象進行初始化的時候,就會創建對應的執行緒池,並且調用對應的initialize方法。initialize方法主要就是預先創建對應個數的工作者執行緒,並且將它們添加到workers和availWorkers集合中,並且循環調用每個workThread的start方法。
blockForAvailableThreads方法
先獲取到下一次運行的任務鎖,防止這時候的空閑執行緒集合availWorkers發生變化。如果此時的availWorkers小於1,或者此時有任務進行等待分配,並且此時執行緒池沒有關閉,那麼就進行等待(wait 500)。
如果此時獲取到的空閑執行緒數大於等於1,則說明現在可以把對應個數的任務交給執行緒池進行分配執行。
runInThread方法
先獲取到可以允許下一個任務的鎖(nextRunnableLock),設置handoffPending為true,handoffPending表示當前有任務在等待執行緒池分配任務。接著阻塞判斷是否存在空閑執行緒可以獲取(繁忙執行緒執行完後會將自己添加到空閑執行緒集合中)或者 執行緒池是否需要被關閉。
如果沒有關閉執行緒池的情況下,直接從空閑執行緒中拿到第一個執行緒,並從空閑集合中移除。然後將這個空閑執行緒添加到繁忙執行緒集合中,接著執行workThread的投遞方法(run方法)。
如果將要關閉執行緒池的情況下,直接new出一個執行緒去執行這個任務,不再等待有空閑執行緒去執行,加快執行緒池的shutdown時間。並將此執行緒添加到繁忙執行緒集合中,添加到工作者集合中。
最後通知等待下一次允許任務鎖的執行緒,設置handoffPending為false。
shutdown
關閉執行緒池時,一個需要停止上游執行緒(quartzThread)給他(thradPool)分配任務,另一個需要關閉掉工作池中現有的任務。如果這時候剛好有任務需要進行調度,則需要看配置waitForJobsToComplete。
關閉執行緒池的方法有一個waitForJobsToComplete的方法,waitForJobsToComplete表示執行緒池關閉是否需要等到運行中的任務執行完畢。接著設置執行緒池為關閉狀態(shutdown),並循環調用workers集合中thread的shutdown方法(不讓工作者執行緒再循環執行),然後移除availWorkers中的執行緒。
如果waitForJobsToComplete為true,那麼判斷busyWorkers的元素個數是否大於0(繁忙集合中的執行緒結束任務後,會將自己從繁忙集合中移除),如果busyWorkers的元素個數大於0的,調用nextRunnableLock的阻塞方法,讓其它方法有時間處理(比如如果此時有任務需要進行分配,可以讓執行緒池把任務分配好)。最後循環調用每個workers中thread的join方法,等待thread死亡。