[Kotlin Tutorials 19] Kotlin Flows, SharedFlow and StateFlow in Android

Kotlin Flows

本文包含的內容:

  • Flow是什麼, 基本概念和用法.
  • Flow的不同類型, StateFlow和SharedFlow比較.
  • Flow在Android中的使用
    • 安全收集.
    • 操作符stateIn, shareIn的用法和區別.

本文被收錄在集合中: //github.com/mengdd/KotlinTutorials

Coroutines Flow Basics

Flow是什麼

Flow可以按順序發送多個值, 概念上是一個數據流, 發射的值必須是同一個類型.
Flow使用suspend方法來生產/消費值, 數據流可以做非同步計算.

幾個基本知識點:

  • 創建flow: 通過flow builders
  • Flow數據流通過emit()來發射元素.
  • 可以通過各種操作符對flow的數據進行處理. 注意中間的操作符都不會觸發flow的數據發送.
  • Flow默認是cold flow, 即需要通過被觀察才能激活, 最常用的操作符是collect().
  • Flow的CoroutineContext, 不指定的情況下是collect()CoroutineContext, 如果想要更改, 用flowOn
    改之前的.

關於Flow的基本用法, 19年底寫的這篇coroutines flow in Android可以溫故知新.

Flow的操作符

一個Flow操作符的可視化小網站: FlowMarbles.

Flow的不同類型

SharedFlow and StateFlow

應用程式里比較常用的類型是SharedFlow和StateFlow.
Android官方有一篇專門的文檔來介紹二者: StateFlow and SharedFlow
StateFlow繼承於SharedFlow, SharedFlow繼承於Flow.

基本關係如下:

kotlin-flows

  • Flow
    基類. Cold.
    Flow的兩大特性: Context preservation; Exception transparency.

  • SharedFlow
    繼承Flow, 是一種hot flow, 所有collectors共享它的值, 永不終止, 是一種廣播的方式.
    一個shared flow上的活躍collector被叫作subscriber.

在sharedFlow上的collect call永遠不會正常complete, 還有Flow.launchIn.
可以配置replay and buffer overflow strategy.

如果subscriber suspend了, sharedflow會suspend這個stream, buffer這個要發射的元素, 等待subscriber resume.
Because onBufferOverflow is set with BufferOverflow.SUSPEND, the flow will suspend until it can deliver the event to all subscribers.

默認參數:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

total buffer是: replay + extraBufferCapacity.
如果total buffer是0, 那麼onBufferOverflow只能是onBufferOverflow = BufferOverflow.SUSPEND.

關於reply和buffer, 這個文章
有詳細的解釋, 並且配有動圖.

  • StateFlow
    繼承SharedFlow, hot flow, 和是否有collector收集無關, 永不complete.

可以通過value屬性訪問當前值.
有conflated特性, 會跳過太快的更新, 永遠返回最新值.
Strong equality-based conflation: 會通過equals()來判斷值是否發生改變, 如果沒有改變, 則不會通知collector.
因為conflated的特性, StateFlow賦值的時候要注意使用不可變的值.

cold vs hot

cold stream 可以重複收集, 每次收集, 會對每一個收集者單獨開啟一次.
hot stream 永遠發射不同的值, 和是否有人收集無關, 永遠不會終止.

  • sharedIn
    可以把cold flow轉成hot的SharedFlow.
  • stateIn
    可以把cold flow轉成hot的StateFlow.

StateFlow vs SharedFlow

共性:

  • StateFlowSharedFlow永遠都不會停止. 不能指望它們的onCompletionCallback.

不同點:

  • StateFlow可以通過value屬性讀到最新的值, 但SharedFlow卻不行.
  • StateFlow是conflated: 如果新的值和舊的值一樣, 不會傳播.
  • SharedFlow需要合理設置buffer和replay策略.

互相轉換:
SharedFlow用了distinctUntilChanged以後變成StateFlow.

// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

RxJava的等價替代:

  • PublishSubject -> SharedFlow.
  • BehaviorSubject -> StateFlow.

Use Flow in Android

