Kotlin Coroutines Flow 系列(四) 執行緒操作
- 2020 年 4 月 9 日
- 筆記
七. Flow 執行緒操作
7.1 更為簡化的執行緒切換
相對於 RxJava 多執行緒的學習曲線,Flow 對執行緒的切換友好地多。
在之前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾經介紹過 Flow 的切換執行緒,以及 flowOn 操作符。
Flow 只需使用 flowOn 操作符,而不必像 RxJava 需要去深入理解 observeOn、subscribeOn 之間的區別。
7.2 flowOn VS RxJava 的 observeOn
RxJava 的 observeOn 操作符,接收一個 Scheduler 參數,用來指定下游操作運行在特定的執行緒調度器 Scheduler 上。
Flow 的 flowOn 操作符,接收一個 CoroutineContext 參數,影響的是上游的操作。
例如:
fun main() = runBlocking { flow { for (i in 1..5) { delay(100) emit(i) } }.map { it * it }.flowOn(Dispatchers.IO) .collect { println("${Thread.currentThread().name}: $it") } }
flow builder 和 map 操作符都會受到flowOn
的影響,並使用 Dispatchers.io 執行緒池。
再例如:
val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher() fun main() = runBlocking { flow { for (i in 1..5) { delay(100) emit(i) } }.map { it * it }.flowOn(Dispatchers.IO) .map { it+1 } .flowOn(customerDispatcher) .collect { println("${Thread.currentThread().name}: $it") } }
flow builder 和兩個 map 操作符都會受到兩個flowOn
的影響,其中 flow builder 和第一個 map 操作符跟上面的例子一樣,第二個 map 操作符會切換到指定的 customerDispatcher 執行緒池。
7.3 buffer 實現並發操作
在 Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中,曾介紹 buffer 操作符對應 RxJava Backpressure 中的 BUFFER 策略。
事實上 buffer 操作符也可以並發地執行任務,它是除了使用 flowOn 操作符之外的另一種方式,只是不能顯示地指定 Dispatchers。
例如:
fun main() = runBlocking { val time = measureTimeMillis { flow { for (i in 1..5) { delay(100) emit(i) } } .buffer() .collect { value -> delay(300) println(value) } } println("Collected in $time ms") }
執行結果:
1 2 3 4 5 Collected in 1676 ms
在上述例子中,所有的 delay 所花費的時間是2000ms。然而通過 buffer 操作符並發
地執行 emit,再順序地執行 collect 函數後,所花費的時間在 1700ms 左右。
如果去掉 buffer 操作符。
fun main() = runBlocking { val time = measureTimeMillis { flow { for (i in 1..5) { delay(100) emit(i) } } .collect { value -> delay(300) println(value) } } println("Collected in $time ms") }
執行結果:
1 2 3 4 5 Collected in 2039 ms
所花費的時間比剛才多了300多ms。
7.4 並行操作
在講解並行操作之前,先來了解一下並發和並行的區別。
並發(concurrency):是指一個處理器同時處理多個任務。 並行(parallelism):是多個處理器或者是多核的處理器同時處理多個不同的任務。並行是同時發生的多個並發事件,具有並發的含義,而並發則不一定是並行。
RxJava 可以藉助 flatMap 操作符實現並行,亦可以使用 ParallelFlowable 類實現並行操作。
下面,以 flatMap 操作符為例實現 RxJava 的並行:
Observable.range(1,100) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(integer) .subscribeOn(Schedulers.io()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .subscribe(new Consumer<String>() { @Override public void accept(String str) throws Exception { System.out.println(str); } });
Flow 也有相應的操作符 flatMapMerge 可以實現並行。
fun main() = runBlocking { val result = arrayListOf<Int>() for (index in 1..100){ result.add(index) } result.asFlow() .flatMapMerge { flow { emit(it) } .flowOn(Dispatchers.IO) } .collect { println("$it") } }
總體而言,Flow 相比於 RxJava 更加簡潔一些。