重新梳理調度器——GMP 調度模型

調度器——GMP 調度模型

Goroutine 調度器,它是負責在工作線程上分發準備運行的 goroutines。

首先在講 GMP 調度模型之前,我們先了解為什麼會有這個模型,之前的調度模型是什麼樣子的?為什麼要改成現在的模式?

我們從當初的Goroutine 調度設計文檔得知之前採用了 GM 的調度模型,並且在高並發測試下性能不高。文中提到測試顯示 Vtocc 服務器在 8 核機器上的CPU最高為70%,而文件顯示 rutime.futex() 就消耗了14%。通常,在性能至關重要的情況下,調度器可能會禁止用戶使用慣用的細粒度並發。

那麼是什麼原因導致這些問題呢?Dmitry Vyukov 總結四個原因:

  • 使用了一個全局互斥鎖 mutex 處理整個與 goroutine 相關的操作(創建,完成,再調度等)。
  • 頻繁的 Goroutine 切換。工作線程會在那些可運行的 goroutine 之間頻繁切換,這就導致了增加延遲以及額外的開銷。
  • 每個線程M都需要處理內存緩存(每個M的緩存與運行 G 所需要的緩存比例差距太大,100:1),這就導致了大量的內存佔用影響了數據局部性。
  • 系統調用(syscall)會導致工作線程頻繁阻塞以及解除阻塞,這會導致大量的開銷。

為了解決這個問題,於是就引入了 Processor 這個概念。引入了這個對象並不會因為多了一個對象開銷性能都會有影響,反而這方面開銷都下降了。P 其實負責的是 M 與 G 之間的調度相關的操作,在執行 G 時 P 一定要與 M 綁定。並且把 M,schedule 裏面的對象都轉移到 P 中去了,所以 M 與 調度器原來的操作反而變得更乾淨了。如調度設計文檔中提到的:當 M 準備執行 Go 代碼時會從集合表中彈出一個 P;當執行代碼結束後就會將 P 推進集合中。所以當 M 需要執行 Go 代碼時,必須要與 P 綁定。而新增的這個機制,就是為了替代原來調度器中的 sched.atomic(mcpu/mcpumax)。

在設計文檔中還講到了調度時發生系統啟動、掛起、以及恢復(Syscall Park/Unpark)的指導方針,並且在後面的調度實現提供了依據。

從創建 G 就必須要確保由其他的 M 在執行 G。同樣當 M 進入系統調用時就必須要確保由其他的 M 來執行 Go 代碼。

這時有兩種選擇,要麼立即阻塞/解塞,要麼自旋。關於自旋有兩種級別:

  1. 空閑的 M 同一個相關聯的 P 自旋一段時間來尋找新的 G
  2. 一個 M 與一個相關聯的 P 自旋等待可用的 G

存在類型為上述 2 的空閑 M 時,類型為 1 的空閑 M 不會阻塞。當產生一個新 G 或 M 進入 Syscall 又或是 M 從空閑轉位忙碌時,能確保至少有一個 M 可以執行。這樣就避免了過多的 M 阻塞/解塞。

現在的調度模型主要分為三個概念:

  • Goroutine(G),表示待執行的任務
  • 工作線程(M),表示操作系統線程
  • 處理器(P),執行 Go 代碼所需要的一種資源

P 必須要綁定到 M 上來執行具體的 Go 代碼。

在講 GMP 調度模型之前,我們先來了解以下 G、M、P 這三個對象有哪些核心變量。

G

Goroutine 是建立在 M 內核線程之上的稱為協程的一個執行單元。在切換 G 時都是直接在用戶態發生的,所以開銷很小。所佔用的內存也比原來小了很多,從前面的內容我們知道,我們把其中某些元素放入至新引入的 P 中了。雖然佔用的內存不大,但是裏面的變量卻非常多。我們目前了解其中相對重要的部分,其它的字段想進一步了解,可以直接查看 runtime2.g 源碼.

