Go 語言並發編程系列(十四)—— 通過 context 包實現多協程之間的協作
- 2019 年 10 月 11 日
- 筆記
上篇教程學院君介紹了如何通過 sync.WaitGroup
類型優化通道對多協程協調的處理,但是現在有一個問題,就是我們在啟動子協程之前都已經明確知道子協程的總量,如果不知道的話,該怎麼實現呢?
一種解決方案是通過 sync.WaitGroup
分批啟動子協程,具體實現程式碼如下:
package main import ( "fmt" "sync" ) func addNum(a, b int, deferFunc func()) { defer func() { deferFunc() }() c := a + b fmt.Printf("%d + %d = %dn", a, b, c) } func main() { total := 10 step := 2 fmt.Println("啟動子協程...") var wg sync.WaitGroup for i := 0; i < total; i = i + step { wg.Add(step) for j := 0; j < step; j++ { go addNum(i + j, 1, wg.Done) } wg.Wait() } fmt.Println("所有子協程執行完畢.") }
這裡我們採用分批次啟動子協程的方法,每次通過 wg.Add()
函數設置當前批次啟動的子協程數量,另外需要注意的是 wg.Wait()
函數最好和 wg.Add()
函數配對使用,否則可能會引起 panic。
除此之外,我們還可以通過另一種工具實現類似需求,這就是我們今天要介紹的 context 包,這個包為我們提供了以下方法和類型:

我們可以先通過 withXXX
方法返回一個從父 Context 拷貝的新的可撤銷子 Context
對象和對應撤銷函數 CancelFunc
,CancelFunc
是一個函數類型,調用它時會撤銷對應的子 Context 對象,當滿足某種條件時,我們可以通過調用該函數結束所有子協程的運行,主協程在接收到訊號後可以繼續往後執行。
這麼說有點迷糊,下面我們結合示例程式碼來解釋這個包的具體使用:
package main import ( "context" "fmt" "sync/atomic" "time" ) func AddNum(a *int32, b int, deferFunc func()) { defer func() { deferFunc() }() for i := 0; ; i++ { curNum := atomic.LoadInt32(a) newNum := curNum + 1 time.Sleep(time.Millisecond * 200) if atomic.CompareAndSwapInt32(a, curNum, newNum) { fmt.Printf("number當前值: %d [%d-%d]n", *a, b, i) break } else { //fmt.Printf("The CAS operation failed. [%d-%d]n", b, i) } } } func main() { total := 10 var num int32 fmt.Printf("number初始值: %dn", num) fmt.Println("啟動子協程...") ctx, cancelFunc := context.WithCancel(context.Background()) for i := 0; i < total; i++ { go AddNum(&num, i, func() { if atomic.LoadInt32(&num) == int32(total) { cancelFunc() } }) } <- ctx.Done() fmt.Println("所有子協程執行完畢.") }
在這段程式碼中,我們先通過 context.WithCancel
方法返回一個新的 cxt
和 cancelFunc
,並且通過 context.Background()
方法傳入父 Context,該 Context 沒有值,永遠不會取消,可以看作是所有 Context 的根節點,比如這裡的 cxt
就是從父 Context 拷貝過來的可撤銷的子 Context。然後我們在一個 for
循環中依次啟動子協程,並且只有在 atomic.LoadInt32(&num) == int32(total)
(所有子協程執行完畢)時調用 cancelFunc()
方法撤銷對應子 Context 對象 cxt
,這樣,處於阻塞狀態的 cxt.Done()
對應通道被關閉,我們可以接收到通道數據然後退出主程式。
註:
cxt.Done()
方法返回一個通道,該通道會在調用cancelFunc
函數時關閉,或者在父 context 撤銷時也會被關閉。
WithDeadline
和 WithTimeout
分別比 WithCancel
多了一個 deadline
和 timeout
時間參數,表示子 Context 存活的最長時間,如果超過了該時間,會自動撤銷對應的子 Context。相應的,在調用 <-cxt.Done()
等待子協程執行結束時,如果沒有調用 cancelFunc
函數的話它們會等待過期時間到達自動關閉,不過我們通常還是會主動調用 cancelFunc
函數以便更好的控制程式運行。
此外,context 包還提供了一個 TODO
方法,該方法用於在不知道使用哪種 Context 時使用,不過目前基本用不到,還有一個 withValue
方法用於返回包含上下文資訊的 Context 對象,當我們需要通過 Context 傳遞上下文數據時可以使用該方法返回 Context:
ctx, cancelFunc := context.WithTimeout(context.Background(), 10 * time.Second) valueCtx := context.WithValue(ctx, "key", "value") defer cancelFunc() for i := 0; i < total; i++ { go AddNum(&num, i, func() { if atomic.LoadInt32(&num) == int32(total) { fmt.Println("key:", valueCtx.Value("key")) cancelFunc() } }) } <- ctx.Done()