並發編程之:ForkJoin

大家好,我是小黑,一個在互聯網苟且偷生的農民工。

在JDK1.7中引入了一種新的Fork/Join線程池,它可以將一個大的任務拆分成多個小的任務並行執行並匯總執行結果。

Fork/Join採用的是分而治之的基本思想,分而治之就是將一個複雜的任務,按照規定的閾值劃分成多個簡單的小任務,然後將這些小任務的結果再進行匯總返回,得到最終的任務。

分治法

分治法是計算機領域常用的算法中的其中一個,主要思想就是將將一個規模為N的問題,分解成K個規模較小的子問題,這些子問題相互獨立且與原問題性質相同;求解出子問題的解,合併得到原問題的解。

解決問題的思路

  • 分割原問題;
  • 求解子問題;
  • 合併子問題的解為原問題的解。

使用場景

二分查找,階乘計算,歸併排序,堆排序、快速排序、傅里葉變換都用了分治法的思想。

ForkJoin並行處理框架

在JDK1.7中推出的ForkJoinPool線程池,主要用於ForkJoinTask任務的執行,ForkJoinTask是一個類似線程的實體,但是比普通線程更輕量。

我們來使用ForkJoin框架完成以下1-10億求和的代碼。

public class ForkJoinMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> rootTask = forkJoinPool.submit(new SumForkJoinTask(1L, 10_0000_0000L));
        System.out.println("計算結果:" + rootTask.get());
    }
}

class SumForkJoinTask extends RecursiveTask<Long> {
    private final Long min;
    private final Long max;
    private Long threshold = 1000L;

    public SumForkJoinTask(Long min, Long max) {
        this.min = min;
        this.max = max;
    }
    @Override
    protected Long compute() {
        // 小於閾值時直接計算
        if ((max - min) <= threshold) {
            long sum = 0;
            for (long i = min; i < max; i++) {
                sum = sum + i;
            }
            return sum;
        }
        // 拆分成小任務
        long middle = (max + min) >>> 1;
        SumForkJoinTask leftTask = new SumForkJoinTask(min, middle);
        leftTask.fork();
        SumForkJoinTask rightTask = new SumForkJoinTask(middle, max);
        rightTask.fork();
        // 匯總結果
        return leftTask.join() + rightTask.join();
    }
}

上述代碼邏輯可通過下圖更加直觀的理解。

ForkJoin框架實現

在ForkJoin框架中重要的一些接口和類如下圖所示。

ForkJoinPool

ForkJoinPool是用於運行ForkJoinTasks的線程池,實現了Executor接口。

可以通過new ForkJoinPool()直接創建ForkJoinPool對象。

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode){
    this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
}

通過查看構造方法源碼我們可以發現,在創建ForkJoinPool時,有以下4個參數:

  • parallelism:期望並發數。默認會使用Runtime.getRuntime().availableProcessors()的值
  • factory:創建ForkJoin工作線程的工廠,默認為defaultForkJoinWorkerThreadFactory
  • handler:執行任務時遇到不可恢復的錯誤時的處理程序,默認為null
  • asyncMode:工作線程獲取任務使用FIFO模式還是LIFO模式,默認為LIFO

ForkJoinTask

ForkJoinTask是一個對於在ForkJoinPool中運行任務的抽象類定義。

可以通過少量的線程處理大量任務和子任務,ForkJoinTask實現了Future接口。主要通過fork()方法安排異步任務執行,通過join()方法等待任務執行的結果。

想要使用ForkJoinTask通過少量的線程處理大量任務,需要接受一些限制。

  • 拆分的任務中避免同步方法或同步代碼塊;
  • 在細分的任務中避免執行阻塞I/O操作,理想情況下基於完全獨立於其他正在運行的任務訪問的變量;
  • 不允許在細分任務中拋出受檢異常。

因為ForkJoinTask是抽象類不能被實例化,所以在使用時JDK為我們提供了三種特定類型的ForkJoinTask父類供我們自定義時繼承使用。

  • RecursiveAction:子任務不返回結果
  • RecursiveTask:子任務返回結果
  • CountedCompleter:在任務完成執行後會觸發執行

ForkJoinWorkerThread

ForkJoinPool中用於執行ForkJoinTask的線程。

ForkJoinPool既然實現了Executor接口,那麼它和我們常用的ThreadPoolExecutor之前又有什麼差異呢?

如果們使用ThreadPoolExecutor來完成分治法的邏輯,那麼每個子任務都需要創建一個線程,當子任務的數量很大的情況下,可能會達到上萬個,那麼使用ThreadPoolExecutor創建出上萬個線程,這顯然是不可行、不合理的;

ForkJoinPool在處理任務時,並不會按照任務開啟線程,只會按照指定的期望並行數量創建線程。在每個線程工作時,如果需要繼續拆分子任務,則會將當前任務放入ForkJoinWorkerThread的任務隊列中,遞歸處理直到最外層的任務。

工作竊取算法

ForkJoinPool的各個工作線程都會維護一個各自的任務隊列,減少線程之間對於任務的競爭;

每個線程都會先保證將自己隊列中的任務執行完,當自己的任務執行完之後,會去看其他線程的任務隊列中是否有未處理完的任務,如果有則會幫助其他線程執行;

為了減少在幫助其他線程執行任務時發生競爭,會使用雙端隊列來存放任務,被竊取的任務只會從隊列的頭部獲取任務,而正常處理的線程每次都是從隊列的尾部獲取任務。

優點

充分利用了線程資源,避免資源的浪費,並且減少了線程間的競爭。

缺點

需要給每個線程開闢一個隊列空間;在工作隊列中只有一個任務時同樣會存在線程競爭。

最後

如果覺得文章對你有點幫助,不妨掃碼點個關注。我是小黑,下期見~