type g struct {
	stack       stack   // offset known to runtime/cgo
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink
	...
}
  • stack 開頭的三個變量,都是與棧相關的變量。stack 表示當前 g 所佔用的實際棧內存大小:[stack.lo, stack.hi]。
  • stackguard0 這個是在 Go 棧內存增長,參與棧內存計算比較的時候要用到的,並且會參與搶佔調度
  • stackguard1 這個是在 C 棧內存增長時,參與棧內存計算比較。
type g struct {
	...
	m            *m
	sched        gobuf
	param        unsafe.Pointer
	atomicstatus uint32
	schedlink    guintptr
	gopc           uintptr
	startpc        uintptr
	waiting        *sudog
	...
}
  • g 中包含 m 這個不意外,因為 g 的執行最終是通過當前的 m 來執行的。
  • sched 這是一個 gobuf 對象,裏面包含棧指針 sp、程序計數器 pc、返回值 ret以及其它上下文信息 ctxt 等,在 g 發生調度的時候(如系統調用)就會靠着 shced 來執行或恢復之前操作相關的數據
  • param g 在活動期間傳遞的參數
  • atomicstatus 這個就表示當前 g 的運行狀態了
  • schedlink,這個有點意思,它表示是調度器的鏈接器。通過它 g 就可以在 schedt 調度器的全局的 runq 隊列中定位到可用的 g。
  • gopc 是開啟 goroutine 那個時刻的程序計數器(就是 Go 代碼中的 go func() 的位置)
  • startpc 即 go 開啟協程執行的函數的程序計數器 pc
  • waiting 表示 go 在等待,是一個 sudog 指針類型,sudog 表示的是一個等待 g 的集合列表。還有等待相關的參數如(waitsince、waitreson)這裡就不介紹了。
type g struct {
	...
	preempt       bool // 搶佔信號, 與 stackguard0 = stackpreempt 一樣
	preemptStop   bool // 搶佔狀態更改為 _Gpreempted
	preemptShrink bool // shrink stack at synchronous safe point
	...
}

這三個變量是跟搶佔調度相關的。

M

M 是指操作系統線程,Go 在啟動時會根據 CPU 的核心數分配 M 的個數。最多會開啟 10000 個線程,並且這裏面大多數都不會執行用戶代碼。最多只有 GOMAXPROCS 個活躍的線程執行用戶代碼。默認的設置一般都是 CPU 的核心數,這樣是為了在調度的時候防止線程頻繁的發生上下文切換。而在調度 G 的所有過程都是在用戶態進行的,較於操作系統級的線程 M 切換來說開銷會小的多。

我們來看一下 M 的主要核心對象:

type m struct {
    g0      *g
    curg          *g
    ...
}

g0 是個特殊的 goroutine,它是持有調度棧的,它會參與調度的過程。如創建 m,創建 g 以及執行一些內存分配。

type m struct {
    p             puintptr // attached p for executing go code (nil if not executing go code)
	nextp         puintptr
	oldp          puintptr // the p that was attached before executing a syscall
    ...
}

這三個字段是與 P 處理器相關的;

  • p,當前正在執行 go 代碼的 P,如果沒有執行代碼就為 nil
  • nextp,下一個要執行的 p
  • oldp,在執行系統調用之前的 p

P

是新引入的在 G 和 M 之間的調度層。它負責調度 runq 等待隊列中的待運行的協程,在關鍵的操作時候可以選擇讓出線程,提高線程利用率。

P 內部也包含了大量對象,同樣我們主要了解其中相對重要的字段,與調度那些等待的 g 密切相關的內容。

type p struct {
	m           muintptr
	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	runnext guintptr
	...
}

前面提到了在執行具體 go 代碼時,p 一定要與 m 相關聯。後續的字段都是與運行的 goroutine 相關。

  • runq,是一個長度固定為 256 的數組結構
  • runqhead,runqtail 表示的當前 runq 隊列中的首尾的位置
  • runnext,表示的下一個可運行的 g(注意,不一定表示一定會在下一輪喚醒運行,如果沒有彈出執行就又會回到隊列的頭部位置)

