Java中的函數式編程(八)流Stream並行編程

寫在前面

在本系列文章的第一篇,我們提到了函數式編程的優點之一是「易於並發編程」。

Java作為一個多執行緒的語言,它通過 Stream 來提供了並發編程的便利性。

題外話:

嚴格來說,並發和並行是兩個不同的概念。
「並發(Concurrency)」強調的是在同一時間開始執行多個任務,通常會涉及多執行緒之間的上下文切換;
「並行(Parallelism)」強調的是將一個大任務分解為多個小任務後,再同時執行這些小任務,得到多個中間結果後再匯總為一個最終結果。
但在多CPU和分散式的時代,並發和並行的概念聯繫越來越緊密。至少在Java的Stream中,我們可以將並發和並行理解為同一個意思:基於多執行緒技術,對一個大任務分拆為多個小任務,分配到不同的執行緒中執行,得到多個中間結果後再匯總為一個最終結果。

本文的示例程式碼可從gitee上獲取://gitee.com/cnmemset/javafp

Stream的並行編程

並行編程是Stream的一個重要功能和特性。它的一個優點是:不管數據源是否執行緒安全,通過並行流(parallel stream)都可以輕鬆的實現並行編程。

Stream的並行編程,底層是基於 ForkJoinPool 技術來實現的。ForkJoinPool是Java 7引入的用於並行執行的任務框架,核心思想是將一個大任務拆分成多個小任務(即fork),然後再將多個小任務的處理結果匯總到一個結果上(即join)。此外,它也提供基本的執行緒池功能,譬如設置最大並發執行緒數,關閉執行緒池等。

在本系列之前的文章中,也零零散散的提到了一些關於並行編程的知識點。本文再做一個更系統的總結。

並行流(parallel stream)

Stream的並行操作都是基於並行流(parallel stream)。

生成一個並行流也非常簡單:

1. 通過 Collection.parallelStream 方法可以得到一個並行流

2. 生成一個串列的Stream後,可以通過方法 BaseStream.parallel() 將一個串列流(serial stream)轉換成並行流。當然,我們也可以通過方法 BaseStream.sequential() 將一個並行流轉換成串列流。

通過方法 BaseStream.isParallel() 可以判斷一個 stream 是否是並行流。

不管數據源是否執行緒安全(譬如ArrayList、HashSet,它們都不支援多執行緒),我們都可以使用parallelStream 輕鬆實現並行編程,不需要額外的執行緒同步操作,這是parallelStream 最大的優點。

順序性

encounter order,指的是Stream中元素的出現順序。如果覺得encounter order過於抽象,可以將它簡單理解為數據源(data source)的元素順序。本小節涉及到的有序或無序都特指encounter order。

一個Stream是否具備encounter order的有序性,取決於它的數據源(data source)和中間操作(intermediate operations)。例如,List或者數組的Steam是有序的,但HashSet的Steam則是無序的。而中間操作Stream.sorted,可以將一個無序的Stream轉換成有序的;中間操作Stream.unordered 則將一個有序的Stream轉換成無序的。

有趣的是,有些終止操作(terminal operations)是無視encounter order的。什麼意思呢?以最常見的Stream.forEach 為例,在並行執行的時候,即使數據源是List,forEach方法處理元素的順序也是無序的。要保證處理順序,需要使用方法 Stream.forEachOrdered 。

示例程式碼:

public static void forEachExample() {
    ArrayList<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
 
    System.out.println("===forEach====");
 
    // 在並行流中, forEach 方法是無視 Stream 的 encounter order 的
    list.parallelStream().forEach(i -> {
        System.out.println(i + ":thread-" + Thread.currentThread().getName());
    });
 
    System.out.println("===forEachOrdered====");
 
    // 在並行流中, forEachOrdered 方法可以保持 encounter order
    list.parallelStream().forEachOrdered(i -> {
        System.out.println(i + ":thread-" + Thread.currentThread().getName());
    });
}

上述程式碼的輸出類似:

===forEach====
3:thread-main
5:thread-ForkJoinPool.commonPool-worker-2
1:thread-main
4:thread-ForkJoinPool.commonPool-worker-3
2:thread-ForkJoinPool.commonPool-worker-1
===forEachOrdered====
1:thread-ForkJoinPool.commonPool-worker-4
2:thread-ForkJoinPool.commonPool-worker-1
3:thread-ForkJoinPool.commonPool-worker-1
4:thread-ForkJoinPool.commonPool-worker-1
5:thread-ForkJoinPool.commonPool-worker-1

