Go 語言並發編程系列(十四)—— 通過 context 包實現多協程之間的協作

  • 2019 年 10 月 11 日
  • 筆記

上篇教程學院君介紹了如何通過 sync.WaitGroup 類型優化通道對多協程協調的處理,但是現在有一個問題,就是我們在啟動子協程之前都已經明確知道子協程的總量,如果不知道的話,該怎麼實現呢?

一種解決方案是通過 sync.WaitGroup 分批啟動子協程,具體實現程式碼如下:

package main    import (      "fmt"      "sync"  )    func addNum(a, b int, deferFunc func())  {      defer func() {          deferFunc()      }()      c := a + b      fmt.Printf("%d + %d = %dn", a, b, c)  }    func main() {      total := 10      step := 2      fmt.Println("啟動子協程...")      var wg sync.WaitGroup      for i := 0; i < total; i = i + step {          wg.Add(step)          for j := 0; j < step; j++ {              go addNum(i + j, 1, wg.Done)          }          wg.Wait()      }      fmt.Println("所有子協程執行完畢.")  }

這裡我們採用分批次啟動子協程的方法,每次通過 wg.Add() 函數設置當前批次啟動的子協程數量,另外需要注意的是 wg.Wait() 函數最好和 wg.Add() 函數配對使用,否則可能會引起 panic。

除此之外,我們還可以通過另一種工具實現類似需求,這就是我們今天要介紹的 context 包,這個包為我們提供了以下方法和類型:

我們可以先通過 withXXX 方法返回一個從父 Context 拷貝的新的可撤銷子 Context 對象和對應撤銷函數 CancelFuncCancelFunc 是一個函數類型,調用它時會撤銷對應的子 Context 對象,當滿足某種條件時,我們可以通過調用該函數結束所有子協程的運行,主協程在接收到訊號後可以繼續往後執行。

這麼說有點迷糊,下面我們結合示例程式碼來解釋這個包的具體使用:

package main    import (      "context"      "fmt"      "sync/atomic"      "time"  )    func AddNum(a *int32, b int, deferFunc func())  {      defer func() {          deferFunc()      }()      for i := 0; ; i++ {          curNum := atomic.LoadInt32(a)          newNum := curNum + 1          time.Sleep(time.Millisecond * 200)          if atomic.CompareAndSwapInt32(a, curNum, newNum) {              fmt.Printf("number當前值: %d [%d-%d]n", *a, b, i)              break          } else {              //fmt.Printf("The CAS operation failed. [%d-%d]n", b, i)          }      }  }    func main() {      total := 10      var num int32      fmt.Printf("number初始值: %dn", num)      fmt.Println("啟動子協程...")      ctx, cancelFunc := context.WithCancel(context.Background())      for i := 0; i < total; i++ {          go AddNum(&num, i, func() {              if atomic.LoadInt32(&num) == int32(total) {                  cancelFunc()              }          })      }      <- ctx.Done()      fmt.Println("所有子協程執行完畢.")  }

在這段程式碼中,我們先通過 context.WithCancel 方法返回一個新的 cxtcancelFunc,並且通過 context.Background() 方法傳入父 Context,該 Context 沒有值,永遠不會取消,可以看作是所有 Context 的根節點,比如這裡的 cxt 就是從父 Context 拷貝過來的可撤銷的子 Context。然後我們在一個 for 循環中依次啟動子協程,並且只有在 atomic.LoadInt32(&num) == int32(total)(所有子協程執行完畢)時調用 cancelFunc() 方法撤銷對應子 Context 對象 cxt,這樣,處於阻塞狀態的 cxt.Done() 對應通道被關閉,我們可以接收到通道數據然後退出主程式。

註:cxt.Done() 方法返回一個通道,該通道會在調用 cancelFunc 函數時關閉,或者在父 context 撤銷時也會被關閉。

WithDeadlineWithTimeout 分別比 WithCancel 多了一個 deadlinetimeout 時間參數,表示子 Context 存活的最長時間,如果超過了該時間,會自動撤銷對應的子 Context。相應的,在調用 <-cxt.Done() 等待子協程執行結束時,如果沒有調用 cancelFunc 函數的話它們會等待過期時間到達自動關閉,不過我們通常還是會主動調用 cancelFunc 函數以便更好的控制程式運行。

此外,context 包還提供了一個 TODO 方法,該方法用於在不知道使用哪種 Context 時使用,不過目前基本用不到,還有一個 withValue 方法用於返回包含上下文資訊的 Context 對象,當我們需要通過 Context 傳遞上下文數據時可以使用該方法返回 Context:

ctx, cancelFunc := context.WithTimeout(context.Background(), 10 * time.Second)  valueCtx := context.WithValue(ctx, "key", "value")  defer cancelFunc()  for i := 0; i < total; i++ {      go AddNum(&num, i, func() {          if atomic.LoadInt32(&num) == int32(total) {              fmt.Println("key:", valueCtx.Value("key"))              cancelFunc()          }      })  }  <- ctx.Done()