一文讀懂 .NET 中的高性能隊列 Channel
- 2021 年 7 月 16 日
- 筆記
介紹
System.Threading.Channels 是.NET Core 3.0 後推出的新的集合類型, 具有異步API,高性能,線程安全等特點,它可以用來做消息隊列,進行數據的生產和消費, 公開的 Writer
和 Reader
api對應消息的生產者和消費者,也讓Channel更加的簡潔和易用,與Rabbit MQ 等其他隊列不同的是,Channel 是進程內的隊列。
開始Channel之旅
創建一個 channel 非常簡單,Channel 類公開的API支持創建無限容量和有限容量的 channel
// 創建有限容量的channel
var channel = Channel.CreateBounded<string>(100);
// 創建無限容量的channel
var channel = Channel.CreateUnbounded<string>();
這裡需要注意的是,當你使用一個有限容量的 Channel 時,你需要指定容量的大小,還可以指定一個 BoundedChannelFullMode
的枚舉類型,來告訴 channel 達到容量限制的時候,繼續寫入時應該怎麼處理
public enum BoundedChannelFullMode
{
Wait,
DropNewest,
DropOldest,
DropWrite
}
- Wait 是默認值,當 channel 容量滿了以後,寫入數據時會返回 false,直到channel有數據被消費了以後,才可以繼續寫入
- DropNewest 移除最新的數據,也就是從隊列尾部開始移除
- DropOldest 移除最老的數據,也就是從隊列頭部開始移除
- DropWrite 寫入數據返回成功,但是轉頭就把剛才的數據丟了
// 創建有限容量的channel, 並指定容量達到最大的策略
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
});
生產數據
生產數據主要通過 Channel 提供的 Writer
api, 常規的寫入操作如下:
await channel.Writer.WriteAsync("hello");
Channel 還提供了 TryWrite()
方法,如果寫入數據失敗時會返回 false,WaitToWriteAsync()
方法會做非阻塞的等待,直到 Channel 允許寫入新的數據時返回 true,同樣的 Channel 關閉後會返回 false,翻了一下源碼發現,WriteAsync()
方法內部其實是調用了 TryWrite()
和 WaitToWriteAsync()
方法。
消費數據
消費數據主要通過 Channel 提供的 Reader
api, 常規的讀取操作如下:
var item = await channel.Reader.ReadAsync();
同樣的,Channel 提供了 TryRead()
嘗試讀取數據,WaitToReadAsync()
方法會做非阻塞的等待,直到 Channel 可以讀取到數據時會返回 true,在 Channel 關閉後會返回 false, ReadAsync()
的方法內部其實是調用了 TryRead()
和 WaitToReadAsync()
方法, 另外你可以通過 channel.Reader.Count
獲取隊列元素的數量。
在實際的使用場景中,可能需要一些後台任務,長時間的進行消費,那麼你可以使用下邊的方式
while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out var item))
{
Console.WriteLine(item);
}
}
ReadAllAsync()
方法返回的是一個 IAsyncEnumerable<T>
對象,也可以用 await foreach
的方式來獲取數據
await foreach(var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}
單一生產者和消費者
創建 Channel 時,可以設置 ChannelOptions 的 SingleWriter
和 SingleReader
,來指定 Channel 時單一的生產者和消費者,默認都是 false,當設置了 SingleWriter = true 時, 會限制同一個時間只能有一個生產者可以寫入數據, SingleReader = true 是同樣的。
另外,如果只需要一個消費者的話,你應該設置 SingleReader = true
, Channel 在內部做了一些優化,在讀取時避免了鎖操作,性能上有些許的提升。
性能
這裡的基準測試我對比了三種類型,Channel, BufferBlock, BlockingCollection,分別寫入了10000條數據,然後進行讀取,發現 Channel 確實是表現比較好。
總結
Channel 實際上還是使用 ConcurrentQueue
做的封裝, 使用起來更方便,對異步更友好,另外,.NET 5 其中的 Quic 內部就使用了Channel,CAP 也在新版本中使用 Channel 替換掉了之前的 BlockingCollection,來實現進程內的隊列。
官方介紹 //devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels
CAP //github.com/dotnetcore/CAP
Quic //github.com/dotnet/runtime/tree/main/src/libraries/System.Net.Quic