60行從零開始自己動手寫FutureTask是什麼體驗?
前言
在並發編程當中我們最常見的需求就是啟動一個線程執行一個函數去完成我們的需求,而在這種需求當中,我們常常需要函數有返回值。比如我們需要同一個非常大的數組當中數據的和,讓每一個線程求某一個區間內部的和,最終將這些和加起來,那麼每個線程都需要返回對應區間的和。而在Java當中給我們提供了這種機制,去實現這一個效果——FutureTask
。
FutureTask
在自己寫FutureTask
之前我們首先寫一個例子來回顧一下FutureTask
的編程步驟:
- 寫一個類實現
Callable
接口。
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
實現接口就實現call
即可,可以看到這個函數是有返回值的,而FutureTask
返回給我們的值就是這個函數的返回值。
new
一個FutureTask
對象,並且new
一個第一步寫的類,new FutureTask<>(callable實現類)
。- 最後將剛剛得到的
FutureTask
對象傳入Thread
類當中,然後啟動線程即可new Thread(futureTask).start();
。 - 然後我們可以調用
FutureTask
的get
方法得到返回的結果futureTask.get();
。
假如有一個數組
data
,長度為100000,現在有10個線程,第i
個線程求數組[i * 10000, (i + 1) * 10000)
所有數據的和,然後將這十個線程的結果加起來。
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] data = new int[100000];
Random random = new Random();
for (int i = 0; i < 100000; i++) {
data[i] = random.nextInt(10000);
}
@SuppressWarnings("unchecked")
FutureTask<Integer>[] tasks = (FutureTask<Integer>[]) Array.newInstance(FutureTask.class, 10);
// 設置10個 futuretask 任務計算數組當中數據的和
for (int i = 0; i < 10; i++) {
int idx = i;
tasks[i] = new FutureTask<>(() -> {
int sum = 0;
for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
sum += data[k];
}
return sum;
});
}
// 開啟線程執行 futureTask 任務
for (FutureTask<Integer> futureTask : tasks) {
new Thread(futureTask).start();
}
int threadSum = 0;
for (FutureTask<Integer> futureTask : tasks) {
threadSum += futureTask.get();
}
int sum = Arrays.stream(data).sum();
System.out.println(sum == threadSum); // 結果始終為 true
}
}
可能你會對FutureTask
的使用方式感覺困惑,或者不是很清楚,現在我們來仔細捋一下思路。
- 首先啟動一個線程要麼是繼承自
Thread
類,然後重寫Thread
類的run
方法,要麼是給Thread
類傳遞一個實現了Runnable
的類對象,當然可以用匿名內部類實現。 - 既然我們的
FutureTask
對象可以傳遞給Thread
類,說明FutureTask
肯定是實現了Runnable
接口,我們現在來看一下FutureTask
的繼承體系。
可以發現的是FutureTask
確實實現了Runnable
接口,同時還實現了Future
接口,這個Future
接口主要提供了後面我們使用FutureTask
的一系列函數比如get
。
- 看到這裡你應該能夠大致想到在
FutureTask
中的run
方法會調用Callable
當中實現的call
方法,然後將結果保存下來,當調用get
方法的時候再將這個結果返回。
自己實現FutureTask
工具準備
經過上文的分析你可能已經大致了解了FutureTask
的大致執行過程了,但是需要注意的是,如果你執行FutureTask
的get
方法是可能阻塞的,因為可能Callable
的call
方法還沒有執行完成。因此在get
方法當中就需要有阻塞線程的代碼,但是當call
方法執行完成之後需要將這些線程都喚醒。
在本篇文章當中使用鎖ReentrantLock
和條件變量Condition
進行線程的阻塞和喚醒,在我們自己動手實現FutureTask
之前,我們先熟悉一下上面兩種工具的使用方法。
ReentrantLock
主要有兩個方法:lock
對臨界區代碼塊進行加鎖。unlock
對臨界區代碼進行解鎖。
Condition
主要有三個方法:await
阻塞調用這個方法的線程,等待其他線程喚醒。signal
喚醒一個被await
方法阻塞的線程。signalAll
喚醒所有被await
方法阻塞的線程。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
private ReentrantLock lock;
private Condition condition;
LockDemo() {
lock = new ReentrantLock();
condition = lock.newCondition();
}
public void blocking() {
lock.lock();
try {
System.out.println(Thread.currentThread() + " 準備等待被其他線程喚醒");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void inform() throws InterruptedException {
// 先休眠兩秒 等他其他線程先阻塞
TimeUnit.SECONDS.sleep(2);
lock.lock();
try {
System.out.println(Thread.currentThread() + " 準備喚醒其他線程");
condition.signal(); // 喚醒一個被 await 方法阻塞的線程
// condition.signalAll(); // 喚醒所有被 await 方法阻塞的線程
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
LockDemo lockDemo = new LockDemo();
Thread thread = new Thread(() -> {
lockDemo.blocking(); // 執行阻塞線程的代碼
}, "Blocking-Thread");
Thread thread1 = new Thread(() -> {
try {
lockDemo.inform(); // 執行喚醒線程的代碼
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Inform-Thread");
thread.start();
thread1.start();
}
}
上面的代碼的輸出:
Thread[Blocking-Thread,5,main] 準備等待被其他線程喚醒
Thread[Inform-Thread,5,main] 準備喚醒其他線程
FutureTask設計與實現
在前文當中我們已經談到了FutureTask
的實現原理,主要有以下幾點:
- 構造函數需要傳入一個實現了
Callable
接口的類對象,這個將會在FutureTask
的run
方法執行,然後得到函數的返回值,並且將返回值存儲起來。 - 當線程調用
get
方法的時候,如果這個時候Callable
當中的call
已經執行完成,直接返回call
函數返回的結果就行,如果call
函數還沒有執行完成,那麼就需要將調用get
方法的線程掛起,這裡我們可以使用condition.await()
將線程掛起。 - 在
call
函數執行完成之後,需要將之前被get
方法掛起的線程喚醒繼續執行,這裡使用condition.signalAll()
將所有掛起的線程喚醒。 - 因為是我們自己實現
FutureTask
,功能不會那麼齊全,只需要能夠滿足我們的主要需求即可,主要是幫助大家了解FutureTask
原理。
實現代碼如下(分析都在注釋當中):
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
// 這裡需要實現 Runnable 接口,因為需要將這個對象放入 Thread 類當中
// 而 Thread 要求傳入的對象實現了 Runnable 接口
public class MyFutureTask<V> implements Runnable {
private final Callable<V> callable;
private Object returnVal; // 這個表示我們最終的返回值
private final ReentrantLock lock;
private final Condition condition;
public MyFutureTask(Callable<V> callable) {
// 將傳入的 callable 對象存儲起來 方便在後面的 run 方法當中調用
this.callable = callable;
lock = new ReentrantLock();
condition = lock.newCondition();
}
@SuppressWarnings("unchecked")
public V get(long timeout, TimeUnit unit) {
if (returnVal != null) // 如果符合條件 說明 call 函數已經執行完成 返回值已經不為 null 了
return (V) returnVal; // 直接將結果返回即可 這樣不用競爭鎖資源 提高程序執行效率
lock.lock();
try {
// 這裡需要進行二次判斷 (雙重檢查)
// 因為如果一個線程在第一次判斷 returnVal 為空
// 然後這個時候它可能因為獲取鎖而被掛起
// 而在被掛起的這段時間,call 可能已經執行完成
// 如果這個時候不進行判斷直接執行 await方法
// 那後面這個線程將無法被喚醒
if (returnVal == null)
condition.await(timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return (V) returnVal;
}
@SuppressWarnings("unchecked")
public V get() {
if (returnVal != null)
return (V) returnVal;
lock.lock();
try {
// 同樣的需要進行雙重檢查
if (returnVal == null)
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return (V) returnVal;
}
@Override
public void run() {
if (returnVal != null)
return;
try {
// 在 Runnable 的 run 方法當中
// 執行 Callable 方法的 call 得到返回結果
returnVal = callable.call();
} catch (Exception e) {
e.printStackTrace();
}
lock.lock();
try {
// 因為已經得到了結果
// 因此需要將所有被 await 方法阻塞的線程喚醒
// 讓他們從 get 方法返回
condition.signalAll();
}finally {
lock.unlock();
}
}
// 下面是測試代碼
public static void main(String[] args) {
MyFutureTask<Integer> ft = new MyFutureTask<>(() -> {
TimeUnit.SECONDS.sleep(2);
return 101;
});
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get(100, TimeUnit.MILLISECONDS)); // 輸出為 null
System.out.println(ft.get()); // 輸出為 101
}
}
我們現在用我們自己寫的MyFutureTask
去實現在前文當中數組求和的例子:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] data = new int[100000];
Random random = new Random();
for (int i = 0; i < 100000; i++) {
data[i] = random.nextInt(10000);
}
@SuppressWarnings("unchecked")
MyFutureTask<Integer>[] tasks = (MyFutureTask<Integer>[]) Array.newInstance(MyFutureTask.class, 10);
for (int i = 0; i < 10; i++) {
int idx = i;
tasks[i] = new MyFutureTask<>(() -> {
int sum = 0;
for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
sum += data[k];
}
return sum;
});
}
for (MyFutureTask<Integer> MyFutureTask : tasks) {
new Thread(MyFutureTask).start();
}
int threadSum = 0;
for (MyFutureTask<Integer> MyFutureTask : tasks) {
threadSum += MyFutureTask.get();
}
int sum = Arrays.stream(data).sum();
System.out.println(sum == threadSum); // 輸出結果為 true
}
總結
在本篇文章當中主要給大家介紹了FutureTask
的內部原理,並且我們自己通過使用ReentrantLock
和Condition
實現了我們自己的FutureTask
,本篇文章的主要內容如下:
FutureTask
的內部原理:FutureTask
首先會繼承Runnable
接口,這樣就可以將FutureTask
的對象直接放入Thread
類當中,作為構造函數的參數。- 我們在使用
FutureTask
的時候需要傳入一個Callable
實現類的對象,在函數call
當中實現我們需要執行的函數,執行完成之後,將call
函數的返回值保存下來,當有線程調用get
方法時候將保存的返回值返回。
- 我們使用條件變量進行對線程的阻塞和喚醒。
- 當有線程調用
get
方法時,如果call
已經執行完成,那麼可以直接將結果返回,否則需要使用條件變量將線程掛起。 - 當
call
函數執行完成的時候,需要使用條件變量將所有阻塞在get
方法的線程喚醒。
- 當有線程調用
- 雙重檢查:
- 我們在
get
方法當中首先判斷returnVal
是否為空,如果不為空直接將結果返回,這就可以不用去競爭鎖資源了,可以提高程序執行的效率。 - 但是我們在使用鎖保護的臨界區還需要進行判斷,判斷
returnVal
是否為空,因為如果一個線程在第一次判斷returnVal
為空,然後這個時候它可能因為獲取鎖而被掛起, 而在被掛起的這段時間,call 可能已經執行完成,如果這個時候不進行判斷直接執行 await方法,那後面這個線程將無法被喚醒,因為在call
函數執行完成之後調用了condition.signalAll()
,如果線程在這之後執行await
方法,那麼將來再沒有線程去將這些線程喚醒。
- 我們在
更多精彩內容合集可訪問項目://github.com/Chang-LeHung/CSCore
關注公眾號:一無是處的研究僧,了解更多計算機(Java、Python、計算機系統基礎、算法與數據結構)知識。