Java執行緒的基本使用
- 2019 年 12 月 24 日
- 筆記
首先,這篇文章寫的都是一些比較基礎的內容,也就是從API層面解釋一下我們平時用的比較多的東西,其實我倒是也想寫點底層的東西,可是我也不懂啊。雖然比較基礎,但可能卻比較容易忽略吧
在Java中使用多執行緒,本質上還是對Thread對象的操作。執行緒池只是為了方便對執行緒的管理,避免頻繁的創建和銷毀執行緒帶來不必要的系統開銷,內部通過指定的執行緒數和阻塞隊列實現。
基本使用
創建一個Thread對象的時候一般會傳遞一個Runnable對象,任務邏輯就寫在Runnable的run方法中。感覺這個Runnable的名字取得不太好,如果叫Task是不是會更好一些呢?
new Thread(()-> doXX() ).start();
獲取返回值
上面的那種方式使用起來是挺簡單,但會遇到一些問題,比如:能獲取返回值不?
通過全局變數
像上面這樣是沒辦法獲取返回值的,所以我們需要做一些處理,比如,將結果賦值給一個全局變數
private static int result; public static void main(String[] args) throws InterruptedException { new Thread(() -> { System.out.println("處理業務邏輯"); result = 1000; }).start(); Thread.sleep(1000); System.out.println(result); }
result
就是一個全局變數,當任務執行完成之後,更新這個值。這其實都不能算是返回值,但有時候也能用:不需要立即知道任務的執行結果,在訪問全部變數的時候,只需要獲取它的值就好了。比如通過定時任務去更新快取,不需要關注任務什麼時候執行完成,我需要的只是快取的值,任務執行了就獲取最新的值,沒有執行就獲取舊值。
通過空輪詢
那假如我就是想現在獲取返回值咋辦?因為我要用這個返回值作為下面邏輯的輸入。那或許可以通過輪詢的方式檢測全局變數來達到目的?
while(result == null){ }
除了白白浪費CPU,好像也行啊?但我現在考慮的只是兩個執行緒,如果有多個執行緒該對全局變數修改該怎麼辦呢?那用ThreadLocal?算了,就此打住吧
通過簡單封裝
或許可以封裝一下?再封裝之前,先考慮幾個問題
- 任務的邏輯定義在哪裡? 如果用Runnable,就無法返回值,所以可以定義一個有返回值的@FunctionalInterface介面,叫
Task
- 返回的值存到哪裡?怎麼返回?Thread沒有相關的方法,擴展一下?
public static void main(String[] args) throws InterruptedException { CallableThread callableThread = new CallableThread(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "ccccc"; }); callableThread.start(); System.out.println("開始時間 " + LocalDateTime.now()); System.out.println(callableThread.get()); System.out.println("結束時間 " + LocalDateTime.now()); } class CallableThread<T> extends Thread { private Task<T> task; private T result; private volatile boolean finished = false; public CallableThread(Task<T> task) { this.task = task; } @Override public void run() { synchronized (this) { result = task.call(); finished = true; notifyAll(); } } public T get() { synchronized (this) { while (!finished) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return result; } } } @FunctionalInterface interface Task<T> { T call(); }
這樣貌似也可以,但是不太好。Thread本來只是用於處理和執行緒相關的事情,現在將它和邏輯(Task)綁定在一起,如果有多個任務想共用一個Thread,那返回值怎麼處理?
是否可以將這部分邏輯抽出來,放到一個新類當中?
public static void main(String[] args) throws InterruptedException { MyRunnable<String> myRunnable = new MyRunnable(() -> { // 模擬耗時的業務操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "我是結果"; }); System.out.println("開始時間 " + LocalDateTime.now()); new Thread(myRunnable).start(); System.out.println("result: " + myRunnable.get()); System.out.println("結束時間 " + LocalDateTime.now()); } class MyRunnable<T> implements Runnable { private Task<T> task; private T result; private volatile boolean finished = false; public MyRunnable(Task<T> task) { this.task = task; } @Override public void run() { synchronized (this) { result = task.call(); finished = true; notifyAll(); } } public T get() { synchronized (this) { while (!finished) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return result; } } }
這不是和java裡面的Future有點像嗎?確實有點像
Future模式
Future
裡面有幾個比較核心的概念
- Future:抽象出
獲取任務返回值
、獲取任務執行狀態
等常用方法的介面 - Callable:類似於上面的 Task
- FutureTask:類似於上面的 MyRunnable
下面看一個例子
public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> future = new FutureTask<>(() -> { Thread.sleep(3000); System.out.println(System.currentTimeMillis()); return "hehehh"; }); new Thread(future).start(); System.out.println("Start Get Result : " + System.currentTimeMillis()); System.out.println("Get Result : " + future.get() + System.currentTimeMillis()); }
Future
Future
介面除了提供獲取返回值的介面,還提供了一些其他的介面,根據名字大概也可以猜到什麼意思,不過多解釋了。實在不行看看源碼吧,這樣子就很愉快了。
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
FutureTask
FutureTask
同時實現了Runnable
和Future
介面,
任務狀態
在FutureTask
中,任務的不同狀態通過state
變數來表示,狀態有以下幾種:
/* * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
任務執行
因為FutureTask
本身也實現了 Runnable 介面,所以核心關注它的run方法,執行邏輯其實比較簡單:
- 先判斷狀態,如果不為
NEW
或者通過cas更新runner
失敗,則直接返回 - 執行
Callable#call
方法,根據執行結果,設置狀態, 如果執行成功:先將state設置成COMPLETING
,然後保存返回的結果保存到屬性outcome
,再將state設置成NORMAL
,最後通過LockSupport.unpark(t)
解除阻塞的執行緒; 如果執行失敗:先將state設置成COMPLETING
,然後異常資訊保存到屬性outcome
,再將state設置成EXCEPTIONAL
,最後通過LockSupport.unpark(t)
解除阻塞的執行緒;
如何阻塞
當我們通過FutureTask#get
方法獲取返回值的時候,會阻塞當前執行緒,那是通過什麼方式阻塞當前執行緒的?是通過LockSupport
阻塞的,這個推薦看看部落格吧。我也是看部落格的,自己也解釋的沒人家好,嗯,就是這樣的
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // state > COMPLETING ,說明任務要麼正常執行,要麼異常結束,所以這裡可以直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; // 這應該是help GC吧? return s; } // 如果正在收尾階段,交出CPU, 等下次循環 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); // 通過UNSAFE 設置 waiters else if (!queued) // 將新的`WaitNode`添加到單向鏈表的頭部,waiters即對應頭節點 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); // 阻塞當前執行緒 } }
上面我們看到了有一個waiters
,這是用來幹嘛的呢?它是一個單向鏈表結構,主要是為了處理多次調用FutureTask#get
的情況,每調用一次FutureTask#get
就會生成一個WaitNode
節點,然後將它添加到單向鏈表的頭部
那什麼時候用到這個鏈表呢?在任務執行完成的時候,會執行finishCompletion
方法,主要就是從頭節點依次往下遍歷,獲取節點的thread
屬性,然後執行LockSupport.unpark(thread)
解除阻塞
回調如何處理
相對之前的那種方式來說,FutureTask
已經很好用了,直接通過FutureTask#get
方法就可以獲取返回值了,確實蠻方便的。
不過方便是方便,但假如我想在獲取返回值之後執行一些其他的邏輯該怎麼處理呢?其實我最直接的想法就是回調了。比如,我們可以對上面的MyRunnable
程式碼再擴展一下,例如
public MyRunnable addListener(Consumer c) { // 這裡是一個例子,肯定不會每次都new一個執行緒,一般是使用執行緒池 while (!finished) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } c.accept(result); }).start(); return this; }
我們給MyRunnable
添加了一個addListener
方法,接收一個Consumer
作為入參,當任務執行完成之後就執行這段邏輯,如下:
public static void main(String[] args) throws InterruptedException { MyRunnable<String> myRunnable = new MyRunnable(() -> { // 模擬耗時的業務操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "我是結果"; }); System.out.println("開始時間 " + LocalDateTime.now()); new Thread(myRunnable).start(); myRunnable.addListener(result -> { System.out.println("當xxx執行完成之後,執行緒:" + Thread.currentThread().getName() + " 執行一些其他的任務"); result = result + " ggggg"; System.out.println(result); }); }
ListenableFuture
ListenableFuture
是guava
包裡面的,對Future
進行了增強,ListenableFuture
繼承了Future
,新增了一個添加回調的方法
/** * @param listener the listener to run when the computation is complete 回調邏輯 * @param executor the executor to run the listener in 回調在哪個執行緒池執行 */ void addListener(Runnable listener, Executor executor);
ListenableFutureTask
繼承了FutureTask
並且是實現了ListenableFuture
介面,看一個簡單例子
public static void main(String[] args) throws InterruptedException { ListenableFutureTask futureTask = ListenableFutureTask.create(() -> { System.out.println("執行任務開始 " + LocalDateTime.now()); Thread.sleep(3000); System.out.println("執行任務完成 " + LocalDateTime.now()); return "結果"; }); futureTask.addListener(() -> System.out.println("獲取結果之後,輸出一條日誌"), MoreExecutors.directExecutor()); new Thread(futureTask).start(); }
源碼分析
原理就是將所有回調維護在一個單向鏈表中,也就是ExecutionList
,然後通過重寫“FutureTask#done`方法,在任務完成之後執行回調邏輯
// 每個回調就相當於是一個RunnableExecutorPair節點,所有RunnableExecutorPair節點構成一條鏈表,頭插鏈表 private final ExecutionList executionList = new ExecutionList(); // ListenableFutureTask#addListener public void addListener(Runnable listener, Executor exec) { executionList.add(listener, exec); } // ExecutionList#add public void add(Runnable runnable, Executor executor) { // 上鎖,因為它的內部屬性 executed 可能會被任務邏輯執行緒更新,即 ListenableFutureTask 實現了 FutureTask 的done方法,然後會在裡面更新 executed 的值為true // 還有一點,如果不加鎖,當多個執行緒同時添加回調的時候,可能會造成節點丟失 synchronized (this) { // 如果任務還沒有執行完成,就將當前節點添加到頭節點 if (!executed) { runnables = new RunnableExecutorPair(runnable, executor, runnables); return; } } // 如果任務執行完成,就開始執行回調 executeListener(runnable, executor); } // ExecutionList#executeListener private static void executeListener(Runnable runnable, Executor executor) { try { // 直接將任務交給執行緒池 executor.execute(runnable); } catch (RuntimeException e) { log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e); } } // ExecutionList.RunnableExecutorPair private static final class RunnableExecutorPair { final Runnable runnable; final Executor executor; @Nullable RunnableExecutorPair next; RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) { this.runnable = runnable; this.executor = executor; this.next = next; } }
ListenableFutureTask
是怎麼知道任務是否執行完成了呢? 在FutureTask#finishCompletion
方法中,解除阻塞的執行緒之後,還會執行一個done
方法,不過該方法在FutureTask
沒有任何邏輯,可以把它當作是一個模板方法,而ListenableFutureTask
實現了該方法,如下:
// ListenableFutureTask#done protected void done() { executionList.execute(); } // ExecutionList#execute public void execute() { RunnableExecutorPair list; synchronized (this) { if (executed) { return; } // 首先將executed置為true executed = true; // runnables代表鏈表的頭節點 list = runnables; runnables = null; // allow GC to free listeners even if this stays around for a while. } RunnableExecutorPair reversedList = null; // 這其實是一個倒置的過程,因為我們添加節點的時候,是插入到頭部的,為了保證回調按照我們添加時的順序執行,即 先添加先執行,所以做了一個倒置 while (list != null) { RunnableExecutorPair tmp = list; list = list.next; tmp.next = reversedList; reversedList = tmp; } // 遍歷鏈表,依次執行回調邏輯 while (reversedList != null) { executeListener(reversedList.runnable, reversedList.executor); reversedList = reversedList.next; } }
FutureCallback
通過ListenableFutureTask
,我們可以在任務執行完成之後執行一些回調邏輯。可是細心的同學會發現,回調方法無法使用任務的返回值
,那假如我就是想先獲取值然後再用這個返回值做下一步操作怎麼辦?還是只能先通過get方法阻塞當前執行緒嗎?其實guava
包中也給了我們相關的介面。先看一個例子:
public static void main(String[] args) throws InterruptedException { ListenableFutureTask futureTask = ListenableFutureTask.create(() -> { System.out.println("執行任務開始 " + LocalDateTime.now()); Thread.sleep(3000); System.out.println("執行任務完成 " + LocalDateTime.now()); return "結果"; }); Futures.addCallback(futureTask, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("執行成功: " + result); } @Override public void onFailure(Throwable t) { System.out.println("執行失敗"); } }); new Thread(futureTask).start(); }
源碼分析
FutureCallback
介面裡面有兩個方法,分別對應任務執行成功邏輯和任務失敗邏輯
void onSuccess(@Nullable V result); void onFailure(Throwable t);
Futures
可以堪稱是一個門面類,裡面封裝了一些操作
// Futures#addCallback public static <V> void addCallback( ListenableFuture<V> future, FutureCallback<? super V> callback) { // 這裡使用了DirectExecutor執行緒池,即直接在當前執行緒執行 addCallback(future, callback, directExecutor()); } // Futures#addCallback public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) { Runnable callbackListener = new Runnable() { @Override public void run() { final V value; try { value = getDone(future); } catch (ExecutionException e) { callback.onFailure(e.getCause()); return; } catch (RuntimeException e) { callback.onFailure(e); return; } catch (Error e) { callback.onFailure(e); return; } callback.onSuccess(value); } }; // 最終還是將這部分邏輯封裝成一個回調,然後在這個回調中獲取返回值,根據返回值的結果執行相應的FutureCallback方法 future.addListener(callbackListener, executor); } // Futures#getDone public static <V> V getDone(Future<V> future) throws ExecutionException { checkState(future.isDone(), "Future was expected to be done: %s", future); return getUninterruptibly(future); } public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { boolean interrupted = false; try { while (true) { try { return future.get(); } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }
本質上其實就是將獲取返回值的邏輯封裝成一個回調,在這個回調中獲取返回值,根據返回值的結果執行相應的FutureCallback
方法,不過在使用上卻方便了好多。
與我們直接通過get方法獲取返回值然後再執行其他邏輯還是有區別的,因為我們直接調用Future#get
方法會阻塞當前執行緒,而guava
是在回調中執行這部邏輯,類似於一種通知機制,所以不會阻塞當前執行緒。
ListenableFutureTask
其實Spring裡面也有一個ListenableFutureTask
,實現上和guava
大同小異,也是繼承了FutureTask
並且實現了自己的ListenableFuture
介面,通過重寫FutureTask#done
方法,在該方法中獲取返回值然後執行回調邏輯
public static void main(String[] args) { ListenableFutureTask future = new ListenableFutureTask(() -> "結果"); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Object result) { System.out.println("callback " + result); } @Override public void onFailure(Throwable ex) { System.out.println("執行失敗 "); } }); new Thread(future).start(); }
核心源碼
它的Callback是保存在兩個Queue裡面的:successCallbacks
,failureCallbacks
,數據結構是LinkedList
private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>(); private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();
重寫的done方法如下,邏輯很簡單,就不解釋了
protected void done() { Throwable cause; try { T result = get(); this.callbacks.success(result); return; }catch (InterruptedException ex) { Thread.currentThread().interrupt(); return; }catch (ExecutionException ex) { cause = ex.getCause(); if (cause == null) { cause = ex; } } catch (Throwable ex) { cause = ex; } this.callbacks.failure(cause); }
CompletableFuture
可能是之前的Future
功能太少了,所以Java8推出了CompletableFuture
,功能強大,除了上面說的那些功能,還有很多其他的功能,反正就是吊炸天。而且從DUBBO 2.7
開始非同步處理都是通過CompletableFuture
來實現。
CompletableFuture ForkJoinPoll
總結
總結下來就發現,那些很好用的API,真的是封裝的好啊。所以,設計模式真的很重要啊,老鐵。。。。。。