【Flutter 專題】80 初識 Flutter Stream (一)
- 2020 年 3 月 31 日
- 筆記
和尚在之前嘗試 EventChannel 時曾經用到過 Stream 流數據,現在準備學習一下 flutter_bloc 時也主要用到 Stream 來做非同步處理,於是簡單學習一下何為 Stream?
A source of asynchronous data events.
Stream 主要應用於 Flutter 的非同步操作,在其他程式語言中也存在;Stream 提供了一種接受事件隊列的方法,可通過 listen 進行數據監聽,通過 error 接收失敗狀態,通過 done 來接收結束狀態;
1. Stream 創建
Flutter 提供了多種創建 Stream 的方式;
1.1 Stream.fromFuture(Futurefuture)
Stream 通過 Future 創建新的單訂閱流,當 Future 完成時會觸發 data / error,然後以 done 事件結束;
Future<String> getData() async { await Future.delayed(Duration(seconds: 3)); return '當前時間為:${DateTime.now()}'; } _streamFromFuture() { Stream.fromFuture(getData()) .listen((event) => print('Stream.fromFuture -> $event')) .onDone(() => print('Stream.fromFuture -> done 結束')); }

1.2 Stream.fromFutures(Iterable<future> futures)</future
Stream 通過一系列的 Future 創建新的單訂閱流,每個 Future 都會有自身的 data / error 事件,當這一系列的 Future 均完成時,Stream 以 done 事件結束;若 Futures 為空,則 Stream 會立刻關閉;其分析源碼,很直接的看到是將每一個 Future 事件監聽完之後才會執行的微事件結束;
====================================== 源碼 ====================================== factory Stream.fromFutures(Iterable<Future<T>> futures) { _StreamController<T> controller = new _SyncStreamController<T>(null, null, null, null); int count = 0; var onValue = (T value) { if (!controller.isClosed) { controller._add(value); if (--count == 0) controller._closeUnchecked(); } }; var onError = (error, StackTrace stack) { if (!controller.isClosed) { controller._addError(error, stack); if (--count == 0) controller._closeUnchecked(); } }; for (var future in futures) { count++; future.then(onValue, onError: onError); } // Use schedule microtask since controller is sync. if (count == 0) scheduleMicrotask(controller.close); return controller.stream; } ====================================== 測試 ====================================== _streamFromFutures() { var data = [getData(), getData(), getData()]; Stream.fromFutures(data) .listen((event) => print('Stream.fromFutures -> $event')) .onDone(() => print('Stream.fromFutures -> done 結束')); }

1.3 Stream.fromIterable(Iterableelements)
Stream 通過數據集合中獲取並創建單訂閱流,通過 listen 監聽迭代器中每一個子 element,當 Stream 監聽到取消訂閱或 Iterator.moveNext 返回 false / throw 異常 時停止迭代;
_streamFromIterable() { var data = [1, 2, '3.toString()', true, false, 6]; Stream.fromIterable(data) .listen((event) => print('Stream.fromIterable -> $event')) .onDone(() => print('Stream.fromIterable -> done 結束')); }

1.4 Stream.periodic(Duration period, [T computation(int computationCount)])
Stream 通過 Duration 對象作為參數創建一個周期性事件流,其中若不設置 computation 時 onData 獲取數據為 null;若沒有事件結束則會一直周期性執行;
_streamFromPeriodic() { Duration interval = Duration(seconds: 1); // onData 獲取數據為 null Stream<int> stream = Stream<int>.periodic(interval); stream.listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); }); // onData 獲取數據為 int 類型 data Stream<int> streamData = Stream<int>.periodic(interval, (data) => data); streamData.listen((event) { print('Stream.periodic -> $event'); if (event >= 10) {} }).onDone(() { print('Stream.periodic -> done 結束'); }); }


2. Stream 基本操作
2.1 Stream<T> take(int count)
take() 對於單訂閱方式,可以提供 take 設置之前的 Stream 訂閱數據,例如設置中斷 Stream.periodic 周期展示次數;和尚粗略理解為 take 可以作為中斷訂閱,如果 take 設置次數大於 onDone 之前的訂閱數據次數,Stream 依舊獲取所有 onDone 之前的訂閱數據;
_streamFromPeriodic() { Duration interval = Duration(seconds: 1); Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData.take(5).listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); }); }

_streamFromIterable() { var data = [1, 2, '3.toString()', true, false, 6]; Stream.fromIterable(data) .take(8) .listen((event) => print('Stream.fromIterable -> $event')) .onDone(() => print('Stream.fromIterable -> done 結束')); }

2.2 Stream<T> takeWhile(bool test(T element))
takeWhile 也可以實現上述相同效果,通過 test 返回一個 boolean 類型,如果為 false 則中斷訂閱;
_streamFromPeriodic() { Duration interval = Duration(seconds: 1); Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData.takeWhile((element) { print('Stream.periodic.takeWhile -> $element'); return element <= 5; }).listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); }); }

2.3 Stream<T> where(bool test(T event))
where 用於在當前 Stream 中創建一個新的 Stream 用來丟棄不符合 test 的數據;和尚簡單理解為類似資料庫查詢一樣,僅過濾符合需求的數據流;且 where 可以設置多次;
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData.takeWhile((element) { print('Stream.periodic.takeWhile -> $element'); return element <= 5; }).where((event) { print('Stream.periodic.where -> $event'); return event > 3; }).listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); });

