探索互斥鎖 Mutex 實現原理
Mutex 互斥鎖
概要描述
mutex 是 go 提供的同步原語。用於多個協程之間的同步協作。在大多數底層框架代碼中都會用到這個鎖。
mutex 總過有三個狀態
- mutexLocked: 表示佔有鎖
- mutexWoken: 表示喚醒
- mutexStarving: 表示等待鎖的飢餓狀態(從正常模式進入飢餓狀態)
具體實現
首先得清楚 Mutex 的結構
type Mutex struct {
state int32
sema uint32
}
Lock 持有鎖
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
比較鎖的狀態,如果是初始狀態 0,則獲取當前鎖並將鎖的狀態置為 mutexLocked 之後返回,即 1。其他協程則在釋放鎖之前走 m.lockSlow
代碼分支。
m.lockSlow 代碼很複雜,裏面有一些比較的變量為了方便理解,我標記起來:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
...
}
- waitStartTime: 表示等待喚醒的時間,如果等待的時間超過了 1ms 則會走鎖的飢餓模式(後面會講到)
- starving: 表示是飢餓標識
- awoke: 是否被喚醒的標識
- iter: 等待鎖的次數
在申明這些變量之後,程序就會進入一個循環階段,在這個循環體內,runtime 會根據當前鎖的狀態 m.state
以及等待鎖的次數以及系統的自身運行情況來判斷程序是否進入自旋等待。
// lockSlow
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}
上面的第一個判斷語句就是判斷了當前鎖的狀態是否為普通模式(只有普通模式才會進入自旋)。除此之外 runtime_canSpin 還會判斷當前自旋的次數是否超過 4 次、當前操作系統是否為多核 cpu 、 GOMAXPROCS > 1
以及至少有一個正在運行的 P 並且 P 下的局部隊列 runq
是空閑的。
在沒有自旋的情況下,則正常往下執行,就會執行 runtime_SemacquireMutex 方法,目的就是獲取當前鎖的 m.sema
變量的地址作為信號量將當前的等待者進入 semaRoot
的等待優先級隊列(鏈表結構)中。並且根據傳的變量 queueLifo
來控制是傳入到隊列尾部還是頭部。如果 queueLifo=true
說明之前已經處理等待狀態,則就會插入到隊列的頭部等待喚醒,否則就進入隊列的尾部。在這個 runtime_SemacquireMutex 方法中會不斷嘗試獲取鎖並進入休眠狀態等待信號量釋放。一旦獲取鎖立即返回執行後續的代碼。
// lockSlow
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
//runtime_SemacquireMutex
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
...
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}
如上面代碼所示,在插入隊列之後就會根據標識 starving
以及 runtime_nanotime() - waitStartTime > 1ms
來判斷當前等待鎖模式是否進入飢餓模式。
在進入飢餓模式之前,等待者優先級隊列中的第一個 g 被喚醒,如果其他新到達的 g 的優先級要比等待隊列中的要高,所以這個等待着就會獲取鎖失敗,那麼就會轉而進入到等待隊列中的頭部等待下次喚醒。而此時如果等待喚醒的隊列等待時間超過了 1ms,那麼就會進入飢餓模式,此時新到來的 g 就不會參與佔有鎖,也不會自旋,而是進入到等待着優先級隊列中的隊尾等待喚醒。
只有在這個等待隊列為空,或者最後一個等待着的等待喚醒時間小於 1ms,那就會推出飢餓模式。
所以這個飢餓模式就是為了防止等待隊列中的 g 陷入無限的等待狀態。
如何根據 m.sema 獲取等待者隊列?
首先 rutime 會根據 m.sema 的地址通過哈希計算來生成一個 table,每個 mutex 的信號量地址對應一個表。每個表都有一個 semaRoot 的對象,這個對象包含一個 treap *sudog
鏈表結構隊列。通過 semaroot.g = getg()
把當前的 g 綁定起來就進入到等待着隊列了。
// mutex.go
func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
Unlock 釋放鎖
知道了 Lock 的實現細節,那麼釋放鎖我們就相對來說比較簡單了。我們從源碼的代碼量就能知道要比 Lock 簡單不少:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
首先就是通過原子操作將 m.state 減 1 操作讓其回到初始狀態 0。如果該鎖還沒有釋放則進入 unlockSlow 操作
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
該操作僅僅做了當前鎖狀態是否為飢餓模式,如果不是就會正常喚醒等待隊列中的 g,並通過 sema 信號量釋放該信號量將所有權交給等待者。如果是飢餓模式,則直接將當前所有權交給隊列中的下一個等待者。
總結
Mutex 在佔有鎖的時候會有兩種模式,一種是正常模式,等待者會進入一個等待者隊列等待被喚醒。其他 g 嘗試獲取鎖時會根據系統自身情況以及是否有 runq 空閑的 P 來進入自旋。在自旋的過程中會計算鎖的狀態。由於正常模式下等待者隊列中的 G 可能會無限等待下去,所以這個時候就會進入飢餓狀態。會將鎖的所有權直接交給被喚醒的等待者直到隊列為空退出飢餓模式。
在進入等待者隊列是會獲取當前鎖的信號量,這樣保證了同一個時刻不會有多個 G 佔有同一個鎖。
解鎖過程就是對應加鎖過程的逆過程。
如果當前鎖的狀態不對,則直接拋異常;當互斥鎖為飢餓模式時,直接將所有權交給等待者。當處於正常模式時,如果沒有鎖或者當前的鎖狀態不都為 0,則直接返回。否則就會釋放信號量喚醒等待者隊列中的等待者獲取鎖。