quartz框架(七)-JobStore
JobStore
在之前的博文中,部落客已經寫了關於Job的相關內容。本篇博文,部落客將介紹JobStore相關的內容。
JobStore是存放Job和Trigger的地方。當我們調用Scheduler對象的scheduleJob時就會將其存入JobStore中,然後供quartzSchedulerThread使用。
為什麼需要JobStore?
因為我們需要被Scheduler調用的任務大多數並不是一次性的任務,而是需要被定時觸發,或者某個時間點才能被觸發的。因此我們需要一個容器來存儲Job和Trigger的相關內容。
其次,quartz框架還有考慮到持久化存儲的場景,比如說將對應的數據存放到資料庫,這時候存放或者讀取資料庫裡面的數據都需要有一個與之對應的容器。
介面定義
JobStore介面中定義的方法太多,這裡部落客只列出一下比較重要的方法。介面定義如下所示:
public interface JobStore {
//存儲job和tigger
void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger)
throws ObjectAlreadyExistsException, JobPersistenceException;
//獲取下一次需要進行觸發的觸發器
List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException;
//釋放獲取到的觸發器
void releaseAcquiredTrigger(OperableTrigger trigger);
//觸發器被觸發
List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers) throws JobPersistenceException;
//觸發器觸發完成
void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, CompletedExecutionInstruction triggerInstCode);
}
RAMJobStore
在quartz的默認配置中使用的就是RAMJobStore。顧名思義,RAMJobStore是基於記憶體來存儲的Job相關數據,也就是在程式重啟之後,對應的數據就會消失,並且它不支援集群,也就是說它不可以把需要調度的任務分配到多台機器上面進行執行。部落客這裡就以RAMJobStore為例,講解一下RAMJobStore的相關實現。
RAMJobStore屬性
- jobsByKey,按jobKey進行分組的HashMap集合
- triggersByKey,按jobKey進行分組的HashMap集合
- jobsByGroup,按job的分組名進行分組的HashMap集合
- triggersByGroup,按trigger的分組名進行分組的HashMap集合
- timeTriggers,具有下次觸發時間的trigger集合
- triggers,trigger集合
- lock,操作RAMJobStore時需要獲取的鎖
- pausedTriggerGroups,被暫停的trigger分組名的HashSet集合
- pausedJobGroups,被暫停的job分組名的HashSet集合
- blockedJobs,被鎖住的任務key的HashSet集合
- misfireThreshold,失火閾值
- signaler,SchedulerSignaler訊號器
acquireNextTriggers方法
總體來說,quartzSchedulerThread有設置idlewaitime時間。idlewaitime時間就是在這個空閑時間內如果沒有接收到調度器發生變化的訊號(sigLock鎖的notify),它就會阻塞對應的時間(sigLock的wait(idlewaitime)方法)。因此quartzSchedulerThread需要提前獲取到這個(now + idlewaitime)時間點內的trigger,否則會造成觸發器失火的情況,並且它會一次性獲取當前可用空閑執行緒個數的trigger。
acquireNextTriggers方法邏輯:
- 先獲取到lock鎖,防止此時的job和trigger放生變化。
- 如果timeTriggers的集合為空,那麼直接返回。
- while循環獲取triggerWrapper,從timetriggers中獲取第一個triggerWrapper。因為timeTriggers是有序的triggerWrapper集合(按照觸發時間和優先順序排序),接著從timeTriggers中移除triggerWrapper。
- 如果trigger的下一次觸發時間為空,則重新獲取triggerWrapper。
- 如果trigger的下一次觸發時間大於需要獲取的時間點,則跳出循環(因為timeTrigger是有序的,第一個時間都不滿足了,就不用再繼續循環了),然後返回獲取到的trigger集合。
- 接著判斷該trigger是否失火,失火的條件為:當前時間小於這個(now-misfireTime)的時間點 並且失火策略為不能忽略(Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)。
a. 通知triggerListener該trigger發生失火
b. 接著調用trigger的updateAfterMisfire方法
c. 最後進行下一次觸發時間的判斷,如果下一次觸發時間為空,那麼就說明trigger已經完成,從timeTriggers中移除自己 - 如果處理後的trigger的下一次執行時間跟之前的執行時間不一樣的話,則需要重新加入timeTrigger計算觸發順序。
- 判斷該trigger對應的job是否禁止並發執行,如果禁止,則需要將該job對應的其它trigger暫時從timeTriggers中移除。
- 設置triggerWrapper的狀態為ACQUIRED,設置本次觸發的triggerInstanceId。
- 如果獲取到的trigger數量等於需要獲取的數量,則可以跳出循環。
- 結束循環後需要判斷是否有被臨時移除的trigger,有的話,需要將triggerWrapper放回timeTriggers。
releaseAcquiredTrigger方法
在JobStore中調用triggerWrapper相關的方法時(如triggersFired方法),如果發生了異常,起會調用該方法,將其重新放入timeTriggers中。
releaseAcquiredTrigger方法邏輯:
- 先獲取到lock鎖。
- 判斷triggerWrapper的狀態是否為已獲取狀態。
- 如果是已獲取狀態,則將其重新加入到timeTriggers中。
triggersFired方法
- 先獲取到lock鎖。
- 循環進行處理firedTriggers集合,每次從firedTriggers集合獲取triggerWrapper。
- 判斷triggerWrapper的狀態是否為ACQUIRED狀態,不是的話則continue。
- 調用trigger對應的triggered方法,更新trigger內部的屬性。
a. 計算下一次執行時間
b. 已執行的次數
c. 存儲上一次執行時間 - 設置triggerWrapper的狀態為WAITING(之前是ACQUIRED狀態)
- 判斷對應的job是否允許並發執行
a. 如果不允許的話,則將該job對應的所有triggerWrapper改為加鎖狀態(BLOCKED)
b. 如果允許的話,判斷下一次執行時間是否為null,不為null的話,重新加入timeTriggers中。 - 返回觸發器觸發執行結果(TriggerFiredResult)。
triggeredJobComplete方法
正常情況下,Job被執行完畢的時候,會通知JobStore執行該方法。
- 先獲取到lock鎖
- 根據isPersistJobDataAfterExecution判斷需要是否持久化存儲jobDataMap
- 根據isConcurrentExectionDisallowed判斷job是否不允許並發執行
a. 如果不允許並發執行,需要從blockJobs中移除該job,解除對應trigger的鎖定狀態。如果解除狀態後的trigger狀態為WAITING狀態,則將trigger放入到timeTriggers結合中。 - 根據完成策略CompletedExecutionInstruction對triggerWrapper進行處理
a. 如果CompletedExecutionInstruction默認值是NOOP,什麼都不做。
b. 如果值是DELETE_TRIGGER,則需要刪除對應的trigger。
c. …