從ObjectPool到CAS指令
相信最近看過我的文章的朋友對於Microsoft.Extensions.ObjectPool
不陌生;復用、池化是在很多高性能場景的優化技巧,它能減少內存佔用率、降低GC頻率、提升系統TPS和降低請求時延。
那麼池化和復用對象意味着同一時間會有多個線程訪問池,去獲取和歸還對象,那麼這肯定就有並發問題。那ObjectPool
在涉及多線程訪問資源應該怎麼做到線程安全呢?
今天就帶大家通過學習ObjectPool
的源碼聊一聊它是如何實現線程安全的。
源碼解析
ObjectPool
的關鍵就在於兩個方法,一個是Get
用於獲取池中的對象,另外就是Return
用于歸還已經使用完的對象。我們先來簡單的看看ObjectPool
的默認實現DefaultObjectPool.cs
類的內容。
私有字段
先從它的私有變量開始,下面代碼中給出,並且注釋了其作用:
// 用於存放池化對象的包裝數組 長度為構造函數傳入的max - 1
// 為什麼 -1 是因為性能考慮把第一個元素放到 _firstItem中
private protected readonly ObjectWrapper[] _items;
// 池化策略 創建對象 和 回收對象的防範
private protected readonly IPooledObjectPolicy<T> _policy;
// 是否默認的策略 是一個IL優化 使編譯器生成call 而不是 callvirt
private protected readonly bool _isDefaultPolicy;
// 因為池化大多數場景只會獲取一個對象 為了性能考慮 單獨整一個對象不放在數組中
// 避免數組遍歷
private protected T? _firstItem;
// 這個類是在2.1中引入的,以儘可能地避免接口調用 也就是去虛擬化 callvirt
private protected readonly PooledObjectPolicy<T>? _fastPolicy;
構造方法
另外就是它的構造方法,默認實現DefaultObjectPool
有兩個構造函數,代碼如下所示:
/// <summary>
/// Creates an instance of <see cref="DefaultObjectPool{T}"/>.
/// </summary>
/// <param name="policy">The pooling policy to use.</param>
public DefaultObjectPool(IPooledObjectPolicy<T> policy)
: this(policy, Environment.ProcessorCount * 2)
{
// 從這個構造方法可以看出,如果我們不指定ObjectPool的池大小
// 那麼池大小會是當前可用的CPU核心數*2
}
/// <summary>
/// Creates an instance of <see cref="DefaultObjectPool{T}"/>.
/// </summary>
/// <param name="policy">The pooling policy to use.</param>
/// <param name="maximumRetained">The maximum number of objects to retain in the pool.</param>
public DefaultObjectPool(IPooledObjectPolicy<T> policy, int maximumRetained)
{
_policy = policy ?? throw new ArgumentNullException(nameof(policy));
// 是否為可以消除callvirt的策略
_fastPolicy = policy as PooleObjectPolicy<T>;
// 如上面備註所說 是否為默認策略 可以消除callvirt
_isDefaultPolicy = IsDefaultPolicy();
// 初始化_items數組 容量還剩一個在 _firstItem中
_items = new ObjectWrapper[maximumRetained - 1];
bool IsDefaultPolicy()
{
var type = policy.GetType();
return type.IsGenericType && type.GetGenericTypeDefinition() == typeof(DefaultPooledObjectPolicy<>);
}
}
Get 方法
如上文所說,Get()
方法是ObjectPool
中最重要的兩個方法之一,它的作用就是從池中獲取一個對象,它使用了CAS
近似無鎖的指令來解決多線程資源爭用的問題,代碼如下所示:
public override T Get()
{
// 先看_firstItem是否有值
// 這裡使用了 Interlocked.CompareExchange這個方法
// 原子性的判斷 _firstItem是否等於item
// 如果等於那把null賦值給_firstItem
// 然後返回_firstItem對象原始的值 反之就是什麼也不做
var item = _firstItem;
if (item == null || Interlocked.CompareExchange(ref _firstItem, null, item) != item)
{
var items = _items;
// 遍歷整個數組
for (var i = 0; i < items.Length; i++)
{
item = items[i].Element;
// 通過原子性的Interlocked.CompareExchange嘗試讀取一個元素
// 讀取成功則返回
if (item != null && Interlocked.CompareExchange(ref items[i].Element, null, item) == item)
{
return item;
}
}
// 如果遍歷整個沒有獲取到元素
// 那麼走創建方法,創建一個
item = Create();
}
return item;
}
上面代碼中,有一個點解釋一下Interlocked.CompareExchange(ref _firstItem, null, item) != item
,其中!=item
,如果其等於item
就說明交換成功了,當前線程獲取到_firstItem
元素的期間沒有其它線程修改_firstItem
的值。
Return 方法
Retrun(T obj)
方法是ObjectPool
另外一個重要的方法,它的作用就是當程序代碼把從池中獲取的對象使用完以後,將其歸還到池中。同樣,它也使用CAS
指令來解決多線程資源爭用的問題,代碼如下所示:
public override void Return(T obj)
{
// 使用策略的Return方法對元素進行處理
// 比如 List<T> 需要調用Claer方法清除集合內元素
// StringBuilder之類的也需要調用Claer方法清除緩存的字符
if (_isDefaultPolicy || (_fastPolicy?.Return(obj) ?? _policy.Return(obj)))
{
// 先嘗試將歸還的元素賦值到 _firstItem中
if (_firstItem != null || Interlocked.CompareExchange(ref _firstItem, obj, null) != null)
{
var items = _items;
// 如果 _firstItem已經存在元素
// 那麼遍歷整個數組空間 找一個存儲為null的空位將對象存儲起來
for (var i = 0; i < items.Length && Interlocked.CompareExchange(ref items[i].Element, obj, null) != null; ++i)
{
}
}
}
}
從核心的Get()
和Set()
方法來看,其實整個代碼是比較簡單的,除了有一個_firstItem
有一個簡單的優化,其餘沒有什麼特別的複雜的邏輯。
主要的關鍵就在Interlocked.CompareExchange
方法上,我們在下文來仔細研究一下這個方法。
關於 Interlocked.CompareExchange
Interlocked.CompareExchange
它實際上是一個CAS
的實現,也就是Compare And Swap,從名字就可以看出來,它就是比較然後交換的意思。
從下面的代碼段我們也可以看出來,它總共需要三個參數。其特性就是只有當localtion1 == comparand
的時候才會將value
賦值給localtion1
,另外吧localtion1
的原始值返回出來,這些操作都是原子性的。
// localtion1 需要比較的引用A
// value 計劃給引用A 賦的值
// comparand 和引用A比較的引用
public static T CompareExchange<T> (ref T location1, T value, T comparand)
where T : class;
一個簡單的流程如下所示:
簡單的使用代碼如下所示:
var a = 1;
// a == 1的話就將其置為0
// 判斷是否成功就看返回的值是否為a的原始值
if(Interlocked.CompareExchange(ref a, 0, 1) == 1)
Console.WriteLine("1.成功");
// 現在a已經變為0 這個交換不會成功
if(Interlocked.CompareExchange(ref a, 0, 1) == 1)
Console.WriteLine("2.成功");
結果如下所示,只有當a
的原始值為1
的時候,才會交換成功:
那麼Interlocked.CompareExchange
是如何做到原子性的?在多核CPU中,數據可能在內存或者L1、L2、L3中(如下圖所示),我們如何保證能原子性的對某個數據進行操作?
實際上這是CPU提供的功能,如果查看過JIT編譯的結果,可以看到CompareExchange
是由一條叫lock cmpxchgl
的彙編指令支撐的。
其中lock
是一個指令前綴,彙編指令被lock
修飾後會成為”原子的”,lock
指令有兩種實現方法:
- 早期 – Pentium時代(鎖總線),在Pentium及之前的處理器中,帶有
lock
前綴的指令在執行期間會鎖住總線,使得其它處理器暫時無法通過總線訪問內存,很顯然,這個開銷很大。 - 現在 – P6以後時代(鎖緩存),在新的處理器中,Intel使用緩存鎖定來保證指令執行的原子性,緩存鎖定將大大降低lock前綴指令的執行開銷。
現在這裡的鎖緩存(Cache Locking)就是用了Ringbus + MESI協議。
MESI
協議是 Cacheline 四種狀態的首字母的縮寫,分別是修改(Modified)態、獨佔(Exclusive)態、共享(Shared)態和失效(Invalid)態。 Cache 中緩存的每個 Cache Line 都必須是這四種狀態中的一種。
修改態(Modified),如果該 Cache Line 在多個 Cache 中都有備份,那麼只有一個備份能處於這種狀態,並且「dirty」標誌位被置上。擁有修改態 Cache Line 的 Cache 需要在某個合適的時候把該 Cache Line 寫回到內存中。但是在寫回之前,任何處理器對該 Cache Line在內存中相對應的內存塊都不能進行讀操作。 Cache Line 被寫回到內存中之後,其狀態就由修改態變為共享態。
獨佔態(Exclusive),和修改狀態一樣,如果該 Cache Line 在多個 Cache 中都有備份,那麼只有一個備份能處於這種狀態,但是「dirty」標誌位沒有置上,因為它是和主內存內容保持一致的一份拷貝。如果產生一個讀請求,它就可以在任何時候變成共享態。相應地,如果產生了一個寫請求,它就可以在任何時候變成修改態。
共享態(Shared),意味着該 Cache Line 可能在多個 Cache 中都有備份,並且是相同的狀態,它是和內存內容保持一致的一份拷貝,而且可以在任何時候都變成其他三種狀態。
失效態(Invalid),該 Cache Line 要麼已經不在 Cache 中,要麼它的內容已經過時。一旦某個Cache Line 被標記為失效,那它就被當作從來沒被加載到 Cache 中。
總得來說,若干個CPU核心通過Ringbus連到一起。每個核心都維護自己的Cache的狀態。如果對於同一份內存數據在多個核里都有Cache,則狀態都為S(Shared)。
一旦有一核心改了這個數據(狀態變成了M),其他核心就能瞬間通過Ringbus感知到這個修改,從而把自己的Cache狀態變成I(Invalid),並且從標記為M的Cache中讀過來。同時,這個數據會被原子的寫回到主存。最終,Cache的狀態又會變為S。
關於MESI
協議更詳細的信息就不在本文中介紹了,在計算機操作系統和體系結構相關書籍和資料中有更詳細的介紹。
然後compxchg
這個指令就很簡單了,和我們之前提到的一樣,比較兩個地址中的值是否相等,如果相等的話那麼就修改。
Interlocked
類中的其它方法也是同樣的原理,我們可以看看Add
之類的方法,同樣是在對應的操作指令前加了lock
指令。
總結
本文主要是帶大家看了下ObjectPool
的源碼,然後看了看ObjectPool
能實現無鎖線程安全的最大功臣Interlocked.CompareExchange
方法;然後通過彙編代碼了解了一下Interlocked
類中的一些方法是如何做到原子性的。
感謝閱讀,如果您覺得本文還不錯,歡迎點贊、轉發+評論,您的支持是我更新的動力!