其實從這四個字段我們就能看出,runq 其實一個由數組結構加雙指針構成的一個環形隊列結構

除此之外,還有一個匿名結構類型的字段需要注意,那就是 gFree。這個對象內部是由 gList、n 組成的一個鏈表對象,用來存放空閑 g 的。

gFree struct {
    gList
    n int32
}

啟動 Schedule 調度器

在調度 GMP 之前我們必須還要知道調度器是如何啟動的。

調度器啟動在 runtime.schedinit 可以看得到。除去初始化鎖的順序信息和其它必要的信息(如gc、棧、系統參數與環境變量等),我們主要看下面幾個變量:

func schedinit() {
	...
	_g_ := getg()
	if raceenabled {
		_g_.racectx, raceprocctx0 = raceinit()
	}
	sched.maxmcount = 10000
	...
	lock(&sched.lock)
	sched.lastpoll = uint64(nanotime())
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
	unlock(&sched.lock)
	...
}

調度器最多只能開啟 10000 個線程。如果設置了 GOMAXPROCS 則替換默認的 cpu 核心數。之後就會調用 procresize 對 proc 進行更改。這個時候調度器必須要上鎖,不會執行任何 goroutine 代碼。procresize 函數內部對全局變量 allp 的期望容量 capcity 與 procs 進行判斷。如果目標值要比期望值大,則會進行擴容給。否則直接追加即可:

func procresize(nprocs int32) *p {
	if nprocs > int32(len(allp)) {
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			nallp := make([]*p, nprocs)
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
		...
		unlock(&allpLock)
	}
	...
}

擴容之後就會循環初始化 p(初始化期間對 pp 的id、status 以及內存緩存賦值), 並調用底層系統的寫屏障(write barrier)確保安全的對 allp 進行覆蓋。

在初始化階段,p 的狀態此時是 _Pgcstop。在初始化之後如果當前的 p 的序號是小於之前設置的 nproc 目標數時,就會將當前的 g.m.p 的狀態更改為 _Prunning。如果不滿足上述條件,則會恆定取全局的 allp 中的第一個,並將狀態設置為 _Pidle。

設置完當前的 g.m.p 信息之後就會對一些不再引用的對象進行清理、壓縮以及將除 allp 集合中的第一個 p 之外將狀態全部置為 _Pidle,並將其放入調度器 sched.pidle 全局空閑隊列中去。

func procresize(nprocs int32) *p {
	...
	mcache0 = nil
	// release resources from unused P's
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy()
		// can't free P itself because it can be referenced by an M in syscall
	}
	// Trim allp.
	if int32(len(allp)) != nprocs {
		lock(&allpLock)
		allp = allp[:nprocs]
		idlepMask = idlepMask[:maskWords]
		timerpMask = timerpMask[:maskWords]
		unlock(&allpLock)
	}
    var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		if _g_.m.p.ptr() == p {
			continue
		}
		p.status = _Pidle
		if runqempty(p) {
			pidleput(p)
		} else {
			p.m.set(mget())
			p.link.set(runnablePs)
			runnablePs = p
		}
	}
	...
    return runnablePs
}

小結

調度器啟動總起來就是如下步驟:

  • 程序啟動,編譯器調用 runtime.schedinit,初始化系統信息、gc 初始化以及其它相關的信息
  • 調用 getg 獲取當前
  • 設置調度器相關的信息(如 maxmcount、procs),其中設置 procs 還會涉及擴充 resize。
  • 設置完 procs 就是要對當前的 g 綁定對應的 p,所以就會初始化 p,將全局的 allp[0] 綁定到當前的 g 下並將其餘的全部推送到調度器全局空閑隊列中。

新建 Goroutine

其實我們可以從一個例子着手,查看 go 是如何啟動一個 goroutine 的

func startg() {
	go func() {
		fmt.Println("start g")
	}()
}

在啟動 main.go 的時候,runtime 會執行 proc.go.main 方法創建主協程,並初始化一些信息以及 gc 相關的標識等操作。我們可以通過