發送事件(Event或Effects): SharedFlow

因為SharedFlow沒有conflated特性, 所以適合發送事件, 即便值變化得快也是每個都發送.

private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1
val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2

這裡用了asSharedFlow來創建一個ReadonlySharedFlow.

SharedFlow發射元素有兩個方法:

  • emit: suspend方法.
  • tryEmit: 非suspend方法.

因為tryEmit是非suspend的, 適用於有buffer的情況.

保存暴露UI狀態: StateFlow

StateFlow是一個state-holder, 可以通過value讀到當前狀態值.
一般會有一個MutableStateFlow類型的Backing property.

StateFlow是hot的, collect並不會觸發producer code.
當有新的consumer時, 新的consumer會接到上次的狀態和後續的狀態.

使用StateFlow時, 發射新元素只需要賦值:

mutableState.value = newState

注意這裡新值和舊的值要equals判斷不相等才能發射出去.

StateFlow vs LiveData

StateFlowLiveData很像.

StateFlowLiveData的相同點:

  • 永遠有一個值.
  • 只有一個值.
  • 支援多個觀察者.
  • 在訂閱的瞬間, replay最新的值.

有一點點不同:

  • StateFlow需要一個初始值.
  • LiveData會自動解綁, flow要達到相同效果, collect要在Lifecycle.repeatOnLifecycle里.

Flow的安全收集

關於收集Flow的方法, 主要還是關注一下生命周期的問題, 因為SharedFlow和StateFlow都是hot的.
在這個文章里有詳細的討論: A safer way to collect flows from Android UIs

在UI層收集的時候注意要用repeatOnLifecycle:

class LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = // getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        //...
        // Start a coroutine in the lifecycle scope
        lifecycleScope.launch {
            // repeatOnLifecycle launches the block in a new coroutine every time the
            // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Trigger the flow and start listening for values.
                // Note that this happens when lifecycle is STARTED and stops
                // collecting when the lifecycle is STOPPED
                latestNewsViewModel.uiState.collect { uiState ->
                    // New value received
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}

這個文章里有個擴展方法也挺好的:

class FlowObserver<T> (
    lifecycleOwner: LifecycleOwner,
    private val flow: Flow<T>,
    private val collector: suspend (T) -> Unit
) {

    private var job: Job? = null

    init {
        lifecycleOwner.lifecycle.addObserver(LifecycleEventObserver {
                source: LifecycleOwner, event: Lifecycle.Event ->
            when (event) {
                Lifecycle.Event.ON_START -> {
                    job = source.lifecycleScope.launch {
                        flow.collect { collector(it) }
                    }
                }
                Lifecycle.Event.ON_STOP -> {
                    job?.cancel()
                    job = null
                }
                else -> { }
            }
        })
    }
}


inline fun <reified T> Flow<T>.observeOnLifecycle(
    lifecycleOwner: LifecycleOwner,
    noinline collector: suspend (T) -> Unit
) = FlowObserver(lifecycleOwner, this, collector)

inline fun <reified T> Flow<T>.observeInLifecycle(
    lifecycleOwner: LifecycleOwner
) = FlowObserver(lifecycleOwner, this, {})

看了一下官方的repeatOnLifecycle其實大概也是這個意思:

public suspend fun Lifecycle.repeatOnLifecycle(
    state: Lifecycle.State,
    block: suspend CoroutineScope.() -> Unit
) {
    require(state !== Lifecycle.State.INITIALIZED) {
        "repeatOnLifecycle cannot start work with the INITIALIZED lifecycle state."
    }

    if (currentState === Lifecycle.State.DESTROYED) {
        return
    }

    // This scope is required to preserve context before we move to Dispatchers.Main
    coroutineScope {
        withContext(Dispatchers.Main.immediate) {
            // Check the current state of the lifecycle as the previous check is not guaranteed
            // to be done on the main thread.
            if (currentState === Lifecycle.State.DESTROYED) return@withContext

            // Instance of the running repeating coroutine
            var launchedJob: Job? = null

            // Registered observer
            var observer: LifecycleEventObserver? = null
            try {
                // Suspend the coroutine until the lifecycle is destroyed or
                // the coroutine is cancelled
                suspendCancellableCoroutine<Unit> { cont ->
                    // Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
                    // cancels when it falls below that state.
                    val startWorkEvent = Lifecycle.Event.upTo(state)
                    val cancelWorkEvent = Lifecycle.Event.downFrom(state)
                    val mutex = Mutex()
                    observer = LifecycleEventObserver { _, event ->
                        if (event == startWorkEvent) {
                            // Launch the repeating work preserving the calling context
                            launchedJob = [email protected] {
                                // Mutex makes invocations run serially,
                                // coroutineScope ensures all child coroutines finish
                                mutex.withLock {
                                    coroutineScope {
                                        block()
                                    }
                                }
                            }
                            return@LifecycleEventObserver
                        }
                        if (event == cancelWorkEvent) {
                            launchedJob?.cancel()
                            launchedJob = null
                        }
                        if (event == Lifecycle.Event.ON_DESTROY) {
                            cont.resume(Unit)
                        }
                    }
                    [email protected](observer as LifecycleEventObserver)
                }
            } finally {
                launchedJob?.cancel()
                observer?.let {
                    [email protected](it)
                }
            }
        }
    }
}

既然官方已經推出了, 我們就用官方的repeatOnLifecycle方法吧.

shareInstateIn

前面提過這兩個操作符是用來做flow轉換的:

  • sharedIn
    可以把cold flow轉成hot的SharedFlow.
  • stateIn
    可以把cold flow轉成hot的StateFlow.

shareIn可以保證只有一個數據源被創造, 並且被所有collectors收集.
比如:

class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource.shareIn(externalScope, WhileSubscribed())
}

