Go中由WaitGroup引發對記憶體對齊思考

轉載請聲明出處哦~,本篇文章發佈於luozhiyun的部落格://www.luozhiyun.com

本文使用的go的源碼時14.4

WaitGroup使用大家都會,但是其中是怎麼實現的我們也需要知道,這樣才能在項目中儘可能的避免由於不正確的使用引發的panic。並且本文也將寫一下記憶體對齊方面做一個解析,喜歡大家喜歡。

WaitGroup介紹

WaitGroup 提供了三個方法:

    func (wg *WaitGroup) Add(delta int)
    func (wg *WaitGroup) Done()
    func (wg *WaitGroup) Wait()
  • Add,用來設置 WaitGroup 的計數值;
  • Done,用來將 WaitGroup 的計數值減 1,其實就是調用了 Add(-1);
  • Wait,調用這個方法的 goroutine 會一直阻塞,直到 WaitGroup 的計數值變為 0。

例子我就不舉了,網上是很多的,下面我們直接進入正題。

解析

type noCopy struct{}

type WaitGroup struct {
    // 避免複製使用的一個技巧,可以告訴vet工具違反了複製使用的規則
	noCopy noCopy
	// 一個複合值,用來表示waiter數、計數值、訊號量
	state1 [3]uint32
}
// 獲取state的地址和訊號量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// 如果地址是64bit對齊的,數組前兩個元素做state,後一個元素做訊號量
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		// 如果地址是32bit對齊的,數組後兩個元素用來做state,它可以用來做64bit的原子操作,第一個元素32bit用來做訊號量
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

這裡剛開始,WaitGroup就秀了一把肌肉,讓我們看看大牛是怎麼寫程式碼的,思考一個原子操作在不同架構平台上是怎麼操作的,在看state方法裡面為什麼要這麼做之前,我們先來看看記憶體對齊。

記憶體對齊

在維基百科//en.wikipedia.org/wiki/Data_structure_alignment上我們可以看到對於記憶體對齊的定義:

A memory address a is said to be n-byte aligned when a is a multiple of n bytes (where n is a power of 2).

簡而言之,現在的CPU訪問記憶體的時候是一次性訪問多個bytes,比如32位架構一次訪問4bytes,該處理器只能從地址為4的倍數的記憶體開始讀取數據,所以要求數據在存放的時候首地址的值是4的倍數存放,者就是所謂的記憶體對齊。

由於找不到Go語言的對齊規則,我對照了一下C語言的記憶體對齊的規則,可以和Go語言匹配的上,所以先參照下面的規則。

記憶體對齊遵循下面三個原則:

  1. 結構體變數的起始地址能夠被其最寬的成員大小整除;
  2. 結構體每個成員相對於起始地址的偏移能夠被其自身大小整除,如果不能則在前一個成員後面補充位元組;
  3. 結構體總體大小能夠被最寬的成員的大小整除,如不能則在後面補充位元組;

通過下面的例子來實操一下記憶體對齊:

在32位架構中,int8佔1byte,int32佔4bytes,int16佔2bytes。

type A struct {
	a int8
	b int32
	c int16
}

type B struct {
	a int8
	c int16
	b int32
}

func main() {

	fmt.Printf("arrange fields to reduce size:\n"+
		"A align: %d, size: %d\n" ,
		unsafe.Alignof(A{}), unsafe.Sizeof(A{}) )

	fmt.Printf("arrange fields to reduce size:\n"+
		"B align: %d, size: %d\n" ,
		unsafe.Alignof(B{}), unsafe.Sizeof(B{}) )
}

//output:
//arrange fields to reduce size:
//A align: 4, size: 12
//arrange fields to reduce size:
//B align: 4, size: 8

下面以在32位的架構中運行為例子:

在32位架構的系統中默認的對齊大小是4bytes。

假設結構體A中a的起始地址為0x0000,能夠被最寬的數據成員大小4bytes(int32)整除,所以從0x0000開始存放佔用一個位元組即0x00000x0001;b是int32,佔4bytes,所以要滿足條件2,需要在a後面padding3個byte,從0x0004開始;c是int16,佔2bytes故從0x0008開始佔用兩個位元組,即0x00080x0009;此時整個結構體佔用的空間是0x0000~0x0009佔用10個位元組,10%4 != 0, 不滿足第三個原則,所以需要在後面補充兩個位元組,即最後記憶體對齊後佔用的空間是0x0000~0x000B,一共12個位元組。

