死磕 java執行緒系列之執行緒池深入解析——定時任務執行流程

  • 2019 年 11 月 5 日
  • 筆記

threadpool_scheduletask

(手機橫屏看源碼更方便)


註:java源碼分析部分如無特殊說明均基於 java8 版本。

註:本文基於ScheduledThreadPoolExecutor定時執行緒池類。

簡介

前面我們一起學習了普通任務、未來任務的執行流程,今天我們再來學習一種新的任務——定時任務。

定時任務是我們經常會用到的一種任務,它表示在未來某個時刻執行,或者未來按照某種規則重複執行的任務。

問題

(1)如何保證任務是在未來某個時刻才被執行?

(2)如何保證任務按照某種規則重複執行?

來個栗子

創建一個定時執行緒池,用它來跑四種不同的定時任務。

public class ThreadPoolTest03 {      public static void main(String[] args) throws ExecutionException, InterruptedException {          // 創建一個定時執行緒池          ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);            System.out.println("start: " + System.currentTimeMillis());            // 執行一個無返回值任務,5秒後執行,只執行一次          scheduledThreadPoolExecutor.schedule(() -> {              System.out.println("spring: " + System.currentTimeMillis());          }, 5, TimeUnit.SECONDS);            // 執行一個有返回值任務,5秒後執行,只執行一次          ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {              System.out.println("inner summer: " + System.currentTimeMillis());              return "outer summer: ";          }, 5, TimeUnit.SECONDS);          // 獲取返回值          System.out.println(future.get() + System.currentTimeMillis());            // 按固定頻率執行一個任務,每2秒執行一次,1秒後執行          // 任務開始時的2秒後          scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {              System.out.println("autumn: " + System.currentTimeMillis());              LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));          }, 1, 2, TimeUnit.SECONDS);            // 按固定延時執行一個任務,每延時2秒執行一次,1秒執行          // 任務結束時的2秒後,本文由公從號「彤哥讀源碼」原創          scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {              System.out.println("winter: " + System.currentTimeMillis());              LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));          }, 1, 2, TimeUnit.SECONDS);      }  }

定時任務總體分為四種:

(1)未來執行一次的任務,無返回值;

(2)未來執行一次的任務,有返回值;

(3)未來按固定頻率重複執行的任務;

(4)未來按固定延時重複執行的任務;

本文主要以第三種為例進行源碼解析。

scheduleAtFixedRate()方法

提交一個按固定頻率執行的任務。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                long initialDelay,                                                long period,                                                TimeUnit unit) {      // 參數判斷      if (command == null || unit == null)          throw new NullPointerException();      if (period <= 0)          throw new IllegalArgumentException();        // 將普通任務裝飾成ScheduledFutureTask      ScheduledFutureTask<Void> sft =          new ScheduledFutureTask<Void>(command,                                        null,                                        triggerTime(initialDelay, unit),                                        unit.toNanos(period));      // 鉤子方法,給子類用來替換裝飾task,這裡認為t==sft      RunnableScheduledFuture<Void> t = decorateTask(command, sft);      sft.outerTask = t;      // 延時執行      delayedExecute(t);      return t;  }

可以看到,這裡的處理跟未來任務類似,都是裝飾成另一個任務,再拿去執行,不同的是這裡交給了delayedExecute()方法去執行,這個方法是幹嘛的呢?

delayedExecute()方法

延時執行。

private void delayedExecute(RunnableScheduledFuture<?> task) {      // 如果執行緒池關閉了,執行拒絕策略      if (isShutdown())          reject(task);      else {          // 先把任務扔到隊列中去          super.getQueue().add(task);          // 再次檢查執行緒池狀態          if (isShutdown() &&              !canRunInCurrentRunState(task.isPeriodic()) &&              remove(task))              task.cancel(false);          else              // 保證有足夠有執行緒執行任務              ensurePrestart();      }  }  void ensurePrestart() {      int wc = workerCountOf(ctl.get());      // 創建工作執行緒      // 注意,這裡沒有傳入firstTask參數,因為上面先把任務扔到隊列中去了      // 另外,沒用上maxPoolSize參數,所以最大執行緒數量在定時執行緒池中實際是沒有用的      if (wc < corePoolSize)          addWorker(null, true);      else if (wc == 0)          addWorker(null, false);  }

到這裡就結束了?!

實際上,這裡只是控制任務能不能被執行,真正執行任務的地方在任務的run()方法中。

還記得上面的任務被裝飾成了ScheduledFutureTask類的實例嗎?所以,我們只要看ScheduledFutureTask的run()方法就可以了。

ScheduledFutureTask類的run()方法

定時任務執行的地方。

public void run() {      // 是否重複執行      boolean periodic = isPeriodic();      // 執行緒池狀態判斷      if (!canRunInCurrentRunState(periodic))          cancel(false);      // 一次性任務,直接調用父類的run()方法,這個父類實際上是FutureTask      // 這裡我們不再講解,有興趣的同學看看上一章的內容      else if (!periodic)          ScheduledFutureTask.super.run();      // 重複性任務,先調用父類的runAndReset()方法,這個父類也是FutureTask      // 本文主要分析下面的部分      else if (ScheduledFutureTask.super.runAndReset()) {          // 設置下次執行的時間          setNextRunTime();          // 重複執行,本文由公從號「彤哥讀源碼」原創          reExecutePeriodic(outerTask);      }  }

可以看到,對於重複性任務,先調用FutureTask的runAndReset()方法,再設置下次執行的時間,最後再調用reExecutePeriodic()方法。

FutureTask的runAndReset()方法與run()方法類似,只是其任務運行完畢後不會把狀態修改為NORMAL,有興趣的同學點進源碼看看。

再來看看reExecutePeriodic()方法。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {      // 執行緒池狀態檢查      if (canRunInCurrentRunState(true)) {          // 再次把任務扔到任務隊列中          super.getQueue().add(task);          // 再次檢查執行緒池狀態          if (!canRunInCurrentRunState(true) && remove(task))              task.cancel(false);          else              // 保證工作執行緒足夠              ensurePrestart();      }  }

到這裡是不是豁然開朗了,原來定時執行緒池執行重複任務是在任務執行完畢後,又把任務扔回了任務隊列中。

重複性的問題解決了,那麼,它是怎麼控制任務在某個時刻執行的呢?

OK,這就輪到我們的延時隊列登場了。

DelayedWorkQueue內部類

我們知道,執行緒池執行任務時需要從任務隊列中拿任務,而普通的任務隊列,如果裡面有任務就直接拿出來了,但是延時隊列不一樣,它裡面的任務,如果沒有到時間也是拿不出來的,這也是前面分析中一上來就把任務扔進隊列且創建Worker沒有傳入firstTask的原因。

說了這麼多,它到底是怎麼實現的呢?

其實,延時隊列我們在前面都詳細分析過,想看完整源碼分析的可以看看之前的《死磕 java集合之DelayQueue源碼分析》。

延時隊列內部是使用「堆」這種數據結構來實現的,有興趣的同學可以看看之前的《拜託,面試別再問我堆(排序)了!》。

我們這裡只拿一個take()方法出來分析。

public RunnableScheduledFuture<?> take() throws InterruptedException {      final ReentrantLock lock = this.lock;      // 加鎖      lock.lockInterruptibly();      try {          for (;;) {              // 堆頂任務              RunnableScheduledFuture<?> first = queue[0];              // 如果隊列為空,則等待              if (first == null)                  available.await();              else {                  // 還有多久到時間                  long delay = first.getDelay(NANOSECONDS);                  // 如果小於等於0,說明這個任務到時間了,可以從隊列中出隊了                  if (delay <= 0)                      // 出隊,然後堆化                      return finishPoll(first);                  // 還沒到時間                  first = null;                  // 如果前面有執行緒在等待,直接進入等待                  if (leader != null)                      available.await();                  else {                      // 當前執行緒作為leader                      Thread thisThread = Thread.currentThread();                      leader = thisThread;                      try {                          // 等待上面計算的延時時間,再自動喚醒                          available.awaitNanos(delay);                      } finally {                          // 喚醒後再次獲得鎖後把leader再置空                          if (leader == thisThread)                              leader = null;                      }                  }              }          }      } finally {          if (leader == null && queue[0] != null)              // 相當於喚醒下一個等待的任務              available.signal();          // 解鎖,本文由公從號「彤哥讀源碼」原創          lock.unlock();      }  }

大致的原理是,利用堆的特性獲取最快到時間的任務,即堆頂的任務:

(1)如果堆頂的任務到時間了,就讓它從隊列中了隊;

(2)如果堆頂的任務還沒到時間,就看它還有多久到時間,利用條件鎖等待這段時間,待時間到了後重新走(1)的判斷;

這樣就解決了可以在指定時間後執行任務。

其它

其實,ScheduledThreadPoolExecutor也是可以使用execute()或者submit()提交任務的,只不過它們會被當成0延時的任務來執行一次。

public void execute(Runnable command) {      schedule(command, 0, NANOSECONDS);  }  public <T> Future<T> submit(Callable<T> task) {      return schedule(task, 0, NANOSECONDS);  }

總結

實現定時任務有兩個問題要解決,分別是指定未來某個時刻執行任務、重複執行。

(1)指定某個時刻執行任務,是通過延時隊列的特性來解決的;

(2)重複執行,是通過在任務執行後再次把任務加入到隊列中來解決的。

彩蛋

到這裡基本上普通的執行緒池的源碼解析就結束了,這種執行緒池是比較經典的實現方式,整體上來說,效率相對不是特別高,因為所有的工作執行緒共用同一個隊列,每次從隊列中取任務都要加鎖解鎖操作。

那麼,能不能給每個工作執行緒配備一個任務隊列呢,在提交任務的時候就把任務分配給指定的工作執行緒,這樣在取任務的時候就不需要頻繁的加鎖解鎖了。

答案是肯定的,下一章我們一起來看看這種基於「工作竊取」理論的執行緒池——ForkJoinPool。


歡迎關注我的公眾號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一起暢遊源碼的海洋。

qrcode