go build -gcflag -S startg.go 命令能查看,編譯器調用了 runtime.newproc(SB),這個方法有兩個參數,一個是參數的大小,另一個是 goroutine 要執行的函數體。

func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)
		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)
		if mainStarted {
			wakep()
		}
	})
}

newproc 方法主要就是保存這兩個參數的信息以及對應的程序計數器 pc。然後會根據這些變量來新生成一個 g,然後把這個新生成的 g 推送到當前 g 上的線程的處理器 p 的局部 runq 隊列中,然後根據特定的條件(mainStarted)來決定是否喚醒。

newproc1 除了一些棧空間大小的判斷以及參數、調度器的內存地址拷貝之外,主要執行了如下功能:

  • 創建新的 g
  • 給 g 分配棧
  • 更改 g 的狀態
  • 更改 g 的屬性(調度器的指針,計數器等)
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()
	...
	acquirem()
	...
	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	...
    if narg > 0 {
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			if stkmap.nbit > 0 {
				// We're in the prologue, so it's always stack map index 0.
				bv := stackmapdata(stkmap, 0)
				bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
			}
		}
	}
    ...
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	casgstatus(newg, _Gdead, _Grunnable)
	...
	releasem(_g_.m)
	return newg
}

上面的代碼我省略了其它不在考慮的代碼。在創建新 g 之前獻給 m 上鎖了防止被搶佔,因為後續要對當前的 m 相關的 p 下的局部隊列保存 g。

在創建 newg 的時候首先會調用 gfget(p) 從當前 p 下的 gFree 局部隊列中獲取空閑的 g(狀態為 Gdead),如果局部隊列中沒有的話,就從調度器 sched 的全局隊列中竊取空閑的 g。

如果發生了竊取,那麼就會在第一次竊取時就把調度器 sched 中的空閑 g 的批次的全部竊取到自己的局部隊列中直到局部隊列滿(n = 32)。

如果全局隊列中也沒有 g 的話。那麼就會調用 malg(_StackMin) 根據傳入的棧大小生成 newg。然後就會調用 memmove 指令將數據以及 fn 信息拷貝到棧上。最後就會將前面保存的棧指針以及 fn 程序計數器等信息保存在 newg 上,並更改 newg 狀態由 _Gdead 轉變為 _Grunnable。最後釋放 g.m 並返回 newg。

新生成並返回的 newg 最終會由 runqput 推送至當前 p 下的隊列中。

func runqput(_p_ *p, gp *g, next bool) {
	...
	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}
retry:
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}

runqnext 會根據傳入的 next 參數決定走兩個分支:

  • true:直接將 g 傳給當前 p 下的 runnext
  • false:會判斷 p 中 runq 中的元素是否已滿,如果沒滿則插入隊尾;否則則走 runqputslow。

runqputslow 在 p 的局部隊列滿的情況下,負責取出隊列中的一部分以及待加入的新 g 添加到調度器的全局運行隊列上。

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
	var batch [len(_p_.runq)/2 + 1]*g
	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	for i := uint32(0); i < n; i++ {
		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
	}
	...
	batch[n] = gp
	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}
	...
	var q gQueue
	q.head.set(batch[0])
	q.tail.set(batch[n])

	// Now put the batch on global queue.
	lock(&sched.lock)
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
	return true
}

要注意,globrunqputbatch 在添加全局隊列 sched.runq 前後是要加鎖的防止並發修改這個共享的全局變量。

注意:關於 p 本地運行隊列 runq 和調度器 sched 的運行隊列 runq 同樣都是鏈表,但是組成的結構完全不一樣。

p 的 runq 是通過數組+雙指針形成的環形隊列。

sched 的 runq 就是單純的鏈表結構

調度

在執行完 schedinit 之後就會調用創建 M 的入口函數 runtime.mstart,前者內部會調用 runtime.mstart1。前者主要初始化 g0 的 stackguard0 和 stackguard1 字段。後者會初始化線程 m 以及 m0 獨有的邏輯(信號處理程序)。最後會開始調用 runtime.schedule 進行調度。

