【譯】kotlin 協程官方文檔(8)-共享可變狀態和並發性(Shared mutable state and concurrency)
- 2020 年 4 月 1 日
- 筆記
最近一直在了解關於kotlin協程的知識,那最好的學習資料自然是官方提供的學習文檔了,看了看後我就萌生了翻譯官方文檔的想法。前後花了要接近一個月時間,一共九篇文章,在這裡也分享出來,希望對讀者有所幫助。個人知識所限,有些翻譯得不是太順暢,也希望讀者能提出意見 協程官方文檔:coroutines-guide 協程官方文檔中文翻譯:coroutines-cn-guide 協程官方文檔中文譯者:leavesC
[TOC]
可以使用多執行緒調度器(如 Dispatchers.Default)並發執行協程,它呈現了所有常見的並發問題。主要問題是對共享可變狀態的同步訪問。在協程作用域中解決這個問題的一些方法類似於多執行緒世界中的方法,但有一些其它方法是獨有的
一、問題(The problem)
讓我們啟動一百個協程,都做同樣的操作一千次。我們還將計算它們的完成時間,以便進一步比較:
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") }
我們從一個非常簡單的操作開始,該操作使用多執行緒調度器 Dispatchers.Default,並增加一個共享的可變變數
import kotlinx.coroutines.* import kotlin.system.* suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } //sampleStart var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter++ } } println("Counter = $counter") } //sampleEnd
最後會列印出什麼呢?不太可能列印出 「Counter=100000」,因為100個協程從多個執行緒並發地遞增 counter 而不進行任何同步。
二、Volatiles 是沒有作用的(Volatiles are of no help)
有一種常見的誤解是:將變數標記為 volatile 可以解決並發問題。讓我們試試:
import kotlinx.coroutines.* import kotlin.system.* suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } //sampleStart @Volatile // in Kotlin `volatile` is an annotation var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter++ } } println("Counter = $counter") } //sampleEnd
這段程式碼運行得比較慢,但是我們在最後仍然沒有得到「Counter=100000」,因為 volatile 變數保證了可線性化(這是「atomic」的一個技術術語)對相應變數的讀寫,但不提供更大行為的原子性(在我們的例子中指遞增操作)
三、執行緒安全的數據結構(Thread-safe data structures)
對執行緒和協程都有效的一個解決方案是使用執行緒安全的(也稱為同步、可線性化或原子)數據結構,該結構為需要在共享狀態上執行的相應操作提供所有必要的同步保障。對於一個簡單的計數器,我們可以使用 AtomicInteger 類,該類具有保證原子性的 incrementAndGet 方法
import kotlinx.coroutines.* import java.util.concurrent.atomic.* import kotlin.system.* suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } //sampleStart var counter = AtomicInteger() fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter.incrementAndGet() } } println("Counter = $counter") } //sampleEnd
這是解決這個特殊問題的最快方法。它適用於普通計數器、集合、隊列和其他標準數據結構及其基本操作。但是,它不容易擴展到複雜的狀態或沒有實現好了的執行緒安全的複雜操作
四、以細粒度限制執行緒(Thread confinement fine-grained)
執行緒限制是解決共享可變狀態問題的一種方法,其中對特定共享狀態的所有訪問都限制在一個執行緒內。它通常用於 UI 應用程式,其中所有的 UI 狀態都限制在「單個事件分派」或「應用程式執行緒」中。通過使用單執行緒上下文,可以很容易地使用協程來實現上述的計數器
import kotlinx.coroutines.* import kotlin.system.* suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } //sampleStart val counterContext = newSingleThreadContext("CounterContext") var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // confine each increment to a single-threaded context withContext(counterContext) { counter++ } } } println("Counter = $counter") } //sampleEnd
這段程式碼運行得非常緩慢,因為它執行細粒度的執行緒限制。每個單獨的增值操作都使用 withContext(counterContext) 從多執行緒 Dispatchers.Default 上下文切換到單執行緒上下文
五、以粗粒度限制執行緒(Thread confinement coarse-grained)
在實踐中,執行緒限制是在比較大的範圍內執行的,例如,更新狀態的邏輯的範圍被限制在單個執行緒中。下面的示例就是這樣做的,首先在單執行緒上下文中運行每個協程
import kotlinx.coroutines.* import kotlin.system.* suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } //sampleStart val counterContext = newSingleThreadContext("CounterContext") var counter = 0 fun main() = runBlocking { // confine everything to a single-threaded context withContext(counterContext) { massiveRun { counter++ } } println("Counter = $counter") } //sampleEnd
現在這段程式碼的運行速度會快得多,併產生了正確的結果
六、互斥(Mutual exclusion)
互斥問題的解決方案是保護共享狀態的所有修改操作,其中的關鍵程式碼永遠不會同時執行。在一個阻塞的世界中,通常會使用 synchronized
或 ReentrantLock
。協程的替換方案稱為互斥(Mutex)。它具有 lock
和 unlock
函數以劃定一個關鍵位置。關鍵的區別在於 Mutex.lock()
是一個掛起函數。它不會阻塞執行緒
還有一個擴展函數 withLock 可以方便地來實現 mutex.lock(); try {...} finally { mutex.unlock() }
import kotlinx.coroutines.* import kotlinx.coroutines.sync.* import kotlin.system.* suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } //sampleStart val mutex = Mutex() var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // protect each increment with lock mutex.withLock { counter++ } } } println("Counter = $counter") } //sampleEnd
本例中的鎖是細粒度的,因此它也付出了某些代價(消耗)。但是,在某些情況下這是一個很好的選擇,比如你必須定期修改某些共享狀態,但不具備修改共享狀態所需的原生執行緒
七、Actors
actor 是一個實體,由一個協程、被限制並封裝到這個協程中的狀態以及一個與其它協程通訊的通道組成。簡單的 actor 可以寫成函數,但具有複雜狀態的 actor 更適合類
有一個 actor 協程構造器,它可以方便地將 actor 的 mailbox channel 合併到其接收的消息的作用域中,並將 send channel 合併到生成的 job 對象中,以便可以將對 actor 的單個引用作為其句柄引有
使用 actor 的第一步是定義一類 actor 將要處理的消息。kotlin 的密封類非常適合這個目的。在 CounterMsg 密封類中,我們用 IncCounter 消息來定義遞增計數器,用 GetCounter 消息來獲取其值,後者需要返回值。為此,這裡使用 CompletableDeferred communication primitive,它表示將來已知(通訊)的單個值
// Message types for counterActor sealed class CounterMsg object IncCounter : CounterMsg() // one-way message to increment counter class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
然後,我們定義一個函數,該函數使用 actor 協程構造器來啟動 actor:
// This function launches a new counter actor fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor state for (msg in channel) { // iterate over incoming messages when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } }
程式碼很簡單:
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlin.system.* suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } // Message types for counterActor sealed class CounterMsg object IncCounter : CounterMsg() // one-way message to increment counter class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply // This function launches a new counter actor fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor state for (msg in channel) { // iterate over incoming messages when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } } //sampleStart fun main() = runBlocking<Unit> { val counter = counterActor() // create the actor withContext(Dispatchers.Default) { massiveRun { counter.send(IncCounter) } } // send a message to get a counter value from an actor val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) println("Counter = ${response.await()}") counter.close() // shutdown the actor } //sampleEnd
在什麼上下文中執行 actor 本身並不重要(為了正確)。actor 是一個協程,並且協程是按順序執行的,因此將狀態限制到特定的協程可以解決共享可變狀態的問題。實際上,actors 可以修改自己的私有狀態,但只能通過消息相互影響(避免需要任何鎖)
actor 比使用鎖更為有效,因為在這種情況下,它總是有工作要做,根本不需要切換到不同的上下文
注意,actor 協程構造器是一個雙重的 product 協程構造器 。actor 與它接收消息的通道相關聯,而 producer 與向其發送元素的通道相關聯