【Flutter 專題】80 初識 Flutter Stream (一)

  • 2020 年 3 月 31 日
  • 筆記

和尚在之前嘗試 EventChannel 時曾經用到過 Stream 流數據,現在準備學習一下 flutter_bloc 時也主要用到 Stream 來做非同步處理,於是簡單學習一下何為 Stream

A source of asynchronous data events.

Stream 主要應用於 Flutter 的非同步操作,在其他程式語言中也存在;Stream 提供了一種接受事件隊列的方法,可通過 listen 進行數據監聽,通過 error 接收失敗狀態,通過 done 來接收結束狀態;

1. Stream 創建

Flutter 提供了多種創建 Stream 的方式;

1.1 Stream.fromFuture(Futurefuture)

Stream 通過 Future 創建新的單訂閱流,當 Future 完成時會觸發 data / error,然後以 done 事件結束;

Future<String> getData() async {    await Future.delayed(Duration(seconds: 3));    return '當前時間為:${DateTime.now()}';  }    _streamFromFuture() {    Stream.fromFuture(getData())        .listen((event) => print('Stream.fromFuture -> $event'))        .onDone(() => print('Stream.fromFuture -> done 結束'));  }  
1.2 Stream.fromFutures(Iterable<future> futures)</future

Stream 通過一系列的 Future 創建新的單訂閱流,每個 Future 都會有自身的 data / error 事件,當這一系列的 Future 均完成時,Streamdone 事件結束;若 Futures 為空,則 Stream 會立刻關閉;其分析源碼,很直接的看到是將每一個 Future 事件監聽完之後才會執行的微事件結束;

====================================== 源碼 ======================================  factory Stream.fromFutures(Iterable<Future<T>> futures) {      _StreamController<T> controller =          new _SyncStreamController<T>(null, null, null, null);      int count = 0;      var onValue = (T value) {        if (!controller.isClosed) {          controller._add(value);          if (--count == 0) controller._closeUnchecked();        }      };      var onError = (error, StackTrace stack) {        if (!controller.isClosed) {          controller._addError(error, stack);          if (--count == 0) controller._closeUnchecked();        }      };      for (var future in futures) {        count++;        future.then(onValue, onError: onError);      }      // Use schedule microtask since controller is sync.      if (count == 0) scheduleMicrotask(controller.close);      return controller.stream;  }  ====================================== 測試 ======================================  _streamFromFutures() {    var data = [getData(), getData(), getData()];    Stream.fromFutures(data)        .listen((event) => print('Stream.fromFutures -> $event'))        .onDone(() => print('Stream.fromFutures -> done 結束'));  }  
1.3 Stream.fromIterable(Iterableelements)

Stream 通過數據集合中獲取並創建單訂閱流,通過 listen 監聽迭代器中每一個子 element,當 Stream 監聽到取消訂閱或 Iterator.moveNext 返回 false / throw 異常 時停止迭代;

_streamFromIterable() {    var data = [1, 2, '3.toString()', true, false, 6];    Stream.fromIterable(data)        .listen((event) => print('Stream.fromIterable -> $event'))        .onDone(() => print('Stream.fromIterable -> done 結束'));  }  
1.4 Stream.periodic(Duration period, [T computation(int computationCount)])

Stream 通過 Duration 對象作為參數創建一個周期性事件流,其中若不設置 computationonData 獲取數據為 null;若沒有事件結束則會一直周期性執行;

_streamFromPeriodic() {    Duration interval = Duration(seconds: 1);    // onData 獲取數據為 null    Stream<int> stream = Stream<int>.periodic(interval);    stream.listen((event) {    print('Stream.periodic -> $event');    }).onDone(() {    print('Stream.periodic -> done 結束');    });      // onData 獲取數據為 int 類型 data    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data);    streamData.listen((event) {      print('Stream.periodic -> $event');      if (event >= 10) {}    }).onDone(() {      print('Stream.periodic -> done 結束');    });  }  

2. Stream 基本操作

2.1 Stream<T> take(int count)

take() 對於單訂閱方式,可以提供 take 設置之前的 Stream 訂閱數據,例如設置中斷 Stream.periodic 周期展示次數;和尚粗略理解為 take 可以作為中斷訂閱,如果 take 設置次數大於 onDone 之前的訂閱數據次數,Stream 依舊獲取所有 onDone 之前的訂閱數據;

_streamFromPeriodic() {    Duration interval = Duration(seconds: 1);    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);    streamData.take(5).listen((event) {      print('Stream.periodic -> $event');    }).onDone(() {      print('Stream.periodic -> done 結束');    });  }  
_streamFromIterable() {    var data = [1, 2, '3.toString()', true, false, 6];    Stream.fromIterable(data)        .take(8)        .listen((event) => print('Stream.fromIterable -> $event'))        .onDone(() => print('Stream.fromIterable -> done 結束'));  }  
2.2 Stream<T> takeWhile(bool test(T element))

takeWhile 也可以實現上述相同效果,通過 test 返回一個 boolean 類型,如果為 false 則中斷訂閱;

_streamFromPeriodic() {    Duration interval = Duration(seconds: 1);    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);    streamData.takeWhile((element) {      print('Stream.periodic.takeWhile -> $element');      return element <= 5;    }).listen((event) {      print('Stream.periodic -> $event');    }).onDone(() {      print('Stream.periodic -> done 結束');    });  }  
2.3 Stream<T> where(bool test(T event))

where 用於在當前 Stream 中創建一個新的 Stream 用來丟棄不符合 test 的數據;和尚簡單理解為類似資料庫查詢一樣,僅過濾符合需求的數據流;且 where 可以設置多次;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData.takeWhile((element) {    print('Stream.periodic.takeWhile -> $element');    return element <= 5;  }).where((event) {    print('Stream.periodic.where -> $event');    return event > 3;  }).listen((event) {    print('Stream.periodic -> $event');  }).onDone(() {    print('Stream.periodic -> done 結束');  });  
2.4 Stream<T> distinct([bool equals(T previous, T next)])