可以看出,在並行執行時,forEach 是無視Stream的encounter order的,而 forEachOrdered 雖然也是在多執行緒環境下執行,但仍然可以保證Stream的encounter order。

在Stream並行編程中,理解encounter order很重要。因為對於大多數的Stream操作,即使是並行執行,如果Stream是有序的,那麼操作後得到的Stream也保持有序。例如,對一個數據源為List [1,2,3] 的有序Stream,執行 map(x -> x * x) 操作後,結果一定是 [1, 4, 9]。

對encounter order的有序性和無序性,示例程式碼如下:

public static void unorderedExample() {
    // 我們用 TreeMap 來做實驗,因為 ArrayList 的特殊性,很難展示 unordered 的特性
 
    // TreeSet 中的元素是按從小到大排序的,即 [-7, -3, 1, 5, 12]
    TreeSet<Integer> set = new TreeSet<>(Arrays.asList(1, 12, 5, -7, -3));
 
    // 按 encounter order 列印 set,輸出為:-7, -3, 1, 5, 12
    System.out.println("The encounter order of set: ");
    set.stream().forEachOrdered(s -> System.out.print(s + " "));
    System.out.println();
 
    // TreeSet 是有序的,所以來自 TreeSet 的 Stream 也是有序的
    // 當 Stream 是有序時,執行操作 limit(2) ,不管是串列還是並行,也不管執行多少次,結果都是前兩位數字 [-7, -3]
    System.out.println("Limit ordered Stream: ");
    set.stream().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " "));
    System.out.println();
 
    // 我們使用 unordered 方法將 Stream 轉換為無序的。
    // 當 Stream 是無序時,並行執行操作 limit(2) ,會發現執行多次時,輸出的數字是不一樣的(不確定性)
    System.out.println("Limit unordered Stream: ");
    System.out.print("first time: ");
    set.stream().unordered().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " "));
    System.out.println();
    System.out.print("second time: ");
    set.stream().unordered().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " "));
    System.out.println();
}

上述示例程式碼的輸出類似:

The encounter order of set:
-7 -3 1 5 12
Limit ordered Stream:
-7 -3
Limit unordered Stream:
first time: -3 5
second time: 5 12

大家可以仔細體會。歡迎加群討論!!!

純函數操作

回顧本系列文章的第一篇,純函數(purely function)指的是它不會改變函數以外的其它狀態,換而言之,即不會改變在該函數之外定義的變數值。純函數不會導致「副作用(side-effects)。

在Stream的並行編程中,純函數操作非常關鍵,否則我們依然需要考慮執行緒安全的問題。

舉例說明:

public static void unsafeParallelOperation() {
    List<String> provinces = Arrays.asList("Guangdong", "Jiangsu", "Guangxi", "Jiangxi", "Shandong");

    // "副作用" 導致的執行緒不安全問題
    ArrayList<String> results = new ArrayList<>();
    provinces.parallelStream()
            // 過濾掉以 G 開頭的省份
            .filter(s -> !s.startsWith("G"))
            // 在 lambda表達式中修改了 results 的值,
            // 說明了 "s -> results.add(s)" 並非一個純函數,
            // 帶來了不必要的 "副作用",
            // 在並行執行時,會導致執行緒不安全的問題。
            .forEach(s -> results.add(s));

    System.out.println(results);
}

上述示例程式碼存在執行緒不安全的問題 —— 多個執行緒會同時修改 ArrayList 類型的 results ,我們需要對 results 變數加鎖。

正確的做法是:

public static void safeParallelOperation() {
    List<String> provinces = Arrays.asList("Guangdong", "Jiangsu", "Guangxi", "Jiangxi", "Shandong");
    
    List<String> results = provinces.parallelStream()
            // 過濾掉以 G 開頭的省份
            .filter(s -> !s.startsWith("G"))
            // 沒有 "副作用"
            .collect(Collectors.toList());

    System.out.println(results);
}

通過內置的 Collectors.toList() 方法,就不存在「副作用」,從而也無需考慮執行緒安全問題。

Collectors與ConcurrentMap

回顧一下,在介紹Stream的規約方法 Stream.collect(Collector) 時,我們提到了一個需求場景:將員工按照部門分組。

並行執行的實現程式碼類似:

public static void groupEmployeesToMap() {
    List<Employee> employees = Utils.makeEmployees();
    Map<String, List<Employee>> map = employees.parallelStream()
            .collect(Collectors.groupingBy(Employee::getDepartment));
    System.out.println(map);
}

雖然上述程式碼可以實現功能,但性能可能並不盡如人意,因為在並行執行時,需要將多個中間結果匯總為最終的結果,但合併兩個Map,性能損耗可能非常大(例如HashMap,底層是數組+紅黑樹實現的,合併時複雜度不低)。