schedule 主要工作就是創建 g。但是創建 g 的過程非常複雜,在調度 g 之前進行了多次判斷:

  1. 首先判斷是否因為 gc 等待,如果是因為 gc 就等待 gc 結束。
  2. 判斷是否執行安全點函數。

剩下就要針對各種情況對 g 進行賦值:

  1. 如果存在 gc 標記作業,那麼就得去 gc 控制器中嘗試獲取可運行的 g。
  2. 3 沒有成功獲取,則通過一定的算法盡量公平的(魔法數字 61)通過 runtime.globrunqget 先嘗試從全局可運行隊列中獲取 g,在返回的同時還會將全局隊列中的待運行的 g 一次性取一份到當前 p 下的局部隊列中。
  3. 如果全局隊列中沒有 g,則去當前線程下的 p 的 runq 隊列中獲取
  4. p 局部隊列中也沒有,那麼就會同步調用 runtime.findrunnable 等待返回一個可運行的 g。
    • 這部分的功能非常複雜,總的來說就是去其他 P 的局部或全局運行隊列中竊取可運行的 g
    • 輪訓網絡查看是否有 g 等待運行
    • 在返回可運行的 g 之前還會判斷線程自旋是否超過了正在活躍的線程數,超過了就阻塞,來避免 CPU 的負擔過大。
    • 在循環竊取的時候能還通過竊取計數器去傳遞竊取的次數
  5. 最終返回可運行的 g 去執行調度 runtime.execute

最後就會調用把獲取的 g 並把其中的調度器信息傳遞給 runtime.gogo(asm-amd64)。在這個函數中會根據傳遞的調度信息中的 gobuf 來獲取程序計數器 pc 與棧指針以及對應的上下文信息。這樣就可以根據這些數據恢復到要執行的 fn 對應的地址來繼續執行。

關於切換 g 然後根據獲取的 pc、sp 以及 context 恢復在切換 g 之前的位置繼續往下執行的過程,熟悉 c# async await 狀態機切換函數調用的過程的同學可以將此等同(不負責任)

最後就會調用 runtime.goexit 退出,退出之後最終會在 g0 的棧上找到 runtime.goexit0 該函數,將 goroutine 的狀態置為 _Gdead,並清理它的字段信息、取消與 m 的關係、移除 g 並調用 runtime.gfput 推送到當前 p 下的空閑列表 gFree 列表中。然後又重新調用 runtime.schedule 進行一下輪調度。

所以這個調度是一個不斷循環上述的調度過程

注意:⚠️⚠️⚠️

關於 runtime.gogo 中最後會調用 runtime.goexit 的說法我參考了 dravenss《Go 語言程序與設計》中 6.5 調度器的說法,我在彙編代碼中並沒有找到這種說法的痕迹(也是因為沒有看懂這裡)。

至此,整個由調度器啟動到創建初始化線程再到獲取或創建 G 再將 P 與之要執行的線程關聯最後到結束至下一輪調度循環的過程就分析完了。其實這裏面還有很多被我忽略的細節,比如還有大量的 trace 的功能、以及每個對象的對於棧 size 的重計算、等待及等待隊列 runtime.sudog 、自旋的細節都沒有覆蓋到。一是涉及到這些會讓我了解並分析調度過程變得更加複雜,二是那些跟 GMP 調度關係不是很大。

小結

關於調度器的過程就是當一個新的 G 或已存在的 G 成為可運行狀態時,它就會被推送至可運行的攜程集合 runq 中。當 P 執行完 G 時就會從可運行 runq 集合中彈出 G 作為一下個要運行的 G。如果集合中沒有就會去其它 P 下的 runq 隊列中獲取 G;如果還沒有就去全局的 runq 鏈表中獲取。一次竊取多個到自己 P 下的 runq 中。

但凡涉及到全局隊列操作時都要上鎖保證當前隊列不被其他線程更改。

參考鏈接