quartz框架(六)-ThreadPool

  • 2022 年 3 月 12 日
  • 筆記

ThreadPool

本篇博文,部落客將介紹Quartz框架中ThreadPool執行緒池相關的內容。執行緒池顧名思義,就是一個可以幫助我們來進行執行緒資源管理的對象。在web開發中,常見的就有資料庫連接池,http連接池,redis連接池等。在看這篇文章之前,讀者需要先具備一定的多執行緒和鎖的知識,如使用wait和notify方法,實現生產者和消費者功能。

為什麼要用執行緒池?

  1. 執行緒池可以復用執行緒,減少執行緒的創建和銷毀次數。
  2. 可以提高程式的響應速度。
  3. 可以對執行緒資源進行統一管理,比如監控。

介面定義

quartz框架中的ThreadPool介面定義如下,部落客在相應的介面方法上進行了注釋。

public interface ThreadPool {

    //將runnable介面放入到Thread中執行
    boolean runInThread(Runnable runnable);
   
   //阻塞獲取可用執行緒數
    int blockForAvailableThreads();
   
   //執行緒初始化方法
    void initialize() throws SchedulerConfigException;
   
   //關閉執行緒
    void shutdown(boolean waitForJobsToComplete);
    
	//獲取執行緒池大小
    int getPoolSize();
   
   //設置實例id
    void setInstanceId(String schedInstId);
    
	//設置實例名稱
    void setInstanceName(String schedName);
}

WorkThread

在介紹SimpleThreadPool之前,部落客先講解一下WorkThread。WorkThread繼承了Thread類,因此它是一個可以被運行的執行緒。它會阻塞式的接收執行緒池分配的任務,然後執行對應的任務。

工作者執行緒屬性

  1. lock,任務執行鎖。
  2. tp,工作者執行緒所在的執行緒池。
  3. runnable,可以運行的任務。
  4. runOnce,是否只執行一次。通過構造函數傳runnable實例,此時runOnce為true。
  5. run,是否需要循環執行。

Runnable的run方法

工作者執行緒啟動之後,在一個while循環裡面執行業務邏輯。while循環的退出條件是執行緒是否關閉(run == false)。先去獲取任務鎖(lock對象),如果沒有關閉且當前runnale方法為空,說明此時沒有需要執行的任務。執行緒會在這裡等待500毫秒(wait 500)。當有任務投遞給當前執行緒時,才會喚醒工作者執行緒,繼續執行當前方法。

如果任務不為空的話,執行runnable介面。執行完之後獲取lock鎖,防止並發衝突。獲取到鎖之後,設置runnable介面為null。如果只是只執行一次的任務,通過持有的threadPool引用,去獲取下一次任務允許的鎖,獲取到下一次任務允許的鎖之後,那麼就將自己從busyWorks中移除。否則就從busyWorkers中移除,然後添加到avaliableWorkers。

自定義的run方法

內部的run(Runnable newRunnable)方法:先去獲取內部對象lock的鎖,獲取成功後先判斷內部的runnable是否為空。如果不為空的話說明該執行緒還是處於繁忙狀態,拋出異常。如果為空的話,則設置為傳遞進來的newRunnable,並且喚醒所有lock等待隊列中的對象(提前讓這些對象結束等待,提高工作執行緒的響應速度)。

shutdown方法

設置內部成員變數run為false,也就是任務執行完之後不再循環等待下一個可以運行的任務,結束執行緒的run方法後,執行緒會進入terminate狀態。

SimpleThreadPool

在quartz默認的配置文件中,使用的是SimpleThreadPool這個執行緒池,並且指定了執行緒池的個數為10。從源碼中,我們也可以看到SimpleThreadPool是一個執行緒大小固定的執行緒池。程式碼如下所示:

public SimpleThreadPool(int threadCount, int threadPriority) {
        setThreadCount(threadCount);
        setThreadPriority(threadPriority);
 }

執行緒池屬性

  1. count,執行緒個數
  2. isShutdown,是否處於關閉狀態
  3. handoffPending,是否處於切換狀態
  4. makeThreadsDaemons,創建的執行緒是否為後台執行緒
  5. threadGroup,執行緒組
  6. nextRunnableLock,下一個可以運行的任務鎖
  7. workers,總共執行緒集合
  8. availWorkers,空閑執行緒集合
  9. busyWorkers,繁忙執行緒集合
  10. threadNamePrefix,執行緒名稱前綴
  11. schedulerInstanceName,調度器實例名稱

initialize方法

在QuartzScheduler對象進行初始化的時候,就會創建對應的執行緒池,並且調用對應的initialize方法。initialize方法主要就是預先創建對應個數的工作者執行緒,並且將它們添加到workers和availWorkers集合中,並且循環調用每個workThread的start方法。

blockForAvailableThreads方法

先獲取到下一次運行的任務鎖,防止這時候的空閑執行緒集合availWorkers發生變化。如果此時的availWorkers小於1,或者此時有任務進行等待分配,並且此時執行緒池沒有關閉,那麼就進行等待(wait 500)。

如果此時獲取到的空閑執行緒數大於等於1,則說明現在可以把對應個數的任務交給執行緒池進行分配執行。

runInThread方法

先獲取到可以允許下一個任務的鎖(nextRunnableLock),設置handoffPending為true,handoffPending表示當前有任務在等待執行緒池分配任務。接著阻塞判斷是否存在空閑執行緒可以獲取(繁忙執行緒執行完後會將自己添加到空閑執行緒集合中)或者 執行緒池是否需要被關閉。

如果沒有關閉執行緒池的情況下,直接從空閑執行緒中拿到第一個執行緒,並從空閑集合中移除。然後將這個空閑執行緒添加到繁忙執行緒集合中,接著執行workThread的投遞方法(run方法)。

如果將要關閉執行緒池的情況下,直接new出一個執行緒去執行這個任務,不再等待有空閑執行緒去執行,加快執行緒池的shutdown時間。並將此執行緒添加到繁忙執行緒集合中,添加到工作者集合中。

最後通知等待下一次允許任務鎖的執行緒,設置handoffPending為false。

shutdown

關閉執行緒池時,一個需要停止上游執行緒(quartzThread)給他(thradPool)分配任務,另一個需要關閉掉工作池中現有的任務。如果這時候剛好有任務需要進行調度,則需要看配置waitForJobsToComplete。

關閉執行緒池的方法有一個waitForJobsToComplete的方法,waitForJobsToComplete表示執行緒池關閉是否需要等到運行中的任務執行完畢。接著設置執行緒池為關閉狀態(shutdown),並循環調用workers集合中thread的shutdown方法(不讓工作者執行緒再循環執行),然後移除availWorkers中的執行緒。

如果waitForJobsToComplete為true,那麼判斷busyWorkers的元素個數是否大於0(繁忙集合中的執行緒結束任務後,會將自己從繁忙集合中移除),如果busyWorkers的元素個數大於0的,調用nextRunnableLock的阻塞方法,讓其它方法有時間處理(比如如果此時有任務需要進行分配,可以讓執行緒池把任務分配好)。最後循環調用每個workers中thread的join方法,等待thread死亡。

部落客微信公眾號