[Kotlin Tutorials 19] Kotlin Flows, SharedFlow and StateFlow in Android
Kotlin Flows
本文包含的內容:
- Flow是什麼, 基本概念和用法.
- Flow的不同類型, StateFlow和SharedFlow比較.
- Flow在Android中的使用
- 安全收集.
- 操作符
stateIn
,shareIn
的用法和區別.
本文被收錄在集合中: //github.com/mengdd/KotlinTutorials
Coroutines Flow Basics
Flow是什麼
Flow可以按順序發送多個值, 概念上是一個數據流, 發射的值必須是同一個類型.
Flow使用suspend方法來生產/消費值, 數據流可以做異步計算.
幾個基本知識點:
- 創建flow: 通過flow builders
- Flow數據流通過
emit()
來發射元素. - 可以通過各種操作符對flow的數據進行處理. 注意中間的操作符都不會觸發flow的數據發送.
- Flow默認是cold flow, 即需要通過被觀察才能激活, 最常用的操作符是
collect()
. - Flow的
CoroutineContext
, 不指定的情況下是collect()
的CoroutineContext
, 如果想要更改, 用flowOn
改之前的.
關於Flow的基本用法, 19年底寫的這篇coroutines flow in Android可以溫故知新.
Flow的操作符
一個Flow操作符的可視化小網站: FlowMarbles.
Flow的不同類型
SharedFlow and StateFlow
應用程序里比較常用的類型是SharedFlow和StateFlow.
Android官方有一篇專門的文檔來介紹二者: StateFlow and SharedFlow
StateFlow繼承於SharedFlow, SharedFlow繼承於Flow.
基本關係如下:
-
Flow
基類. Cold.
Flow的兩大特性: Context preservation; Exception transparency. -
SharedFlow
繼承Flow, 是一種hot flow, 所有collectors共享它的值, 永不終止, 是一種廣播的方式.
一個shared flow上的活躍collector被叫作subscriber.
在sharedFlow上的collect call永遠不會正常complete, 還有Flow.launchIn.
可以配置replay and buffer overflow strategy.
如果subscriber suspend了, sharedflow會suspend這個stream, buffer這個要發射的元素, 等待subscriber resume.
Because onBufferOverflow is set with BufferOverflow.SUSPEND
, the flow will suspend until it can deliver the event to all subscribers.
默認參數:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
total buffer是: replay + extraBufferCapacity
.
如果total buffer是0, 那麼onBufferOverflow只能是onBufferOverflow = BufferOverflow.SUSPEND
.
關於reply和buffer, 這個文章
有詳細的解釋, 並且配有動圖.
- StateFlow
繼承SharedFlow, hot flow, 和是否有collector收集無關, 永不complete.
可以通過value
屬性訪問當前值.
有conflated特性, 會跳過太快的更新, 永遠返回最新值.
Strong equality-based conflation: 會通過equals()
來判斷值是否發生改變, 如果沒有改變, 則不會通知collector.
因為conflated的特性, StateFlow賦值的時候要注意使用不可變的值.
cold vs hot
cold stream 可以重複收集, 每次收集, 會對每一個收集者單獨開啟一次.
hot stream 永遠發射不同的值, 和是否有人收集無關, 永遠不會終止.
StateFlow vs SharedFlow
共性:
StateFlow
和SharedFlow
永遠都不會停止. 不能指望它們的onCompletionCallback
.
不同點:
StateFlow
可以通過value
屬性讀到最新的值, 但SharedFlow
卻不行.StateFlow
是conflated: 如果新的值和舊的值一樣, 不會傳播.SharedFlow
需要合理設置buffer和replay策略.
互相轉換:
SharedFlow
用了distinctUntilChanged
以後變成StateFlow
.
// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
RxJava的等價替代:
PublishSubject
->SharedFlow
.BehaviorSubject
->StateFlow
.
Use Flow in Android
發送事件(Event或Effects): SharedFlow
因為SharedFlow沒有conflated特性, 所以適合發送事件, 即便值變化得快也是每個都發送.
private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1
val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2
這裡用了asSharedFlow
來創建一個ReadonlySharedFlow
.
SharedFlow發射元素有兩個方法:
emit
: suspend方法.tryEmit
: 非suspend方法.
因為tryEmit
是非suspend的, 適用於有buffer的情況.
保存暴露UI狀態: StateFlow
StateFlow
是一個state-holder, 可以通過value
讀到當前狀態值.
一般會有一個MutableStateFlow
類型的Backing property.
StateFlow
是hot的, collect並不會觸發producer code.
當有新的consumer時, 新的consumer會接到上次的狀態和後續的狀態.
使用StateFlow時, 發射新元素只需要賦值:
mutableState.value = newState
注意這裡新值和舊的值要equals
判斷不相等才能發射出去.
StateFlow vs LiveData
StateFlow
和LiveData
很像.
StateFlow
和LiveData
的相同點:
- 永遠有一個值.
- 只有一個值.
- 支持多個觀察者.
- 在訂閱的瞬間, replay最新的值.
有一點點不同:
StateFlow
需要一個初始值.LiveData
會自動解綁, flow要達到相同效果, collect要在Lifecycle.repeatOnLifecycle
里.
Flow的安全收集
關於收集Flow的方法, 主要還是關注一下生命周期的問題, 因為SharedFlow和StateFlow都是hot的.
在這個文章里有詳細的討論: A safer way to collect flows from Android UIs
在UI層收集的時候注意要用repeatOnLifecycle
:
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
//...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
這個文章里有個擴展方法也挺好的:
class FlowObserver<T> (
lifecycleOwner: LifecycleOwner,
private val flow: Flow<T>,
private val collector: suspend (T) -> Unit
) {
private var job: Job? = null
init {
lifecycleOwner.lifecycle.addObserver(LifecycleEventObserver {
source: LifecycleOwner, event: Lifecycle.Event ->
when (event) {
Lifecycle.Event.ON_START -> {
job = source.lifecycleScope.launch {
flow.collect { collector(it) }
}
}
Lifecycle.Event.ON_STOP -> {
job?.cancel()
job = null
}
else -> { }
}
})
}
}
inline fun <reified T> Flow<T>.observeOnLifecycle(
lifecycleOwner: LifecycleOwner,
noinline collector: suspend (T) -> Unit
) = FlowObserver(lifecycleOwner, this, collector)
inline fun <reified T> Flow<T>.observeInLifecycle(
lifecycleOwner: LifecycleOwner
) = FlowObserver(lifecycleOwner, this, {})
看了一下官方的repeatOnLifecycle
其實大概也是這個意思:
public suspend fun Lifecycle.repeatOnLifecycle(
state: Lifecycle.State,
block: suspend CoroutineScope.() -> Unit
) {
require(state !== Lifecycle.State.INITIALIZED) {
"repeatOnLifecycle cannot start work with the INITIALIZED lifecycle state."
}
if (currentState === Lifecycle.State.DESTROYED) {
return
}
// This scope is required to preserve context before we move to Dispatchers.Main
coroutineScope {
withContext(Dispatchers.Main.immediate) {
// Check the current state of the lifecycle as the previous check is not guaranteed
// to be done on the main thread.
if (currentState === Lifecycle.State.DESTROYED) return@withContext
// Instance of the running repeating coroutine
var launchedJob: Job? = null
// Registered observer
var observer: LifecycleEventObserver? = null
try {
// Suspend the coroutine until the lifecycle is destroyed or
// the coroutine is cancelled
suspendCancellableCoroutine<Unit> { cont ->
// Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
// cancels when it falls below that state.
val startWorkEvent = Lifecycle.Event.upTo(state)
val cancelWorkEvent = Lifecycle.Event.downFrom(state)
val mutex = Mutex()
observer = LifecycleEventObserver { _, event ->
if (event == startWorkEvent) {
// Launch the repeating work preserving the calling context
launchedJob = [email protected] {
// Mutex makes invocations run serially,
// coroutineScope ensures all child coroutines finish
mutex.withLock {
coroutineScope {
block()
}
}
}
return@LifecycleEventObserver
}
if (event == cancelWorkEvent) {
launchedJob?.cancel()
launchedJob = null
}
if (event == Lifecycle.Event.ON_DESTROY) {
cont.resume(Unit)
}
}
[email protected](observer as LifecycleEventObserver)
}
} finally {
launchedJob?.cancel()
observer?.let {
[email protected](it)
}
}
}
}
}
既然官方已經推出了, 我們就用官方的repeatOnLifecycle
方法吧.
shareIn
和stateIn
前面提過這兩個操作符是用來做flow轉換的:
shareIn
可以保證只有一個數據源被創造, 並且被所有collectors收集.
比如:
class LocationRepository(
private val locationDataSource: LocationDataSource,
private val externalScope: CoroutineScope
) {
val locations: Flow<Location> =
locationDataSource.locationsSource.shareIn(externalScope, WhileSubscribed())
}
WhileSubscribed
這個策略是說, 當無人觀測時, 上游的flow就被取消.
實際使用時可以用WhileSubscribed(5000)
, 讓上游的flow即便在無人觀測的情況下, 也能繼續保持5秒.
這樣可以在某些情況(比如旋轉屏幕)時避免重建上遊資源, 適用於上遊資源創建起來很expensive的情況.
如果我們的需求是, 永遠保持一個最新的cache值.
class LocationRepository(
private val locationDataSource: LocationDataSource,
private val externalScope: CoroutineScope
) {
val locations: Flow<Location> =
locationDataSource.locationsSource.stateIn(externalScope, WhileSubscribed(), EmptyLocation)
}
Flow.stateIn
將會緩存最後一個值, 並且有新的collector時, 將這個最新值傳給它.
shareIn
, stateIn
使用注意事項
永遠不要在方法裏面調用shareIn
和stateIn
, 因為方法每次被調用, 它們都會創建新的流.
這些流沒有被複用, 會存在內存裏面, 直到scope被取消或者沒有引用時被GC.
推薦的使用方式是在property上用:
class UserRepository(
private val userLocalDataSource: UserLocalDataSource,
private val externalScope: CoroutineScope
) {
// DO NOT USE shareIn or stateIn in a function like this.
// It creates a new SharedFlow/StateFlow per invocation which is not reused!
fun getUser(): Flow<User> =
userLocalDataSource.getUser()
.shareIn(externalScope, WhileSubscribed())
// DO USE shareIn or stateIn in a property
val user: Flow<User> =
userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
}
StateFlow使用總結
從ViewModel暴露數據到UI, 用StateFlow
的兩種方式:
- 暴露一個StateFlow屬性, 用
WhileSubscribed
加上一個timeout.
class MyViewModel(...) : ViewModel() {
val result = userId.mapLatest { newUserId ->
repository.observeItem(newUserId)
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)
}
- 用
repeatOnLifecycle
收集.
onCreateView(...) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
myViewModel.myUiState.collect { ... }
}
}
}
其他的組合都會保持上游的活躍, 浪費資源:
- 用
WhileSubscribed
暴露屬性, 在lifecycleScope.launch/launchWhenX
里收集. - 通過
Lazily/Eagerly
暴露, 用repeatOnLifecycle
收集.
References
- Kotlin flows on Android
- StateFlow and SharedFlow
- A safer way to collect flows from Android UIs
- Things to know about Flow』s shareIn and stateIn operators
- Shared flows, broadcast channels
- Kotlin SharedFlow or: How I learned to stop using RxJava and love the Flow
- Migrating from LiveData to Kotlin』s Flow
- Substituting Android』s LiveData: StateFlow or SharedFlow?
- Learning State & Shared Flows with Unit Tests
- Reactive Streams on Kotlin: SharedFlow and StateFlow
- Reading Coroutine official guide thoroughly — Part 0