Spring WebFlux 學習筆記 – (一) 前傳:學習Java 8 Stream Api (3) – Stream的終端操作
- 2020 年 6 月 10 日
- 筆記
- Reactor, Spring WebFlux, Stream Api, WebFlux
Stream API
Java8中有兩大最為重要的改變:第一個是 Lambda 表達式;另外一個則是 Stream API(java.util.stream.*)。
Stream 是 Java8 中處理集合的關鍵抽象概念,它可以指定你希望對集合進行的操作,可以執行非常複雜的查找、過濾和映射數據等操作。使用Stream API 對集合數據進行操作,就類似於使用 SQL 執行的數據庫查詢。也可以使用 Stream API 來並行執行操作。簡而言之,Stream API 提供了一種高效且易於使用的處理數據的方式。
流(Stream)是數據渠道,用於操作數據源(集合、數組等)所生成的元素序列。”集合講的是數據,流講的是計算! “
集合和流(Stream),表面上有一些相似之處,他們有不同的目標。集合主要關注其元素的有效管理和訪問,相比之下,流不提供直接訪問或操縱元素的手段,而是關心聲明性地描述其源和將在該源上進行聚合的計算操作。
上篇內容我們學習了Stream的中間操作,接下來我們來看下Stream數據流的結果消費,即終端(終止)操作。以下用 終端操作 統稱。
Stream的終端操作
流管道通過源生成,經過零個或多個中間操作後,進行最後的終端操作,由此產生結果或副作用,如count()或forEach(Consumer)。
我們將終端操作的結果分為如下幾類:
- 匹配
- 統計
- 消費
- 轉換
以下內容提到XxxStream代表IntStream、LongStream、DoubleStream。如無特殊說明Stream也包含IntStream、LongStream、DoubleStream。
匹配
匹配類型的終端操作返回值為布爾值,根據使用語境調用不同的方法及傳入謂語實現確定是否匹配。
序號 | 支持的類 | 方法定義 | 方法說明 |
---|---|---|---|
1 | Stream |
boolean anyMatch(Predicate<? super T> predicate); | 部分匹配,返回此流的任何元素是否與提供的謂詞匹配。 |
2 | Stream |
boolean allMatch(Predicate<? super T> predicate); | 全部匹配,返回此流的所有元素是否與提供的謂詞匹配。 |
3 | Stream |
boolean noneMatch(Predicate<? super T> predicate); | 全不匹配,返回此流中是否沒有元素與提供的謂詞匹配。 |
注意:如果流為空時,allMatch方法始終返回true;noneMatch方法始終返回true。
以下代碼見 StreamTerminalOperationMatchTest。
anyMatch的使用
// 是否存在匹配的元素,true
log.info("[1, 2, 3, 4, 5, 6]存在偶數否:{}",
Stream.of(1, 2, 3, 4, 5, 6).anyMatch(n -> n % 2 == 0));
allMatch的使用
// 全部元素是否都匹配,false
log.info("[1, 2, 3, 4, 5, 6]全部都是偶數否:{}",
Stream.of(1, 2, 3, 4, 5, 6).allMatch(n -> n % 2 == 0));
noneMatch的使用
// 全部元素是否都不匹配,false
log.info("[1, 2, 3, 4, 5, 6]全部都不是偶數否:{}",
Stream.of(1, 2, 3, 4, 5, 6).noneMatch(n -> n % 2 == 0));
空流驗證
// 空流中不存在任何匹配元素,所以返回false
log.info("空流是否AnyMatch:{}", Stream.empty().anyMatch(Objects::isNull));
// 空流中不存在不匹配的,即全部匹配,所以返回true
log.info("空流是否AllMatch:{}", Stream.empty().allMatch(Objects::isNull));
// 空流中全部都不匹配,所以返回true
log.info("空流是否NoneMatch:{}", Stream.empty().noneMatch(Objects::isNull));
統計
統計類型的終端操作是對流元素的統計,如元素個數、最大值、最小值、統計對象等。
序號 | 支持的類 | 方法定義 | 方法說明 |
---|---|---|---|
1 | Stream |
long count(); | 返回此流中的元素數。 |
2 | Stream |
Optional |
根據提供的 Comparator返回此流的最小元素。 |
3 | Stream |
Optional |
根據提供的 Comparator返回此流的最大元素。 |
4 | Stream |
OptionalXxx min(); | 返回 OptionalInt此流的最小元素的OptionalInt,如果此流為空,則返回一個空的可選項。 |
5 | XxxStream |
OptionalXxx max(); | 返回 OptionalInt此流的最大元素的OptionalInt,如果此流為空,則返回一個空的可選項。 |
6 | XxxStream |
OptionalDouble average(); | 返回 OptionalDouble此流的元素的算術平均值的OptionalDouble,如果此流為空,則返回空的可選項。 |
7 | XxxStream |
Xxx sum(); | 返回此流中元素的總和。 |
8 | XxxStream |
XxxSummaryStatistics summaryStatistics(); | 返回一個 IntSummaryStatistics描述有關此流的元素的各種摘要數據。 |
XxxSummaryStatistics類型的統計對象,如IntSummaryStatistics,除了提供最小值、最大值、平均值、元素個數、總和外,還提供了accept、combine兩個方法,分別支持添加新的數據和連接另外的統計對象,並自動重新統計結果。
以下代碼見 StreamTerminalOperationStatisticsTest。
count的使用
log.info("[1, 2, 3, 4, 5, 6]元素個數:{}",
Stream.of(1, 2, 3, 4, 5, 6).count());
min的使用(使用Comparator比較)
log.info("[1, 2, 3, 4, 5, 6]的最大值:{}",
Stream.of(1, 2, 3, 4, 5, 6).min(Comparator.comparingInt(n -> n)).get());
max的使用(使用Comparator比較)
log.info("[1, 2, 3, 4, 5, 6]的最小值:{}",
Stream.of(1, 2, 3, 4, 5, 6).max(Comparator.comparingInt(n -> n)).get());
min的使用
log.info("[1, 2, 3, 4, 5, 6]的最小值:{}",
IntStream.of(1, 2, 3, 4, 5, 6).min().getAsInt());
max的使用
log.info("[1, 2, 3, 4, 5, 6]的最大值:{}",
IntStream.of(1, 2, 3, 4, 5, 6).max().getAsInt());
average的使用
log.info("[1, 2, 3, 4, 5, 6]的平均值:{}",
IntStream.of(1, 2, 3, 4, 5, 6).average().getAsDouble());
sum的使用
log.info("[1, 2, 3, 4, 5, 6]的求和:{}",
IntStream.of(1, 2, 3, 4, 5, 6).sum());
summaryStatistics的使用
IntSummaryStatistics summaryStatistics = IntStream.of(1, 2, 3, 4, 5, 6).summaryStatistics();
log.info("[1, 2, 3, 4, 5, 6]的統計對象:{}", summaryStatistics);
summaryStatistics.accept(7);
log.info("添加7後,統計對象變為:{}", summaryStatistics);
IntSummaryStatistics summaryStatistics2 = IntStream.of(8, 9).summaryStatistics();
summaryStatistics.combine(summaryStatistics2);
log.info("合併[8, 9]後,統計對象變為:{}", summaryStatistics);
消費
消費類型的終端操作是對流內元素的獲取或循環消費。
序號 | 支持的類 | 方法定義 | 方法說明 |
---|---|---|---|
1 | Stream |
Optional |
返回描述此流的第一個元素的Optional,如果流為空,則返回一個空的Optional。 |
2 | Stream |
Optional |
返回描述流的一些元素的Optional如果流為空,則返回一個空的Optional。 |
3 | Stream |
void forEach(Consumer<? super T> action); | 對此流的每個元素執行操作。 |
4 | Stream |
void forEachOrdered(Consumer<? super T> action); | 如果流具有定義的順序,則以流的順序對該流的每個元素執行操作。 |
看了forEach和forEachOrdered的Api的說明,大家可能對這兩個還是有點疑問,這裡特別說明下,forEach在並行流(parallel後面會講)中並不按照流內元素之前定義的順序執行操作,是無序的,而forEachOrdered會按照流之前定義的順序執行操作。除非必要,在並行流中不建議使用forEachOrdered對其進行排序執行操作,否則影響性能。
流有可能也可能沒有定義順序。流是否有順序取決於源和中間操作。某些流源(如List或數組)本質上是有序的,而其他數據源(如HashSet)不是。一些中間操作(例如sorted())可以在其他無序流上排序,而其他中間操作可以使有序流無序,例如BaseStream.unordered()。此外,一些終端操作可能會忽略順序,如forEach()。
如果一個流被命令,大多數操作被限制為在遇到的順序中對元素進行操作; 如果流的源是List含有[1, 2, 3] ,然後執行的結果map(x -> x*2)必須是[2, 4, 6] 。 然而,如果源沒有定義的順序,則任何[2, 4, 6]排列組合值都將是有效結果。
對於順序流,遇到順序的存在或不存在不影響性能,僅影響確定性。 如果流被排序,在相同的源上重複執行相同的流管線將產生相同的結果; 如果沒有排序,重複執行可能會產生不同的結果。
對於並行流,放寬排序約束有時可以實現更有效的執行。如果元素的排序不相關,某些聚合操作,例如過濾重複(distinct())或組合減少(Collectors.groupingBy())可以更有效地實現。 類似地,本質上與遇到順序相關的操作,如limit()可能需要緩衝以確保正確排序,從而破壞並行性的好處。另外,當流遇到排序,但用戶並不特別在意那次偶遇秩序的情況下,明確地去操作為無序流unordered()可以提高某些狀態或終端操作的並行性能。 然而,大多數流管線,例如上面的例子的「權重之和」,仍然在有序的限制下有效地並行化。
以下代碼見 StreamTerminalOperationConsumeTest。
findFirst的使用
log.info("[1, 2, 3, 4, 5, 6]的首個值:{}",
Stream.of(1, 2, 3, 4, 5, 6).parallel().findFirst().get());
findAny的使用
log.info("[1, 2, 3, 4, 5, 6]的任意值:{}",
Stream.of(1, 2, 3, 4, 5, 6).parallel().findAny().get());
forEach的使用
// 並行後順序隨機,輸出不保證順序
log.info("[1, 2, 3, 4, 5, 6]的並行後循環輸出:");
Stream.of(1, 2, 3, 4, 5, 6).parallel().forEach(System.out::println);
forEachOrdered的使用
// 無論是否並行,始終按照流定義的順序或排序後的結果輸出
log.info("[1, 2, 5, 6, 3, 4]的並行後順序循環輸出:");
Stream.of(1, 2, 5, 6, 3, 4).sorted().parallel().forEachOrdered(System.out::println);
轉換
轉換類型的終端操作是將流轉換為另一種對象使用。
序號 | 支持的類 | 方法定義 | 方法說明 |
---|---|---|---|
1 | Stream |
Optional |
使用associative累積函數對此流的元素執行reduction,並返回描述減小值(如果有的話)的Optional 。 |
2 | Stream |
T reduce(T identity, BinaryOperator |
使用提供的身份值和 associative累積功能對此流的元素執行 reduction ,並返回減小的值。 |
3 | Stream |
U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator combiner); | 執行 reduction在此流中的元素,使用所提供的身份,積累和組合功能。 |
4 | Stream |
<R, A> R collect(Collector<? super T, A, R> collector); | 使用 Collector對此流的元素執行 mutable reduction Collector。 |
5 | Stream |
對此流的元素執行 mutable reduction操作。 |
此處我們着重說下序號3,帶有3個參數的reduce方法,該方法支持轉換元素(結果)類型,即從類型T轉換為類型U。第1個參數代表初始值;第2個參數是累加器函數式接口,輸入類型U和類型T,返回類型U;第3個參數是組合器函數式接口,輸入類型U和類型U,返回類型U。該方法的第3個參數在並行執行下有效。
同時需要注意,此方法有如下要求:
- u = combiner(identity, u);
- combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t);
該方法的代碼示例見reduce的使用3和reduce的使用4,如果對該示例不了解,可以在後面的章節中講解了並行執行之後再回過頭來看該示例。
以下代碼見 StreamTerminalOperationTransformTest。
reduce的使用1
// 使用reduce方式實現查找最小值
log.info("[1, 2, 3, 4, 5, 6]的最小值:{}",
Stream.of(1, 2, 3, 4, 5, 6).reduce(Integer::min).get());
reduce的使用2
// 使用reduce方式實現求和
log.info("[1, 2, 3, 4, 5, 6]的求和:{}",
Stream.of(1, 2, 3, 4, 5, 6).reduce(0, Integer::sum));
reduce的使用3
// 求單詞長度之和
Integer lengthSum = Stream.of("I", "love", "you", "too")
.parallel()
.reduce(0,// 初始值 // (1)
(sum, str) -> sum + str.length(), // 累加器 // (2)
Integer::sum);// 部分和拼接器,並行執行時才會用到 // (3)
// int lengthSum = stream.mapToInt(str -> str.length()).sum();
log.info("ILoveYouToo的長度為:{}", lengthSum);
reduce的使用4
// 下方方法同步執行時,能出現正確結果
// 並行執行時,將出現意想不到的結果
// 多線程執行時,append導致初始值identity發生了變化,而多線程又導致了數據重複添加
StringBuffer word = Stream.of("I", "love", "you", "too")
.parallel() // 同步執行注釋該步驟
.reduce(new StringBuffer(),// 初始值 // (1)
StringBuffer::append, // 累加器 // (2)
StringBuffer::append);// 部分和組合器,並行執行時才會用到 // (3)
log.info("拼接字符串為:{}", word);
// 此處如果使用字符串concat,導致性能降低,不停創建字符串常量
String word2 = Stream.of("I", "love", "you", "too")
.parallel() // 同步執行注釋該步驟
.reduce("",// 初始值 // (1)
String::concat, // 累加器 // (2)
String::concat);// 部分和組合器,並行執行時才會用到 // (3)
log.info("拼接字符串為:{}", word2);
// 下面方法並行執行時,雖然能達到正確的結果,但是並未滿足reduce的要求
List<Integer> accResult = Stream.of(1, 2, 3, 4)
.parallel()
.reduce(Collections.synchronizedList(new ArrayList<>()),
(acc, item) -> {
List<Integer> list = new ArrayList<>();
list.add(item);
System.out.println("item BiFunction : " + item);
System.out.println("acc+ BiFunction: " + list);
return list;
}, (accs, items) -> {
accs.addAll(items);
System.out.println("item BinaryOperator: " + items);
System.out.println("acc+ BinaryOperator: " + accs);
return accs;
});
log.info("accResult: {}", accResult);
由於時間及版面的緣故,本期就先講到這裡,下期在着重將collect。
源碼詳見://github.com/crystalxmumu/spring-web-flux-study-note
以上是本期筆記的內容,我們下期見。