Group 49

同理,相比結構體B則要緊湊些:

Group 50

WaitGroup中state方法的記憶體對齊

在講之前需要注意的是noCopy是一個空的結構體,大小為0,不需要做記憶體對齊,所以大家在看的時候可以忽略這個欄位。

在WaitGroup裡面,使用了uint32的數組來構造state1欄位,然後根據系統的位數的不同構造不同的返回值,下面我面先來說說怎麼通過sate1這個欄位構建waiter數、計數值、訊號量的。

首先unsafe.Pointer來獲取state1的地址值然後轉換成uintptr類型的,然後判斷一下這個地址值是否能被8整除,這裡通過地址 mod 8的方式來判斷地址是否是64位對齊。

因為有記憶體對齊的存在,在64位架構裡面WaitGroup結構體state1起始的位置肯定是64位對齊的,所以在64位架構上用state1前兩個元素並成uint64來表示statep,state1最後一個元素表示semap;

那麼64位架構上面獲取state1的時候能不能第一個元素表示semap,後兩個元素拼成64位返回呢?

答案自然是不可以,因為uint32的對齊保證是4bytes,64位架構中一次性處理事務的一個固定長度是8bytes,如果用state1的後兩個元素表示一個64位字的欄位的話CPU需要讀取記憶體兩次,不能保證原子性。

但是在32位架構裡面,一個字長是4bytes,要操作64位的數據分布在兩個數據塊中,需要兩次操作才能完成訪問。如果兩次操作中間有可能別其他操作修改,不能保證原子性。

同理32位架構想要原子性的操作8bytes,需要由調用方保證其數據地址是64位對齊的,否則原子訪問會有異常,我們在這裡//golang.org/pkg/sync/atomic/#pkg-note-BUG可以看到描述:

On ARM, x86-32, and 32-bit MIPS, it is the caller’s responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.

所以為了保證64位字對齊,只能讓變數或開闢的結構體、數組和切片值中的第一個64位字可以被認為是64位字對齊。但是在使用WaitGroup的時候會有嵌套的情況,不能保證總是讓WaitGroup存在於結構體的第一個欄位上,所以我們需要增加填充使它能對齊64位字。

在32位架構中,WaitGroup在初始化的時候,分配記憶體地址的時候是隨機的,所以WaitGroup結構體state1起始的位置不一定是64位對齊,可能會是:uintptr(unsafe.Pointer(&wg.state1))%8 = 4,如果出現這樣的情況,那麼就需要用state1的第一個元素做padding,用state1的後兩個元素合併成uint64來表示statep。

小結

這裡小結一下,因為為了完成上面的這篇內容實在是查閱了很多資料,才得出這樣的結果。所以這裡小結一下,在64位架構中,CPU每次操作的字長都是8bytes,編譯器會自動幫我們把結構體的第一個欄位的地址初始化成64位對齊的,所以64位架構上用state1前兩個元素並成uint64來表示statep,state1最後一個元素表示semap;

然後在32位架構中,在初始化WaitGroup的時候,編譯器只能保證32位對齊,不能保證64位對齊,所以通過uintptr(unsafe.Pointer(&wg.state1))%8判斷是否等於0來看state1記憶體地址是否是64位對齊,如果是,那麼也和64位架構一樣,用state1前兩個元素並成uint64來表示statep,state1最後一個元素表示semap,否則用state1的第一個元素做padding,用state1的後兩個元素合併成uint64來表示statep。

如果我說錯了,歡迎來diss我,我覺得我需要學習的地方還有很多。

Group 21

Add 方法

func (wg *WaitGroup) Add(delta int) {
	// 獲取狀態值
	statep, semap := wg.state()
	...
	// 高32bit是計數值v,所以把delta左移32,增加到計數上
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// 獲取計數器的值
	v := int32(state >> 32)
	// 獲取waiter的值
	w := uint32(state)
	...
	// 任務計數器不能為負數
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// wait不等於0說明已經執行了Wait,此時不容許Add
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 計數器的值大於或者沒有waiter在等待,直接返回
	if v > 0 || w == 0 {
		return
	} 
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 此時,counter一定等於0,而waiter一定大於0
	// 先把counter置為0,再釋放waiter個數的訊號量
	*statep = 0
	for ; w != 0; w-- {
		//釋放訊號量,執行一次釋放一個,喚醒一個等待者
		runtime_Semrelease(semap, false, 0)
	}
}
  1. add方法首先會調用state方法獲取statep、semap的值。statep是一個uint64類型的值,高32位用來記錄add方法傳入的delta值之和;低32位用來表示調用wait方法等待的goroutine的數量,也就是waiter的數量。如下:

