周期性執行緒池與主要源碼解析

  • 2019 年 10 月 3 日
  • 筆記

之前學習ThreadPool的使用以及源碼剖析,並且從面試的角度去介紹知識點的解答。今天小強帶來周期性執行緒池的使用和重點源碼剖析。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor:用來處理延時任務或定時任務
定時執行緒池類的類結構圖
定時執行緒池類的類結構圖

ScheduledThreadPoolExecutor接收ScheduleFutureTask類型的任務,是執行緒池調度任務的最小單位。
它採用DelayQueue存儲等待的任務:
1、DelayQueue內部封裝成一個PriorityQueue,它會根據time的先後時間順序,如果time相同則根絕sequenceNumber排序;
2、DelayQueue是無界隊列;

周期性執行緒池任務執行簡圖

ScheduleFutureTask

接收的參數:

private final long sequenceNumber;//任務的序號  private long time;//任務開始的時間  private final long period;//任務執行的時間間隔

工作執行緒的的執行過程:
工作執行緒會從DelayQueue取出已經到期的任務去執行;
執行結束後重新設置任務的到期時間,再次放回DelayQueue;

ScheduledThreadPoolExecutor會把待執行的任務放到工作隊列DelayQueue中,DelayQueue封裝了一個PriorityQueue,PriorityQueue會對隊列中的ScheduledFutureTask進行排序,具體的排序演算法實現如下:

public int compareTo(Delayed other) {      if (other == this) // compare zero if same object          return 0;      if (other instanceof ScheduledFutureTask) {          ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;          //首先按照time排序,time小的排到前面,time大的排到後面          long diff = time - x.time;          if (diff < 0)              return -1;          else if (diff > 0)              return 1;          //time相同,按照sequenceNumber排序;          //sequenceNumber小的排在前面,sequenceNumber大的排在後面          else if (sequenceNumber < x.sequenceNumber)              return -1;          else              return 1;      }      long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;  }

接下來看看ScheduledFutureTask的run方法實現, run方法是調度task的核心,task的執行實際上是run方法的執行。

public void run() {      //是否是周期性的      boolean periodic = isPeriodic();      //執行緒池是shundown狀態不支援處理新任務,直接取消任務      if (!canRunInCurrentRunState(periodic))          cancel(false);      //如果不需要執行執行周期性任務,直接執行run方法結束      else if (!periodic)          ScheduledFutureTask.super.run();      //如果需要周期性執行,則在執行任務完成後,設置下一次執行時間      else if (ScheduledFutureTask.super.runAndReset()) {          //設置下一次執行該任務的時間          setNextRunTime();          //重複執行該任務          reExecutePeriodic(outerTask);      }  }

run方法的執行步驟:

  • 1、如果執行緒池是shundown狀態不支援處理新任務,直接取消任務,否則步驟2;
  • 2、如果不是周期性任務,直接調用ScheduledFutureTask的run方法執行,會設置執行結果,然後直接返回,否則步驟3;
  • 3、如果是周期性任務,調用ScheduledFutureTask的runAndset方法執行,不會設置執行結果,然後直接返回,否則執行步驟4和步驟5;
  • 4、計算下一次執行該任務的時間;
  • 5、重複執行該任務;

接下來看下reExecutePeriodic方法的執行步驟:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {      if (canRunInCurrentRunState(true)) {          super.getQueue().add(task);          if (!canRunInCurrentRunState(true) && remove(task))              task.cancel(false);          else              ensurePrestart();      }  }

由於已經執行過一次周期性任務,所以不會reject當前任務,同時傳入的任務一定是周期性任務。

周期性執行緒池任務的提交方式

周期性有三種提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面從使用和源碼兩個方面進行說明,首先是如果提交任務:

pool.schedule(new Runnable() {      @Override      public void run() {          System.out.println("延遲執行");      }  },1, TimeUnit.SECONDS);    /**   * 這個執行周期是固定,不管任務執行多長時間,每過3秒中就會產生一個新的任務   */  pool.scheduleAtFixedRate(new Runnable() {      @Override      public void run() {          //這個業務邏輯需要很長的時間,超過了3秒          System.out.println("重複執行");      }  },1,3,TimeUnit.SECONDS);    pool.shutdown();    /**   * 假如run方法30min後執行完成,然後間隔3秒,再周期性執行下一個任務   */  pool.scheduleWithFixedDelay(new Runnable() {      @Override      public void run() {          //30min          System.out.println("重複執行");      }  },1,3,TimeUnit.SECONDS);

知道了如何提交周期性任務,接下來源碼是如何執行的,首先是schedule方法,該方法是指任務在指定延遲時間到達後觸發,只會執行一次。

public ScheduledFuture<?> schedule(Runnable command,                                     long delay,                                     TimeUnit unit) {      if (command == null || unit == null)          throw new NullPointerException();      //把任務封裝成ScheduledFutureTask,之後調用decorateTask進行包裝;      //decorateTask方法是空方法,留給用戶去實現的;      RunnableScheduledFuture<?> t = decorateTask(command,          new ScheduledFutureTask<Void>(command, null,                                        triggerTime(delay, unit)));      //包裝好任務之後,進行任務的提交      delayedExecute(t);      return t;  }

任務提交方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {      //如果執行緒池不是RUNNING狀態,則使用拒絕策略把提交任務拒絕掉      if (isShutdown())          reject(task);      else {          //與ThreadPoolExecutor不同,這裡直接把任務加入延遲隊列          super.getQueue().add(task);          //如果當前狀態無法執行任務,則取消          if (isShutdown() &&              !canRunInCurrentRunState(task.isPeriodic()) &&              remove(task))              task.cancel(false);          else          //和ThreadPoolExecutor不一樣,corePoolSize沒有達到會增加Worker;          //增加Worker,確保提交的任務能夠被執行              ensurePrestart();      }  }

還沒關注我的公眾號?

  • 掃文末二維碼關注公眾號【小強的進階之路】可領取如下:
  • 學習資料: 1T影片教程:涵蓋Javaweb前後端教學影片、機器學習/人工智慧教學影片、Linux系統教程影片、雅思考試影片教程;
  • 100多本書:包含C/C++、Java、Python三門程式語言的經典必看圖書、LeetCode題解大全;
  • 軟體工具:幾乎包括你在編程道路上的可能會用到的大部分軟體;
  • 項目源碼:20個JavaWeb項目源碼。
    小強的進階之路二維碼