distinct 和尚理解為相鄰兩個數據濾重;

var data = [1, 2, '3.toString()', true, true, false, true, 6];  Stream.fromIterable(data)      .distinct()      .listen((event) => print('Stream.fromIterable -> $event'))      .onDone(() => print('Stream.fromIterable -> done 結束'));  
2.5 Stream<T> skip(int count)

skip 用於跳過符合條件的訂閱數據次數;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData.takeWhile((element) {        print('Stream.periodic.takeWhile -> $element');        return element <= 6;      }).where((event) {        print('Stream.periodic.where -> $event');        return event > 2;      })      .skip(2).listen((event) {        print('Stream.periodic -> $event');      }).onDone(() {        print('Stream.periodic -> done 結束');      });  
2.6 Stream<T> skipWhile(bool test(T element))

skipWhile 用於跳過在 where 符合條件下滿足設置 test 條件的訂閱數據;即當 testtrue 時跳過當前訂閱數據監聽;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData.takeWhile((element) {    print('Stream.periodic.takeWhile -> $element');    return element <= 6;  }).where((event) {    print('Stream.periodic.where -> $event');    return event > 2;  }).skipWhile((element) {    print('Stream.periodic.skipWhile -> $element');    return element <= 4;  }).listen((event) {    print('Stream.periodic -> $event');  }).onDone(() {    print('Stream.periodic -> done 結束');  });  
2.7 Stream<S> map<S>(S convert(T event))

在當前 Stream 基礎上創建一個新的 Stream 並對當前 Stream 進行數據操作,onData 監聽到的是 map 變更後的新的數據流;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData.takeWhile((element) {    print('Stream.periodic.takeWhile -> $element');    return element <= 6;  }).where((event) {    print('Stream.periodic.where -> $event');    return event > 2;  }).skipWhile((element) {    print('Stream.periodic.skipWhile -> $element');    return element <= 4;  }).map((event) {    print('Stream.periodic.map -> $event -> ${event * 100}');    return event * 100;  }).listen((event) {    print('Stream.periodic -> $event');  }).onDone(() {    print('Stream.periodic -> done 結束');  });  
2.8 Stream<S> expand<S>(Iterable<S> convert(T element))

在當前 Stream 基礎上創建新的 Stream 並將當前訂閱數據轉為新的訂閱數據組onData 監聽 數據組 中每個新的訂閱數據元素;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData.takeWhile((element) {    print('Stream.periodic.takeWhile -> $element');    return element <= 6;  }).where((event) {    print('Stream.periodic.where -> $event');    return event > 2;  }).skipWhile((element) {    print('Stream.periodic.skipWhile -> $element');    return element <= 4;  }).expand((element) {    print('Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');    return [element, element * 10, element * 100];  }).listen((event) {    print('Stream.periodic -> $event');  }).onDone(() {    print('Stream.periodic -> done 結束');  });  
2.9 Future<int> get length

Stream 監聽訂閱事件結束後,符合 where 條件的數量;

_streamLength(index) async {    Duration interval = Duration(seconds: 1);    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);    streamData = streamData.takeWhile((element) {      print('Stream.periodic.takeWhile -> $element');      return element <= 6;    }).where((event) {      print('Stream.periodic.where -> $event');      return event > 2;    }).skipWhile((element) {      print('Stream.periodic.skipWhile -> $element');      return element <= 4;    });    switch (index) {      case 1:        var length = await streamData.length;        print('Stream.length -> $length');        break;      case 2:        var isEmpty = await streamData.isEmpty;        print('Stream.isEmpty -> $isEmpty');        break;      case 3:        var isBroadcast = await streamData.isBroadcast;        print('Stream.isBroadcast -> $isBroadcast');        break;      case 4:        var first = await streamData.first;        print('Stream.first -> $first');        break;      case 5:        var last = await streamData.last;        print('Stream.last -> $last');        break;    }  }  
2.10 Future<bool> get isEmpty

Stream 監聽訂閱事件結束後,統計是否符合 where 條件的訂閱數據是否為空;

_streamLength(2);  
2.11 Future<T> get first

獲取 Stream 符合條件的第一個訂閱數據;

_streamLength(4);  
2.12 Future<T> get last

獲取 Stream 符合條件的最後一個訂閱數據;

_streamLength(5);  
2.13 Future<List<T>> toList()

Stream 監聽結束之後,將訂閱數據存儲在 List 中,該操作為非同步操作;

_streamToList() async {    var data = [1, 2, '3.toString()', true, true, false, true, 6];    Stream stream = Stream.fromIterable(data).distinct();    List list = await stream.toList();    if (list != null) {      print('Stream.toList -> ${list}');      for (int i = 0; i < list.length; i++) {        print('Stream.toList -> ${i + 1} -> ${list[i]}');      }    }  }  
2.14 Future<Set<T>> toSet()

Stream 監聽結束之後,將訂閱數據存儲在 Set 中,Set 可以過濾重複數據;

_streamToSet() async {    var data = [1, 2, '3.toString()', true, true, false, true, 6];    Stream stream = Stream.fromIterable(data);    Set set = await stream.toSet();    if (set != null) {      print('Stream.toSet -> ${set}');    }  }  
2.15 Future forEach(void action(T element))

監聽 Stream 中訂閱數據,是對 listen 方式的一種監聽;

_streamForEach() {    var data = [1, 2, '3.toString()', true, true, false, true, 6];    Stream stream = Stream.fromIterable(data).distinct();    stream.forEach((element) => print('Stream.forEach -> $element'));  }