WhileSubscribed這個策略是說, 當無人觀測時, 上游的flow就被取消.

實際使用時可以用WhileSubscribed(5000), 讓上游的flow即便在無人觀測的情況下, 也能繼續保持5秒.
這樣可以在某些情況(比如旋轉螢幕)時避免重建上遊資源, 適用於上遊資源創建起來很expensive的情況.

如果我們的需求是, 永遠保持一個最新的cache值.


class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource.stateIn(externalScope, WhileSubscribed(), EmptyLocation)
}

Flow.stateIn將會快取最後一個值, 並且有新的collector時, 將這個最新值傳給它.

shareIn, stateIn使用注意事項

永遠不要在方法裡面調用shareInstateIn, 因為方法每次被調用, 它們都會創建新的流.
這些流沒有被複用, 會存在記憶體裡面, 直到scope被取消或者沒有引用時被GC.

推薦的使用方式是在property上用:

class UserRepository(
    private val userLocalDataSource: UserLocalDataSource,
    private val externalScope: CoroutineScope
) {
    // DO NOT USE shareIn or stateIn in a function like this.
    // It creates a new SharedFlow/StateFlow per invocation which is not reused!
    fun getUser(): Flow<User> =
        userLocalDataSource.getUser()
            .shareIn(externalScope, WhileSubscribed())    

    // DO USE shareIn or stateIn in a property
    val user: Flow<User> = 
        userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
}

StateFlow使用總結

從ViewModel暴露數據到UI, 用StateFlow的兩種方式:

  1. 暴露一個StateFlow屬性, 用WhileSubscribed加上一個timeout.
class MyViewModel(...) : ViewModel() {
    val result = userId.mapLatest { newUserId ->
        repository.observeItem(newUserId)
    }.stateIn(
        scope = viewModelScope, 
        started = WhileSubscribed(5000), 
        initialValue = Result.Loading
    )
}
  1. repeatOnLifecycle收集.
onCreateView(...) {
    viewLifecycleOwner.lifecycleScope.launch {
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
            myViewModel.myUiState.collect { ... }
        }
    }
}

其他的組合都會保持上游的活躍, 浪費資源:

  • WhileSubscribed暴露屬性, 在lifecycleScope.launch/launchWhenX里收集.
  • 通過Lazily/Eagerly暴露, 用repeatOnLifecycle收集.

References