自然而然,聰明的Java程式設計師會想到:如果並行執行得到的中間結果和最終結果都是使用同一個Map實例,那就不需要合併兩個Map了,當然,因為並行執行涉及到多執行緒,因此,這個Map實例要求是執行緒安全的。典型的執行緒安全的Map,當然首選ConcurrentHashMap 啦。

這就是Collectors工具類中與ConcurrentMap相關的方法的實現原理,主要包括:

1. toConcurrentMap 系列方法

2. groupingByConcurrent 系列方法

但使用 ConcurrentHashMap 有個缺點:它不能保證 Stream 的 encounter order,所以只有當你確定元素的順序不影響最終結果時,才使用與ConcurrentMap相關的方法。

最後,還要注意,只有在並行編程時,我們才要考慮使用 toConcurrentMap 或者 groupingByConcurrent 方法,否則會因為不必要的執行緒同步操作,反而影響了性能。

規約操作的注意事項

在本系列介紹規約操作的文章中,已經提到了很多關於並行編程的注意事項,本小節將它們匯總起來,供大家參考。

reduce(T, BinaryOperator)

reduce(T, BinaryOperator)的方法簽名是:

T reduce(T identity, BinaryOperator<T> accumulator);

其中 T 是 Stream 的泛型類型。

參數 identity 是規約操作的初始值。

參數accumulator 要求滿足結合律(associative)。

參數 accumulator 定義的函數必須滿足結合律(associative),否則在一些順序不確定的或並行的場景中會導致不正確的結果。

此外,如果是並行執行的話,對參數 identity 還有一個要求:對任意值 t,要滿足 accumulator.apply(identity, t) == t 。否則,會導致錯誤的結果。

public static void reduceStream2() {
    List<Integer> list = Arrays.asList(1, 3, 5, 7, 9);
 
    // 這是正確的範例:因為數字 0 是累加操作的 identity 。
    sum = list.parallelStream().reduce(0, (x, y) -> x + y);
    // 輸出為 0+1+3+5+7+9 = 25
    System.out.println(sum);
 
    // 這是錯誤的範例:因為數字 5 並不是累加操作的 identity 。
    sum = list.parallelStream().reduce(5, (x, y) -> x + y);
    // 本意是輸出為 5+1+3+5+7+9 = 30,但實際上會輸出一個比30大的數字。
    System.out.println(sum);
}

reduce(U, BiFunction, BinaryOperator)

具體的方法簽名是:

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner);

其中 U 是返回值的類型,T 是 Stream 的泛型類型。

參數 identity 是規約操作的初始值。

參數accumulator 是與Stream中單個元素的合併操作,等同於函數 U apply(U u, T t)。

參數 combiner 是將並行執行得到的多個中間結果進行合併的操作,等同於函數 U apply(U u1, U u2)。

在並行編程中,對3個參數都有一些特殊要求:

1. 參數 combiner 必須滿足結合律

2. 參數 identity,對於任意值 u,必須滿足 combiner.apply(identity, u) == u

3. 參數 accumulator 和 combiner 兩者必須兼容,即對於任意值 u 和 t,必須滿足:

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

collect(Supplier, BiConsumer, BiConsumer)

ollect方法的簽名是:

<R> R collect(Supplier<R> supplier,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);

其中 R 是返回值的類型,通常是一個容器類(例如 Collection 或 Map)。T 是Stream中的元素類型。

參數 supplier 是用來創建一個容器實例的函數。

參數 accumulator 是將Stream中的一個元素合併到容器中的函數。

參數 combiner 是將兩個容器歸併為一個容器的函數,只在並行執行的時候用到。

在並行執行的場景下,我們有一些額外的要求:

1. combiner函數滿足結合律

2. 要求combiner 和 accumulator 是兼容的(compatible),即對於任意的r和t, combiner.accept(r, accumulator.accept(supplier.get(), t)) == accumulator.accept(r, t)

結語

Stream 提供了非常方便的並行編程API,但它還是存在很多問題,非常容易踩坑。

其中,最為人詬病的是它的不可控性。因為 Parallel Stream 的底層是基於 ForkJoinPool ,而 ForkJoinPool 的工作執行緒數是在虛擬機啟動時指定的,如果 Stream 並行執行的任務數量過多或耗時過多,甚至會影響應用程式中其它使用 ForkJoinPool 的功能。

總的來說,除非你非常了解你正在做的事情,否則不要使用 Stream 的並行編程API 。取而代之,我們可以直接使用Java中多執行緒技術(例如執行緒池)來處理。