Go 語言並發編程系列(十一)—— sync 包系列:條件變數
- 2019 年 10 月 4 日
- 筆記
簡介 sync 包還提供了一個條件變數類型 sync.Cond
,它可以和互斥鎖或讀寫鎖(以下統稱互斥鎖)組合使用,用來協調想要訪問共享資源的執行緒。
不過,與互斥鎖不同,條件變數 sync.Cond
的主要作用並不是保證在同一時刻僅有一個執行緒訪問某一個共享資源,而是在對應的共享資源狀態發送變化時,通知其它因此而阻塞的執行緒。條件變數總是和互斥鎖組合使用,互斥鎖為共享資源的訪問提供互斥支援,而條件變數可以就共享資源的狀態變化向相關執行緒發出通知,重在「協調」。
下面,我們來看看如何使用條件變數 sync.Cond
。
初始化
sync.Cond
是一個結構體:
type Cond struct { noCopy noCopy // L is held while observing or changing the condition L Locker notify notifyList checker copyChecker }
提供了三個方法:
// 等待通知 func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() } // 單發通知 func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } // 廣播通知 func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) }
我們可以通過 sync.NewCond
返回對應的條件變數實例,初始化的時候需要傳入互斥鎖,該互斥鎖實例會賦值給 sync.Cond
的 L
屬性:
locker := &sync.Mutex{} cond := sync.NewCond(locker)
sync.Cond
主要實現一個條件變數,假設 goroutine A 執行前需要等待另外一個 goroutine B 的通知,那麼處於等待狀態的 goroutine A 會保存在一個通知列表,也就是說需要某種變數狀態的 goroutine A 將會等待(Wait)在那裡,當某個時刻變數狀態改變時,負責通知的 goroutine B 會通過對條件變數通知的方式(Broadcast/Signal)來通知處於等待條件變數的 goroutine A,這樣就可以在共享記憶體中實現類似「消息通知」的同步機制。
使用示例
下面來看一個具體的示例。假設我們有一個讀取器和一個寫入器,讀取器必須依賴寫入器對緩衝區進行數據寫入後,才可以從緩衝區中讀取數據,寫入器每次完成寫入數據後,都需要通過某種通知機制通知處於阻塞狀態的讀取器,告訴它可以對數據進行訪問,這種場景正好可以通過條件變數來實現:
package main import ( "bytes" "fmt" "io" "sync" "time" ) // 數據 bucket type DataBucket struct { buffer *bytes.Buffer //緩衝區 mutex *sync.RWMutex //互斥鎖 cond *sync.Cond //條件變數 } func NewDataBucket() *DataBucket { buf := make([]byte, 0) db := &DataBucket{ buffer: bytes.NewBuffer(buf), mutex: new(sync.RWMutex), } db.cond = sync.NewCond(db.mutex.RLocker()) return db } // 讀取器 func (db *DataBucket) Read(i int) { db.mutex.RLock() // 打開讀鎖 defer db.mutex.RUnlock() // 結束後釋放讀鎖 var data []byte var d byte var err error for { //每次讀取一個位元組 if d, err = db.buffer.ReadByte(); err != nil { if err == io.EOF { // 緩衝區數據為空時執行 if string(data) != "" { // data 不為空,則列印它 fmt.Printf("reader-%d: %sn", i, data) } db.cond.Wait() // 緩衝區為空,通過 Wait 方法等待通知,進入阻塞狀態 data = data[:0] // 將 data 清空 continue } } data = append(data, d) // 將讀取到的數據添加到 data 中 } } // 寫入器 func (db *DataBucket) Put(d []byte) (int, error) { db.mutex.Lock() // 打開寫鎖 defer db.mutex.Unlock() // 結束後釋放寫鎖 //寫入一個數據塊 n, err := db.buffer.Write(d) db.cond.Signal() // 寫入數據後通過 Signal 通知處於阻塞狀態的讀取器 return n, err } func main() { db := NewDataBucket() go db.Read(1) // 開啟讀取器協程 go func(i int) { d := fmt.Sprintf("data-%d", i) db.Put([]byte(d)) // 寫入數據到緩衝區 }(1) // 開啟寫入器協程 time.Sleep(100 * time.Millisecond) }
這裡我們使用了讀寫互斥鎖,在讀取器裡面使用讀鎖,在寫入器裡面使用寫鎖,並且通過 defer
語句釋放鎖,然後在鎖保護的情況下,通過條件變數協調讀寫執行緒:在讀執行緒中,當緩衝區為空的時候,通過 db.cond.Wait()
阻塞讀執行緒;在寫執行緒中,當緩衝區寫入數據的時候通過 db.cond.Signal()
通知讀執行緒繼續讀取數據。
執行上述示例程式碼,結果如下:
reader-1: data-1
上述示例程式碼只有一個讀取器,一個寫入器,如果都有多個呢?我們可以通過啟動多個讀寫協程來模擬,此外,通知單個阻塞執行緒用 Signal
方法,通知多個阻塞執行緒需要使用 Broadcast
方法,按照這個思路,我們來改寫上述示例程式碼如下:
package main import ( "bytes" "fmt" "io" "sync" "time" ) // 數據 bucket type DataBucket struct { buffer *bytes.Buffer //緩衝區 mutex *sync.RWMutex //互斥鎖 cond *sync.Cond //條件變數 } func NewDataBucket() *DataBucket { buf := make([]byte, 0) db := &DataBucket{ buffer: bytes.NewBuffer(buf), mutex: new(sync.RWMutex), } db.cond = sync.NewCond(db.mutex.RLocker()) return db } // 讀取器 func (db *DataBucket) Read(i int) { db.mutex.RLock() // 打開讀鎖 defer db.mutex.RUnlock() // 結束後釋放讀鎖 var data []byte var d byte var err error for { //每次讀取一個位元組 if d, err = db.buffer.ReadByte(); err != nil { if err == io.EOF { // 緩衝區數據為空時執行 if string(data) != "" { // data 不為空,則列印它 fmt.Printf("reader-%d: %sn", i, data) } db.cond.Wait() // 緩衝區為空,通過 Wait 方法等待通知,進入阻塞狀態 data = data[:0] // 將 data 清空 continue } } data = append(data, d) // 將讀取到的數據添加到 data 中 } } // 寫入器 func (db *DataBucket) Put(d []byte) (int, error) { db.mutex.Lock() // 打開寫鎖 defer db.mutex.Unlock() // 結束後釋放寫鎖 //寫入一個數據塊 n, err := db.buffer.Write(d) db.cond.Broadcast() // 寫入數據後通過 Broadcast 通知處於阻塞狀態的讀取器 return n, err } func main() { db := NewDataBucket() for i := 1; i < 3; i++ { // 啟動多個讀取器 go db.Read(i) } for j := 0; j < 10; j++ { // 啟動多個寫入器 go func(i int) { d := fmt.Sprintf("data-%d", i) db.Put([]byte(d)) // 寫入數據到緩衝區 }(j) time.Sleep(100 * time.Millisecond) // 每次啟動一個寫入器暫停100ms,讓讀取器阻塞 } }
執行上述程式碼,列印結果如下:

可以看到,通過互斥鎖+條件變數,我們可以非常方便的實現多個 Go 協程之間的通訊,但是這個還是比不上 channel,因為 channel 還可以實現數據傳遞,條件變數只是發送訊號,喚醒被阻塞的協程繼續執行,另外 channel 還有超時機制,不會出現協程等不到訊號一直阻塞造成記憶體堆積問題,換句話說,channel 可以讓程式更可控。