Java執行緒的基本使用

  • 2019 年 12 月 24 日
  • 筆記

首先,這篇文章寫的都是一些比較基礎的內容,也就是從API層面解釋一下我們平時用的比較多的東西,其實我倒是也想寫點底層的東西,可是我也不懂啊。雖然比較基礎,但可能卻比較容易忽略吧

在Java中使用多執行緒,本質上還是對Thread對象的操作。執行緒池只是為了方便對執行緒的管理,避免頻繁的創建和銷毀執行緒帶來不必要的系統開銷,內部通過指定的執行緒數和阻塞隊列實現。

基本使用

創建一個Thread對象的時候一般會傳遞一個Runnable對象,任務邏輯就寫在Runnable的run方法中。感覺這個Runnable的名字取得不太好,如果叫Task是不是會更好一些呢?

new Thread(()-> doXX() ).start();

獲取返回值

上面的那種方式使用起來是挺簡單,但會遇到一些問題,比如:能獲取返回值不?

通過全局變數

像上面這樣是沒辦法獲取返回值的,所以我們需要做一些處理,比如,將結果賦值給一個全局變數

private static int result;    public static void main(String[] args) throws InterruptedException {      new Thread(() -> {          System.out.println("處理業務邏輯");          result = 1000;      }).start();      Thread.sleep(1000);      System.out.println(result);  }

result就是一個全局變數,當任務執行完成之後,更新這個值。這其實都不能算是返回值,但有時候也能用:不需要立即知道任務的執行結果,在訪問全部變數的時候,只需要獲取它的值就好了。比如通過定時任務去更新快取,不需要關注任務什麼時候執行完成,我需要的只是快取的值,任務執行了就獲取最新的值,沒有執行就獲取舊值。

通過空輪詢

那假如我就是想現在獲取返回值咋辦?因為我要用這個返回值作為下面邏輯的輸入。那或許可以通過輪詢的方式檢測全局變數來達到目的?

while(result == null){  }

除了白白浪費CPU,好像也行啊?但我現在考慮的只是兩個執行緒,如果有多個執行緒該對全局變數修改該怎麼辦呢?那用ThreadLocal?算了,就此打住吧

通過簡單封裝

或許可以封裝一下?再封裝之前,先考慮幾個問題

  1. 任務的邏輯定義在哪裡? 如果用Runnable,就無法返回值,所以可以定義一個有返回值的@FunctionalInterface介面,叫 Task
  2. 返回的值存到哪裡?怎麼返回?Thread沒有相關的方法,擴展一下?
