理解Go協程與並發

  • 2019 年 10 月 3 日
  • 筆記

協程

Go語言里創建一個協程很簡單,使用go關鍵字就可以讓一個普通方法協程化:

package main    import (      "fmt"      "time"  )    func main(){      fmt.Println("run in main coroutine.")        for i:=0; i<10; i++ {          go func(i int) {              fmt.Printf("run in child coroutine %d.n", i)          }(i)      }        //防止子協程還沒有結束主協程就退出了      time.Sleep(time.Second * 1)  }

下面這些概念可能不太好理解,需要慢慢理解。可以先跳過,回頭再來看。

概念:

  1. 協程可以理解為純用戶態的執行緒,其通過協作而不是搶佔來進行切換。相對於進程或者執行緒,協程所有的操作都可以在用戶態完成,創建和切換的消耗更低。
  2. 一個進程內部可以運行多個執行緒,而每個執行緒又可以運行很多協程。執行緒要負責對協程進行調度,保證每個協程都有機會得到執行。當一個協程睡眠時,它要將執行緒的運行權讓給其它的協程來運行,而不能持續霸佔這個執行緒。同一個執行緒內部最多只會有一個協程正在運行。
  3. 協程可以簡化為三個狀態:運行態就緒態休眠態。同一個執行緒中最多只會存在一個處於運行態的協程。就緒態協程是指那些具備了運行能力但是還沒有得到運行機會的協程,它們隨時會被調度到運行態;休眠態的協程還不具備運行能力,它們是在等待某些條件的發生,比如 IO 操作的完成、睡眠時間的結束等。
  4. 子協程的異常退出會將異常傳播到主協程,直接會導致主協程也跟著掛掉。

協程一般用 TCP/HTTP/RPC服務、消息推送系統、聊天系統等。使用協程,我們可以很方便的搭建一個支援高並發的TCP或HTTP服務端。

通道

通道的英文是Channels,簡稱chan。什麼時候要用到通道呢?可以先簡單的理解為:協程在需要協作通訊的時候就需要用通道。

在GO里,不同的並行協程之間交流的方式有兩種,一種是通過共享變數,另一種是通過通道。Go 語言鼓勵使用通道的形式來交流。

舉個簡單的例子,我們使用協程實現並發調用遠程介面,最終我們需要把每個協程請求回來的數據進行匯總一起返回,這個時候就用到通道了。

創建通道

創建通道(channel)只能使用make函數:

c := make(chan int)

通道是區分類型的,如這裡的int

Go 語言為通道的讀寫設計了特殊的箭頭語法糖 <-,讓我們使用通道時非常方便。把箭頭寫在通道變數的右邊就是寫通道,把箭頭寫在通道的左邊就是讀通道。一次只能讀寫一個元素。

c := make(chan bool)  c <- true //寫入  <- c //讀取

緩衝通道

上面我們介紹了默認的非快取類型的channel,不過Go也允許指定channel的緩衝大小,很簡單,就是channel可以存儲多少元素:

c := make(chan int, value)

value = 0 時,通道是無緩衝阻塞讀寫的,等價於make(chan int);當value > 0 時,通道有緩衝、是非阻塞的,直到寫滿 value 個元素才阻塞寫入。具體說明下:

非緩衝通道
無論是發送操作還是接收操作,一開始執行就會被阻塞,直到配對的操作也開始執行才會繼續傳遞。由此可見,非緩衝通道是在用同步的方式傳遞數據。也就是說,只有收發雙方對接上了,數據才會被傳遞。數據是直接從發送方複製到接收方的,中間並不會用非緩衝通道做中轉。

緩衝通道
緩衝通道可以理解為消息隊列,在有容量的時候,發送和接收是不會互相依賴的。用非同步的方式傳遞數據。

下面我們用一個例子來理解一下:

package main    import "fmt"    func main() {      var c = make(chan int, 0)      var a string        go func() {          a = "hello world"          <-c      }()        c <- 0      fmt.Println(a)  }

這個例子輸出的一定是hello world。但是如果你把通道的容量由0改為大於0的數字,輸出結果就不一定是hello world了,很可能是空。為什麼?

當通道是無緩衝通道時,執行到c <- 0,通道滿了,寫操作會被阻塞住,直到執行<-c解除阻塞,後面的語句接著執行。

要是改成非阻塞通道,執行到c <- 0,發現還能寫入,主協程就不會阻塞了,但這時候輸出的是空字元串還是hello world,取決於是子協程和主協程哪個運行的速度快。

通道作為容器,它可以像切片一樣,使用 cap()len() 全局函數獲得通道的容量和當前內部的元素個數。

模擬消息隊列

上一節"協程"的例子里,我們在主協程里加了個time.Sleep(),目的是防止子協程還沒有結束主協程就退出了。但是對於實際生活的大多數場景來說,1秒是不夠的,並且大部分時候我們都無法預知for循環內程式碼運行時間的長短。這時候就不能使用time.Sleep() 來完成等待操作了。下面我們用通道來改寫:

package main    import (      "fmt"  )    func main() {      fmt.Println("run in main coroutine.")        count := 10      c := make(chan bool, count)        for i := 0; i < count; i++ {          go func(i int) {              fmt.Printf("run in child coroutine %d.n", i)              c <- true          }(i)      }        for i := 0; i < count; i++ {          <-c      }  }

單向通道

默認的通道是支援讀寫的,我們可以定義單向通道:

//只讀  var readOnlyChannel = make(<-chan int)    //只寫  var writeOnlyChannel = make(chan<- int)

下面是一個示例,我們模擬消息隊列的消費者、生產者:

package main    import (      "fmt"      "time"  )    func Producer(c chan<- int) {      for i := 0; i < 10; i++ {          c <- i      }  }    func Consumer1(c <-chan int) {      for m := range c {          fmt.Printf("oh, I get luckly num: %vn", m)      }  }    func Consumer2(c <-chan int) {      for m := range c {          fmt.Printf("oh, I get luckly num too: %vn", m)      }  }    func main() {      c := make(chan int, 2)        go Consumer1(c)      go Consumer2(c)        Producer(c)        time.Sleep(time.Second)  }

對於生產者,我們希望通道是只寫屬性,而對於消費者則是只讀屬性,這樣避免對通道進行錯誤的操作。當然,如果你將本例里消費者、生產者的通道單向屬性去掉也是可以的,沒什麼問題:

func Producer(c chan int) {}  func Consumer1(c chan int) {}  func Consumer2(c chan int) {}

事實上 channel 只讀或只寫都沒有意義,所謂的單向 channel 其實只是方法里聲明時用,如果後續程式碼里,向本來用於讀channel里寫入了數據,編譯器會提示錯誤。

關閉通道

讀取一個已經關閉的通道會立即返回通道類型的零值,而寫一個已經關閉的通道會拋異常。如果通道里的元素是整型的,讀操作是不能通過返回值來確定通道是否關閉的。

1、如何安全的讀通道,確保不是讀取的已關閉通道的零值
答案是使用for...range語法。當通道為空時,循環會阻塞;當通道關閉,循環會停止。通過循環停止,我們可以認為通道已經關閉。示例:

package main    import "fmt"    func main() {      var c = make(chan int, 3)        //子協程寫      go func() {          c <- 1          close(c)      }()        //直接讀取通道,存在不知道子協程是否已關閉的情況      //fmt.Println(<-c)      //fmt.Println(<-c)        //主協程讀取:使用for...range安全的讀取      for value := range c {          fmt.Println(value)      }  }

輸出:

1

2、如何安全的寫通道,確保不會寫入已關閉的通道?
Go 語言並不存在一個內置函數可以判斷出通道是否已經被關閉。確保通道寫安全的最好方式是由負責寫通道的協程自己來關閉通道,讀通道的協程不要去關閉通道。

但是這個方法只能解決單寫多讀的場景。如果遇到多寫單讀的情況就有問題了:無法知道其它寫協程什麼時候寫完,那麼也就不能確定什麼時候關閉通道。這個時候就得額外使用一個通道專門做這個事情。

我們可以使用內置的 sync.WaitGroup,它使用計數來等待指定事件完成:

package main    import (      "fmt"      "sync"      "time"  )    func main() {        var ch = make(chan int, 8)        //寫協程      var wg = new(sync.WaitGroup)        for i := 1; i <= 4; i++ {          wg.Add(1)          go func(num int, ch chan int, wg *sync.WaitGroup) {              defer wg.Done()              ch <- num              ch <- num * 10          }(i, ch, wg)      }        //讀      go func(ch chan int) {          for num := range ch {              fmt.Println(num)          }      }(ch)        //Wait阻塞等待所有的寫通道協程結束,待計數值變成零,Wait才會返回      wg.Wait()        //安全的關閉通道      close(ch)        //防止讀取通道的協程還沒有完畢      time.Sleep(time.Second)        fmt.Println("finish")  }

輸出:

  3  30  2  20  1  10  4  40  finish

多路通道

有時候還會遇到多個生產者,只要有一個生產者就緒,消費者就可以進行消費的情況。這個時候可以使用go語言提供的select 語句,它可以同時管理多個通道讀寫,如果所有通道都不能讀寫,它就整體阻塞,只要有一個通道可以讀寫,它就會繼續。示例:

package main    import (      "fmt"      "time"  )    func main() {        var ch1 = make(chan int)      var ch2 = make(chan int)        fmt.Println(time.Now().Format("15:04:05"))        go func(ch chan int) {          time.Sleep(time.Second)          ch <- 1      }(ch1)        go func(ch chan int) {          time.Sleep(time.Second * 2)          ch <- 2      }(ch2)        for {          select {              case v := <-ch1:                  fmt.Println(time.Now().Format("15:04:05") + ":來自ch1:", v)              case v := <-ch2:                  fmt.Println(time.Now().Format("15:04:05") + ":來自ch2:", v)              //default:                  //fmt.Println("channel is empty !")          }      }  }

輸出:

13:39:56  13:39:57:來自ch1: 1  13:39:58:來自ch2: 2  fatal error: all goroutines are asleep - deadlock!

默認select處於阻塞狀態,1s後,子協程1完成寫入,主協程讀出了數據;接著子協程2完成寫入,主協程讀出了數據;接著主協程掛掉了,原因是主協程發現在等一個永遠不會來的數據,這顯然是沒有結果的,乾脆就直接退出了。

如果把注釋的部分打開,那麼程式在列印出來自ch1、ch2的數據後,就會一直執行default裡面的程式。這個時候程式不會退出。原因是當 select 語句所有通道都不可讀寫時,如果定義了 default 分支,那就會執行 default 分支邏輯。

註:select{}程式碼塊是一個沒有任何caseselect,它會一直阻塞。

Chan的應用場景

golang中chan的應用場景總結
https://github.com/nange/blog/issues/9

Go語言之Channels實際應用
https://www.s0nnet.com/archives/go-channels-practice

  • 消息隊列
  • 並發請求
  • 模擬鎖的功能
  • 模擬sync.WaitGroup
  • 並行計算

通道原理部分可以根據文末給出的參考鏈接《快學 Go 語言》第 12 課 —— 通道去查看。

並發鎖

互斥所

go語言里的map是執行緒不安全的:

package main    import "fmt"    func write(d map[string]string) {      d["name"] = "yujc"  }    func read(d map[string]string) {      fmt.Println(d["name"])  }    func main() {      d := map[string]string{}      go read(d)      write(d)  }

Go 語言內置了數據結構競態檢查工具來幫我們檢查程式中是否存在執行緒不安全的程式碼,只要在運行的時候加上-race參數即可:

$ go run -race main.go  ==================  WARNING: DATA RACE  Read at 0x00c0000a8180 by goroutine 6:    ...    yujc  Found 2 data race(s)  exit status 66

可以看出,上面的程式碼存在安全隱患。

我們可以使用sync.Mutex來保護map,原理是在每次讀寫操作之前使用互斥鎖進行保護,防止其他執行緒同時操作:

package main    import (      "fmt"      "sync"  )    type SafeDict struct {      data map[string]string      mux  *sync.Mutex  }    func NewSafeDict(data map[string]string) *SafeDict {      return &SafeDict{          data: data,          mux:  &sync.Mutex{},      }  }    func (d *SafeDict) Get(key string) string {      d.mux.Lock()      defer d.mux.Unlock()      return d.data[key]  }    func (d *SafeDict) Set(key string, value string) {      d.mux.Lock()      defer d.mux.Unlock()      d.data[key] = value  }    func main(){      dict := NewSafeDict(map[string]string{})        go func(dict *SafeDict) {          fmt.Println(dict.Get("name"))      }(dict)        dict.Set("name", "yujc")  }

運行檢測:

$ go run -race main.go  yujc

上面的程式碼如果不使用-race運行,不一定會有結果,取決於主協程、子協程哪個先運行。

注意:sync.Mutex 是一個結構體對象,這個對象在使用的過程中要避免被淺拷貝,否則起不到保護作用。應盡量使用它的指針類型。

上面的程式碼里我們多處使用了d.mux.Lock(),能否簡化成d.Lock()呢?答案是可以的。我們知道,結構體可以自動繼承匿名內部結構體的所有方法:

type SafeDict struct {      data map[string]string      *sync.Mutex  }    func NewSafeDict(data map[string]string) *SafeDict {      return &SafeDict{data, &sync.Mutex{}}  }    func (d *SafeDict) Get(key string) string {      d.Lock()      defer d.Unlock()      return d.data[key]  }

這樣就完成了簡化。

讀寫鎖

對於讀多寫少的場景,可以使用讀寫鎖代替互斥鎖,可以提高性能。

讀寫鎖提供了下面4個方法:

  • Lock() 寫加鎖
  • Unlock() 寫釋放鎖
  • RLock() 讀加鎖
  • RUnlock() 讀釋放鎖

寫鎖排它鎖,加寫鎖時會阻塞其它協程再加讀鎖寫鎖讀鎖共享鎖,加讀鎖還可以允許其它協程再加讀鎖,但是會阻塞加寫鎖讀寫鎖在寫並發高的情況下性能退化為普通的互斥鎖

我們把上節中的互斥鎖換成讀寫鎖:

package main    import (      "fmt"      "sync"  )    type SafeDict struct {      data map[string]string      *sync.RWMutex  }    func NewSafeDict(data map[string]string) *SafeDict {      return &SafeDict{data, &sync.RWMutex{}}  }    func (d *SafeDict) Get(key string) string {      d.RLock()      defer d.RUnlock()      return d.data[key]  }    func (d *SafeDict) Set(key string, value string) {      d.Lock()      defer d.Unlock()      d.data[key] = value  }    func main(){      dict := NewSafeDict(map[string]string{})        go func(dict *SafeDict) {          fmt.Println(dict.Get("name"))      }(dict)        dict.Set("name", "yujc")  }

改完後,使用競態檢測工具檢測還是能通過的。

參考

1、make(chan int) 和 make(chan int, 1) 的區別
https://www.jianshu.com/p/f12e1766c19f
2、channel
https://www.jianshu.com/p/4d97dc032730
3、《快學 Go 語言》第 12 課 —— 通道
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ==&mid=2247484601&idx=1&sn=97c0de2acc3127c9e913b6338fa65737
4、《快學 Go 語言》第 13 課 —— 並發與安全
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ==&mid=2247484683&idx=1&sn=966cb818f034ffd4538eae7a61cd0c58