2.4 Stream<T> distinct([bool equals(T previous, T next)])
distinct 和尚理解為相鄰兩個數據濾重;
var data = [1, 2, '3.toString()', true, true, false, true, 6]; Stream.fromIterable(data) .distinct() .listen((event) => print('Stream.fromIterable -> $event')) .onDone(() => print('Stream.fromIterable -> done 結束'));

2.5 Stream<T> skip(int count)
skip 用於跳過符合條件的訂閱數據次數;
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData.takeWhile((element) { print('Stream.periodic.takeWhile -> $element'); return element <= 6; }).where((event) { print('Stream.periodic.where -> $event'); return event > 2; }) .skip(2).listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); });

2.6 Stream<T> skipWhile(bool test(T element))
skipWhile 用於跳過在 where 符合條件下滿足設置 test 條件的訂閱數據;即當 test 為 true 時跳過當前訂閱數據監聽;
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData.takeWhile((element) { print('Stream.periodic.takeWhile -> $element'); return element <= 6; }).where((event) { print('Stream.periodic.where -> $event'); return event > 2; }).skipWhile((element) { print('Stream.periodic.skipWhile -> $element'); return element <= 4; }).listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); });

2.7 Stream<S> map<S>(S convert(T event))
在當前 Stream 基礎上創建一個新的 Stream 並對當前 Stream 進行數據操作,onData 監聽到的是 map 變更後的新的數據流;
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData.takeWhile((element) { print('Stream.periodic.takeWhile -> $element'); return element <= 6; }).where((event) { print('Stream.periodic.where -> $event'); return event > 2; }).skipWhile((element) { print('Stream.periodic.skipWhile -> $element'); return element <= 4; }).map((event) { print('Stream.periodic.map -> $event -> ${event * 100}'); return event * 100; }).listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); });

2.8 Stream<S> expand<S>(Iterable<S> convert(T element))
在當前 Stream 基礎上創建新的 Stream 並將當前訂閱數據轉為新的訂閱數據組,onData 監聽 數據組 中每個新的訂閱數據元素;
Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData.takeWhile((element) { print('Stream.periodic.takeWhile -> $element'); return element <= 6; }).where((event) { print('Stream.periodic.where -> $event'); return event > 2; }).skipWhile((element) { print('Stream.periodic.skipWhile -> $element'); return element <= 4; }).expand((element) { print('Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}'); return [element, element * 10, element * 100]; }).listen((event) { print('Stream.periodic -> $event'); }).onDone(() { print('Stream.periodic -> done 結束'); });

2.9 Future<int> get length
Stream 監聽訂閱事件結束後,符合 where 條件的數量;
_streamLength(index) async { Duration interval = Duration(seconds: 1); Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1); streamData = streamData.takeWhile((element) { print('Stream.periodic.takeWhile -> $element'); return element <= 6; }).where((event) { print('Stream.periodic.where -> $event'); return event > 2; }).skipWhile((element) { print('Stream.periodic.skipWhile -> $element'); return element <= 4; }); switch (index) { case 1: var length = await streamData.length; print('Stream.length -> $length'); break; case 2: var isEmpty = await streamData.isEmpty; print('Stream.isEmpty -> $isEmpty'); break; case 3: var isBroadcast = await streamData.isBroadcast; print('Stream.isBroadcast -> $isBroadcast'); break; case 4: var first = await streamData.first; print('Stream.first -> $first'); break; case 5: var last = await streamData.last; print('Stream.last -> $last'); break; } }

2.10 Future<bool> get isEmpty
Stream 監聽訂閱事件結束後,統計是否符合 where 條件的訂閱數據是否為空;
_streamLength(2);

2.11 Future<T> get first
獲取 Stream 符合條件的第一個訂閱數據;
_streamLength(4);

2.12 Future<T> get last
獲取 Stream 符合條件的最後一個訂閱數據;
_streamLength(5);

2.13 Future<List<T>> toList()
在 Stream 監聽結束之後,將訂閱數據存儲在 List 中,該操作為非同步操作;
_streamToList() async { var data = [1, 2, '3.toString()', true, true, false, true, 6]; Stream stream = Stream.fromIterable(data).distinct(); List list = await stream.toList(); if (list != null) { print('Stream.toList -> ${list}'); for (int i = 0; i < list.length; i++) { print('Stream.toList -> ${i + 1} -> ${list[i]}'); } } }

2.14 Future<Set<T>> toSet()
在 Stream 監聽結束之後,將訂閱數據存儲在 Set 中,Set 可以過濾重複數據;
_streamToSet() async { var data = [1, 2, '3.toString()', true, true, false, true, 6]; Stream stream = Stream.fromIterable(data); Set set = await stream.toSet(); if (set != null) { print('Stream.toSet -> ${set}'); } }

2.15 Future forEach(void action(T element))
監聽 Stream 中訂閱數據,是對 listen 方式的一種監聽;
_streamForEach() { var data = [1, 2, '3.toString()', true, true, false, true, 6]; Stream stream = Stream.fromIterable(data).distinct(); stream.forEach((element) => print('Stream.forEach -> $element')); }