public static void main(String[] args) throws InterruptedException {      CallableThread callableThread = new CallableThread(() -> {          try {              Thread.sleep(3000);          } catch (InterruptedException e) {              e.printStackTrace();          }          return "ccccc";      });        callableThread.start();      System.out.println("開始時間 " + LocalDateTime.now());      System.out.println(callableThread.get());      System.out.println("結束時間 " + LocalDateTime.now());  }      class CallableThread<T> extends Thread {      private Task<T> task;        private T result;        private volatile boolean finished = false;        public CallableThread(Task<T> task) {          this.task = task;      }        @Override      public void run() {          synchronized (this) {              result = task.call();              finished = true;              notifyAll();          }      }        public T get() {          synchronized (this) {              while (!finished) {                  try {                      wait();                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }              return result;          }      }  }    @FunctionalInterface  interface Task<T> {      T call();  }

這樣貌似也可以,但是不太好。Thread本來只是用於處理和執行緒相關的事情,現在將它和邏輯(Task)綁定在一起,如果有多個任務想共用一個Thread,那返回值怎麼處理?

是否可以將這部分邏輯抽出來,放到一個新類當中?

public static void main(String[] args) throws InterruptedException {      MyRunnable<String> myRunnable = new MyRunnable(() -> {          // 模擬耗時的業務操作          try {              Thread.sleep(3000);          } catch (InterruptedException e) {              e.printStackTrace();          }          return "我是結果";      });      System.out.println("開始時間 " + LocalDateTime.now());      new Thread(myRunnable).start();        System.out.println("result: " + myRunnable.get());      System.out.println("結束時間 " + LocalDateTime.now());  }      class MyRunnable<T> implements Runnable {      private Task<T> task;        private T result;        private volatile boolean finished = false;        public MyRunnable(Task<T> task) {          this.task = task;      }        @Override      public void run() {          synchronized (this) {              result = task.call();              finished = true;              notifyAll();          }      }        public T get() {          synchronized (this) {              while (!finished) {                  try {                      wait();                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }              return result;          }      }  }

這不是和java裡面的Future有點像嗎?確實有點像

Future模式

Future裡面有幾個比較核心的概念

  1. Future:抽象出 獲取任務返回值獲取任務執行狀態 等常用方法的介面
  2. Callable:類似於上面的 Task
  3. FutureTask:類似於上面的 MyRunnable

下面看一個例子

public static void main(String[] args) throws ExecutionException, InterruptedException {      FutureTask<String> future = new FutureTask<>(() -> {          Thread.sleep(3000);          System.out.println(System.currentTimeMillis());          return "hehehh";      });      new Thread(future).start();      System.out.println("Start Get Result : " + System.currentTimeMillis());      System.out.println("Get Result : " + future.get() + System.currentTimeMillis());  }

Future

Future介面除了提供獲取返回值的介面,還提供了一些其他的介面,根據名字大概也可以猜到什麼意思,不過多解釋了。實在不行看看源碼吧,這樣子就很愉快了。

boolean cancel(boolean mayInterruptIfRunning);  boolean isCancelled();  boolean isDone();  V get() throws InterruptedException, ExecutionException;  V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

FutureTask

FutureTask同時實現了RunnableFuture介面,

任務狀態

FutureTask中,任務的不同狀態通過state變數來表示,狀態有以下幾種:

/*   * NEW -> COMPLETING -> NORMAL   * NEW -> COMPLETING -> EXCEPTIONAL   * NEW -> CANCELLED   * NEW -> INTERRUPTING -> INTERRUPTED   */    private volatile int state;    private static final int NEW          = 0;  private static final int COMPLETING   = 1;  private static final int NORMAL       = 2;  private static final int EXCEPTIONAL  = 3;  private static final int CANCELLED    = 4;  private static final int INTERRUPTING = 5;  private static final int INTERRUPTED  = 6;
任務執行

因為FutureTask本身也實現了 Runnable 介面,所以核心關注它的run方法,執行邏輯其實比較簡單:

  1. 先判斷狀態,如果不為NEW 或者通過cas更新 runner 失敗,則直接返回
  2. 執行Callable#call方法,根據執行結果,設置狀態, 如果執行成功:先將state設置成COMPLETING,然後保存返回的結果保存到屬性outcome,再將state設置成NORMAL,最後通過LockSupport.unpark(t)解除阻塞的執行緒; 如果執行失敗:先將state設置成COMPLETING,然後異常資訊保存到屬性outcome,再將state設置成EXCEPTIONAL,最後通過LockSupport.unpark(t)解除阻塞的執行緒;
如何阻塞

當我們通過FutureTask#get方法獲取返回值的時候,會阻塞當前執行緒,那是通過什麼方式阻塞當前執行緒的?是通過LockSupport阻塞的,這個推薦看看部落格吧。我也是看部落格的,自己也解釋的沒人家好,嗯,就是這樣的

public V get() throws InterruptedException, ExecutionException {      int s = state;      if (s <= COMPLETING)          s = awaitDone(false, 0L);      return report(s);  }    private int awaitDone(boolean timed, long nanos)      throws InterruptedException {      final long deadline = timed ? System.nanoTime() + nanos : 0L;      WaitNode q = null;      boolean queued = false;      for (;;) {          if (Thread.interrupted()) {              removeWaiter(q);              throw new InterruptedException();          }            int s = state;          // state > COMPLETING ,說明任務要麼正常執行,要麼異常結束,所以這裡可以直接返回          if (s > COMPLETING) {              if (q != null)                  q.thread = null;   // 這應該是help GC吧?              return s;          }          // 如果正在收尾階段,交出CPU, 等下次循環          else if (s == COMPLETING) // cannot time out yet              Thread.yield();          else if (q == null)              q = new WaitNode();          // 通過UNSAFE 設置 waiters          else if (!queued)              // 將新的`WaitNode`添加到單向鏈表的頭部,waiters即對應頭節點              queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                      q.next = waiters, q);          else if (timed) {              nanos = deadline - System.nanoTime();              if (nanos <= 0L) {                  removeWaiter(q);                  return state;              }              LockSupport.parkNanos(this, nanos);          }          else              LockSupport.park(this);   // 阻塞當前執行緒      }  }

上面我們看到了有一個waiters,這是用來幹嘛的呢?它是一個單向鏈表結構,主要是為了處理多次調用FutureTask#get的情況,每調用一次FutureTask#get就會生成一個WaitNode節點,然後將它添加到單向鏈表的頭部

那什麼時候用到這個鏈表呢?在任務執行完成的時候,會執行finishCompletion方法,主要就是從頭節點依次往下遍歷,獲取節點的thread屬性,然後執行LockSupport.unpark(thread)解除阻塞

回調如何處理

相對之前的那種方式來說,FutureTask已經很好用了,直接通過FutureTask#get方法就可以獲取返回值了,確實蠻方便的。

不過方便是方便,但假如我想在獲取返回值之後執行一些其他的邏輯該怎麼處理呢?其實我最直接的想法就是回調了。比如,我們可以對上面的MyRunnable程式碼再擴展一下,例如

public MyRunnable addListener(Consumer c) {      // 這裡是一個例子,肯定不會每次都new一個執行緒,一般是使用執行緒池          while (!finished) {              try {                  Thread.sleep(1);              } catch (InterruptedException e) {                  e.printStackTrace();              }          }          c.accept(result);      }).start();      return this;  }

我們給MyRunnable添加了一個addListener方法,接收一個Consumer作為入參,當任務執行完成之後就執行這段邏輯,如下:

public static void main(String[] args) throws InterruptedException {      MyRunnable<String> myRunnable = new MyRunnable(() -> {          // 模擬耗時的業務操作          try {              Thread.sleep(3000);          } catch (InterruptedException e) {              e.printStackTrace();          }          return "我是結果";      });      System.out.println("開始時間 " + LocalDateTime.now());      new Thread(myRunnable).start();        myRunnable.addListener(result -> {          System.out.println("當xxx執行完成之後,執行緒:" + Thread.currentThread().getName() + " 執行一些其他的任務");          result = result + "   ggggg";          System.out.println(result);      });  }

ListenableFuture

ListenableFutureguava包裡面的,對Future進行了增強,ListenableFuture繼承了Future,新增了一個添加回調的方法

/**   * @param listener the listener to run when the computation is complete     回調邏輯   * @param executor the executor to run the listener in  回調在哪個執行緒池執行   */  void addListener(Runnable listener, Executor executor);

ListenableFutureTask繼承了FutureTask並且是實現了ListenableFuture介面,看一個簡單例子

public static void main(String[] args) throws InterruptedException {      ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {          System.out.println("執行任務開始  " + LocalDateTime.now());          Thread.sleep(3000);          System.out.println("執行任務完成  " + LocalDateTime.now());          return "結果";      });        futureTask.addListener(() -> System.out.println("獲取結果之後,輸出一條日誌"), MoreExecutors.directExecutor());      new Thread(futureTask).start();  }
源碼分析

原理就是將所有回調維護在一個單向鏈表中,也就是ExecutionList,然後通過重寫“FutureTask#done`方法,在任務完成之後執行回調邏輯

// 每個回調就相當於是一個RunnableExecutorPair節點,所有RunnableExecutorPair節點構成一條鏈表,頭插鏈表  private final ExecutionList executionList = new ExecutionList();    // ListenableFutureTask#addListener  public void addListener(Runnable listener, Executor exec) {      executionList.add(listener, exec);  }      // ExecutionList#add  public void add(Runnable runnable, Executor executor) {      // 上鎖,因為它的內部屬性 executed 可能會被任務邏輯執行緒更新,即 ListenableFutureTask 實現了 FutureTask 的done方法,然後會在裡面更新 executed 的值為true      // 還有一點,如果不加鎖,當多個執行緒同時添加回調的時候,可能會造成節點丟失      synchronized (this) {          // 如果任務還沒有執行完成,就將當前節點添加到頭節點          if (!executed) {              runnables = new RunnableExecutorPair(runnable, executor, runnables);              return;          }      }        // 如果任務執行完成,就開始執行回調      executeListener(runnable, executor);  }      // ExecutionList#executeListener  private static void executeListener(Runnable runnable, Executor executor) {      try {          // 直接將任務交給執行緒池          executor.execute(runnable);      } catch (RuntimeException e) {          log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);      }  }    // ExecutionList.RunnableExecutorPair  private static final class RunnableExecutorPair {      final Runnable runnable;      final Executor executor;      @Nullable RunnableExecutorPair next;        RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {          this.runnable = runnable;          this.executor = executor;          this.next = next;      }  }

ListenableFutureTask是怎麼知道任務是否執行完成了呢?FutureTask#finishCompletion方法中,解除阻塞的執行緒之後,還會執行一個done方法,不過該方法在FutureTask沒有任何邏輯,可以把它當作是一個模板方法,而ListenableFutureTask實現了該方法,如下:

// ListenableFutureTask#done  protected void done() {      executionList.execute();  }      // ExecutionList#execute  public void execute() {      RunnableExecutorPair list;      synchronized (this) {          if (executed) {              return;          }          // 首先將executed置為true          executed = true;          // runnables代表鏈表的頭節點          list = runnables;          runnables = null; // allow GC to free listeners even if this stays around for a while.      }            RunnableExecutorPair reversedList = null;      // 這其實是一個倒置的過程,因為我們添加節點的時候,是插入到頭部的,為了保證回調按照我們添加時的順序執行,即 先添加先執行,所以做了一個倒置      while (list != null) {          RunnableExecutorPair tmp = list;          list = list.next;          tmp.next = reversedList;          reversedList = tmp;      }        // 遍歷鏈表,依次執行回調邏輯      while (reversedList != null) {          executeListener(reversedList.runnable, reversedList.executor);          reversedList = reversedList.next;      }  }

FutureCallback

通過ListenableFutureTask,我們可以在任務執行完成之後執行一些回調邏輯。可是細心的同學會發現,回調方法無法使用任務的返回值,那假如我就是想先獲取值然後再用這個返回值做下一步操作怎麼辦?還是只能先通過get方法阻塞當前執行緒嗎?其實guava包中也給了我們相關的介面。先看一個例子:

public static void main(String[] args) throws InterruptedException {      ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {          System.out.println("執行任務開始  " + LocalDateTime.now());          Thread.sleep(3000);          System.out.println("執行任務完成  " + LocalDateTime.now());          return "結果";      });        Futures.addCallback(futureTask, new FutureCallback<String>() {          @Override          public void onSuccess(String result) {              System.out.println("執行成功: " + result);          }            @Override          public void onFailure(Throwable t) {              System.out.println("執行失敗");          }      });        new Thread(futureTask).start();  }
源碼分析

FutureCallback介面裡面有兩個方法,分別對應任務執行成功邏輯和任務失敗邏輯

void onSuccess(@Nullable V result);    void onFailure(Throwable t);

Futures可以堪稱是一個門面類,裡面封裝了一些操作

// Futures#addCallback  public static <V> void addCallback(      ListenableFuture<V> future, FutureCallback<? super V> callback) {          // 這裡使用了DirectExecutor執行緒池,即直接在當前執行緒執行          addCallback(future, callback, directExecutor());  }    // Futures#addCallback  public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) {      Runnable callbackListener =          new Runnable() {              @Override              public void run() {                  final V value;                  try {                      value = getDone(future);                  } catch (ExecutionException e) {                      callback.onFailure(e.getCause());                      return;                  } catch (RuntimeException e) {                      callback.onFailure(e);                      return;                  } catch (Error e) {                      callback.onFailure(e);                      return;                  }                  callback.onSuccess(value);              }          };      // 最終還是將這部分邏輯封裝成一個回調,然後在這個回調中獲取返回值,根據返回值的結果執行相應的FutureCallback方法      future.addListener(callbackListener, executor);  }    // Futures#getDone  public static <V> V getDone(Future<V> future) throws ExecutionException {      checkState(future.isDone(), "Future was expected to be done: %s", future);      return getUninterruptibly(future);  }  public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {      boolean interrupted = false;      try {        while (true) {          try {            return future.get();          } catch (InterruptedException e) {            interrupted = true;          }        }      } finally {        if (interrupted) {          Thread.currentThread().interrupt();        }      }  }

本質上其實就是將獲取返回值的邏輯封裝成一個回調,在這個回調中獲取返回值,根據返回值的結果執行相應的FutureCallback方法,不過在使用上卻方便了好多。

與我們直接通過get方法獲取返回值然後再執行其他邏輯還是有區別的,因為我們直接調用Future#get方法會阻塞當前執行緒,而guava是在回調中執行這部邏輯,類似於一種通知機制,所以不會阻塞當前執行緒。

ListenableFutureTask

其實Spring裡面也有一個ListenableFutureTask,實現上和guava大同小異,也是繼承了FutureTask並且實現了自己的ListenableFuture介面,通過重寫FutureTask#done方法,在該方法中獲取返回值然後執行回調邏輯

public static void main(String[] args) {      ListenableFutureTask future = new ListenableFutureTask(() -> "結果");        future.addCallback(new ListenableFutureCallback() {          @Override          public void onSuccess(Object result) {              System.out.println("callback " + result);          }            @Override          public void onFailure(Throwable ex) {              System.out.println("執行失敗 ");          }      });      new Thread(future).start();  }

核心源碼

它的Callback是保存在兩個Queue裡面的:successCallbacksfailureCallbacks,數據結構是LinkedList

private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();    private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();

重寫的done方法如下,邏輯很簡單,就不解釋了

protected void done() {      Throwable cause;      try {          T result = get();          this.callbacks.success(result);          return;      }catch (InterruptedException ex) {          Thread.currentThread().interrupt();          return;      }catch (ExecutionException ex) {          cause = ex.getCause();          if (cause == null) {              cause = ex;          }      }      catch (Throwable ex) {          cause = ex;      }      this.callbacks.failure(cause);  }

CompletableFuture

可能是之前的Future功能太少了,所以Java8推出了CompletableFuture,功能強大,除了上面說的那些功能,還有很多其他的功能,反正就是吊炸天。而且從DUBBO 2.7開始非同步處理都是通過CompletableFuture來實現。

CompletableFuture ForkJoinPoll

總結

總結下來就發現,那些很好用的API,真的是封裝的好啊。所以,設計模式真的很重要啊,老鐵。。。。。。