Group 15

  1. add方法會調用atomic.AddUint64方法將傳入的delta左移32位,也就是將counter加上delta的值;

  2. 因為計數器counter可能為負數,所以int32來獲取計數器的值,waiter不可能為負數,所以使用uint32來獲取;

  3. 接下來就是一系列的校驗,v不能小於零表示任務計數器不能為負數,否則會panic;w不等於,並且v的值等於delta表示wait方法先於add方法執行,此時也會panic,因為waitgroup不允許調用了Wait方法後還調用add方法;

  4. v大於零或者w等於零直接返回,說明這個時候不需要釋放waiter,所以直接返回;

  5. *statep != state到了這個校驗這裡,狀態只能是waiter大於零並且counter為零。當waiter大於零的時候是不允許再調用add方法,counter為零的時候也不能調用wait方法,所以這裡使用state的值和記憶體的地址值進行比較,查看是否調用了add或者wait導致state變動,如果有就是非法調用會引起panic;

  6. 最後將statep值重置為零,然後釋放所有的waiter;

Wait方法

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	...
	for {
		state := atomic.LoadUint64(statep)
		// 獲取counter
		v := int32(state >> 32)
		// 獲取waiter
		w := uint32(state)
		// counter為零,不需要等待直接返回
		if v == 0 {
			...
			return
		}
		// 使用CAS將waiter加1
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			...
			// 掛起等待喚醒
			runtime_Semacquire(semap)
			// 喚醒之後statep不為零,表示WaitGroup又被重複使用,這回panic
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			...
         	// 直接返回   
			return
		}
	}
}
  1. Wait方法首先也是調用state方法獲取狀態值;
  2. 進入for循環之後Load statep的值,然後分別獲取counter和counter;
  3. 如果counter已經為零了,那麼直接返回不需要等待;
  4. counter不為零,那麼使用CAS將waiter加1,由於CAS可能失敗,所以for循環會再次的回到這裡進行CAS,直到成功;
  5. 調用runtime_Semacquire掛起等待喚醒;
  6. *statep != 0喚醒之後statep不為零,表示WaitGroup又被重複使用,這會panic。需要注意的是waitgroup並不是不讓重用,而是不能在wait方法還沒運行完就開始重用。

waitgroup使用小結

看完了waitgroup的add方法與wait方法,我們發現裡面有很多校驗,使用不當會導致panic,所以我們需要總結一下如何正確使用:

  • 不能將計數器設置為負數,否則會發生panic;注意有兩種方式會導致計數器為負數,一是調用 Add 的時候傳遞一個負數,第二是調用 Done 方法的次數過多,超過了 WaitGroup 的計數值;
  • 在使用 WaitGroup 的時候,一定要等所有的 Add 方法調用之後再調用 Wait,否則就可能導致 panic;
  • wait還沒結束就重用 WaitGroup。WaitGroup是可以重用的,但是需要等上一批的goroutine 都調用wait完畢後才能繼續重用WaitGroup;

總結

waitgroup裡面的程式碼實際上是非常的簡單的,這篇文章主要是由waitgroup引入了記憶體對齊這個概念。由waitgroup帶我們看了在實際的程式碼中是如何利用記憶體對齊這個概念的,以及如何在32為作業系統中原子性的操作64位長的欄位。

除了記憶體對齊的概念以外通過源碼我們也了解到了使用waitgroup的時候需要怎麼做才是符合規範的,不會引發panic。

Reference

//blog.newbmiao.com/2020/02/10/dig101-golang-struct-memory-align.html

//gfw.go101.org/article/memory-layout.html

//golang.org/pkg/sync/atomic/#pkg-note-BUG

//en.wikipedia.org/wiki/Data_structure_alignment

//www.zhihu.com/question/27862634

Tags: