golang並發編程

在早期,CPU都是以單核的形式順序執行機器指令。C語言、PHP正是這種順序編程語言的代表,即所有的指令都是以串行的方式執行,在相同的時刻有且僅有一個CPU在順序執行程序的指令。隨着處理器技術的發展,單核時代以提升處理器頻率來提高運行效率的方式遇到了瓶頸。單核CPU的發展的停滯,給多核CPU的發展帶來了機遇。相應地,編程語言也開始逐步向並行化的方向發展。Go語言正是在多核和網絡化的時代背景下誕生的原生支持並發的編程語言。

Goroutine

goroutine 是 Go 語言特有的並發體,是一種輕量級的線程,由go關鍵字啟動。在真實的Go語言的實現中,goroutine 和系統線程也不是等價的。儘管兩者的區別實際上只是一個量的區別,但正是這個量變引發了 Go 語言並發編程質的飛躍。

package main

import "fmt"

func main() {
    //並發版hello world
    go println("hello world")
}

每個系統級線程都會有一個固定大小的棧(一般默認可能是8MB),這個棧主要用來保存函數遞歸調用時參數和局部變量。固定了棧的大小導致了兩個問題:一是對於很多只需要很小的棧空間的線程來說是一個巨大的浪費,二是對於少數需要巨大棧空間的線程來說又面臨棧溢出的風險。相反,一個 goroutine 會以一個很小的棧啟動(可能是2KB或4KB),當遇到當前棧空間不足時, goroutine 會根據需要動態地伸縮棧的大小。因為啟動的代價很小,所以我們可以輕易地啟動成千上萬個 goroutine 。
Go的調度器使用了一些技術手段,可以在n個操作系統線程上多工調度m個 goroutine 。只有在當前 goroutine 發生阻塞時才會導致調度,同時發生在用戶態,切換的代價要比系統線程低得多。運行時有一個 runtime.GOMAXPROCS 變量,用於控制當前運行正常非阻塞 goroutine 的系統線程數目。在Go語言中啟動一個 goroutine 不僅和調用函數一樣簡單,而且 goroutine 之間調度代價也很低,這些因素極大地促進了並發編程的流行和發展。

Channel

在並發編程中,對共享資源的正確訪問需要精確的控制,在目前的絕大多數語言中,都是通過加鎖等線程同步方案來解決這一問題。而Go語言卻另闢蹊徑,它將共享的值通過Channel傳遞,數據競爭從設計層面上就被杜絕了。通過通道來傳值是Go語言推薦的做法,雖然像引用計數這類簡單的並發問題通過原子操作或互斥鎖就能很好地實現,但是通過Channel來控制訪問能夠讓你寫出更簡潔正確的程序。

創建通道

//非緩衝通道
ch1 := make(chan int)
//緩衝通道
ch2 := make(chan int, 1)

非緩衝通道必須確保有協程正在嘗試讀取當前通道,否則寫操作就會阻塞直到有其它協程來從通道中讀東西。

讀寫通道

//從通道讀,
data, ok := <-ch1
data := <-ch1
//往通道寫
ch2 <-data
//使用range讀,通道沒數據for就會阻塞,通道關閉就會退出for
for v := range ch1 {
    println(v)
}
//多路通道
for {
    select {
    case v := <-ch1:
        println(v)
    case v := <-ch2:
        println(v)
    }
}

通道滿了,寫操作就會阻塞,協程就會進入休眠,直到有其它協程讀通道挪出了空間,協程才會被喚醒。通道空了,讀操作就會阻塞,協程也會進入睡眠,直到有其它協程寫通道裝進了數據才會被喚醒。

//關閉通道
close(ch1)

讀取一個已經關閉的通道會立即返回通道類型的「零值」,而寫一個已經關閉的通道會拋異常。使用 for range 讀取時用完要記得關閉通道,否則會阻塞。

同步控制

func main() {
    go println("你好, 世界")
}

根據 Go 語言規範,main 函數退出時程序結束,不會等待任何後台線程。因為 goroutine 的執行和 main 函數的返回事件是並發的,誰都有可能先發生,所以什麼時候打印,能否打印都是未知的。

sleep

func main() {
    go println("你好, 世界")
    time.Sleep(time.Second)
    //或者一個死循環
    for {}
}

不可靠,因為實際協程執行時間未知

互斥鎖

func main() {
    var mu sync.Mutex
    mu.Lock()
    go func() {
        println("你好, 世界")
        mu.Unlock()
    }()
    mu.Lock()
}

主攜程中第二次獲取鎖時阻塞

通道

func main() {
    ch := make(chan int, 1)
    go func() {
        println("你好, 世界")
        ch<-1
    }()
    <-ch
}

從ch取值,由於通道為空所以會阻塞直到有數據寫入

原子等待組

func main() {
    var wg sync.WaitGroup
    wg.Add(10)

    for i := 1; i < 10; i++ {
        //wg.Add(1)
        go func(n int) {
            println("你好, ", n)
            wg.Done()   //wg.Add(-1)
        }(i)
    }
    //等待協程完成
    wg.Wait()
}

如果不把i作為參數傳入閉包函數,閉包go協程裏面引用的是變量i的地址,所有的go協程啟動後等待調用,很可能在for循環完成之後才被調用,所以輸出結果很多都是10

編程技巧

控制並發數

雖然啟動一個攜程代價很小,但是也不能無限制地創建攜程,否則導致cpu佔用過高

func main() {
    var limit = make(chan int, 3)
    for _, id := range ids {
        go func() {
            limit <- 1
            worker(id)
            <-limit
        }()
    }
    for {}
}

超時處理

當限制並發數的時候,如果有大量寫通道,會造成通道阻塞過長

func main() {
    select {
    case id <- 1:
        println("success")
    case <- time.After(3 * time.Second):
        println("timeout")
    }
}

生產者消費者實例

例如在tcp編程中,一個 goroutine 用來讀,一個 goroutine 用來寫,讀寫 goroutine 間用通道傳遞消息

func main()  {
	listen, _ := net.Listen("tcp4", ":9001")
	defer listen.Close()
	for {
		conn, _ := listen.Accept()

		ch := make(chan string, 10)
		go read(conn, ch)
		go write(conn, ch)
	}
}

func write(conn net.Conn, ch <-chan string) {
	for msg := range ch {
		_, err := conn.Write([]byte(msg))
		if err != nil {
			break
		}
	}
}

func read(conn net.Conn, ch chan<- string) {
	for {
		msg := make([]byte, 1024)
		n, err := conn.Read(msg)
		if err != nil {
			break
		}
		ch <- string(msg[:n])
	}
}
Tags: