在.NET Core中使用Channel(一)
- 2021 年 1 月 8 日
- 筆記
我最近一直在熟悉.net Core中引入的新Channel<T>類型。我想在它第一次發布的時候我了解過它,但是有關文章非常非常少,我不能理解它們與其他隊列有什麼不同。
在使用了一段時間後,我終於看到了它們的吸引力和真正的力量。最值得注意的是大型非同步後台操作,這些操作幾乎需要雙向通訊來同步它們正在做的事情。這句話有點拗口,但希望在本系列文章結束時,你會清楚什麼時候應該使用Channel<T>,什麼時候應該使用一些更基本的東西,比如Queue<T>。
Channel是什麼?
從核心來說,Channel本質上是.net中的一種新的集合類型,它與現有的Queue<T>類型非常相似,但有額外的好處。在真正嘗試研究這個主題時,我發現的問題是,許多現有的外部隊列技術(IBM MQ、Rabbit MQ等)都有「channel」的概念,它們的範圍從完全抽象的思維過程,到系統中實際的物理類型。
現在也許我大錯特錯,但如果你認為.net中的Channel就好比是允許等待新消息的一個隊列,並告訴生產者要保持隊列越來越大,消費者無法跟上,我認為這很難出錯。
這裡我提到了一個關鍵詞,生產者/消費者。你可能還聽說過Pub/Sub。但它們是不可互換的。
Pub/Sub描述的是某人發布資訊,一個或多個「訂閱者」監聽該資訊並對其採取一定的響應行為。這裡不存在負載平衡,因為當添加訂閱伺服器時,它們本質上與其他所有人獲得相同消息的副本。
在圖表形式中,Pub/Sub看起來有點像這樣:
生產者/消費者描述生產者發布消息的行為,並且有一個或多個消費者可以對該消息進行操作,但是每個消息只讀取一次。它不會分發到每個訂閱者。
當然,用圖表的形式:
另一種思考生產者/消費者的方式是想像你去超市結賬。當顧客想結帳時,排隊的隊伍變長了,你可以簡單地打開更多的收銀台來處理這些顧客。這個小小的思考過程實際上是很重要的,因為如果你不能打開更多的收銀台怎麼辦?排隊的隊伍應該越來越長嗎?如果收銀台操作員坐在那裡,但沒有顧客怎麼辦?他們是應該當天就打包回家呢,還是應該被告知坐著等客人來了再說。
這通常被稱為生產者-消費者問題,這是Channel要解決的問題。
基礎Channel示例
與Channel有關的所有東西都在System.Threading.Channels中。在以後的版本中,這似乎是與標準的.net Core項目捆綁在一起的,但如果不是,這裡有一個nuget包://www.nuget.org/packages/System.Threading.Channels。
一個極其簡單的Channel示例是這樣的:
static async Task Main(string[] args) { var myChannel = Channel.CreateUnbounded(); for (int i = 0; i < 10; i++) { await myChannel.Writer.WriteAsync(i); } while (true) { var item = await myChannel.Reader.ReadAsync(); Console.WriteLine(item); } }
這裡沒有太多可談的。我們創建了一個「無限的」通道(這意味著它可以容納無限項,但在本系列的後續內容中會有更多內容)。我們寫10項,讀10項,在這一點上,它與我們在.net中見過的任何其他隊列沒有太大區別。
Channel是執行緒安全的
沒錯,通道是執行緒安全的。這意味著多個執行緒可以讀寫同一個通道而不會出現問題。如果我們看一下這裡的Channel源程式碼,我們可以看到它是執行緒安全的,因為它使用鎖和內部「隊列」的組合來同步讀/寫器,一個接一個地讀/寫。
實際上,Channel的預期用例是多執行緒場景。例如,如果我們使用上面的基本程式碼,當我們實際上不需要執行緒安全性時,維護執行緒安全性實際上會有一些開銷。所以在那個例子中,我們可能只使用Queue<T>更好。但是這段程式碼呢?
static async Task Main(string[] args) { var myChannel = Channel.CreateUnbounded(); _ = Task.Factory.StartNew(async () => { for (int i = 0; i < 10; i++) { await myChannel.Writer.WriteAsync(i); await Task.Delay(1000); } }); while (true) { var item = await myChannel.Reader.ReadAsync(); Console.WriteLine(item); } }
在這裡,我們有一個單獨的執行緒寫入消息,而我們的主執行緒讀取消息。你會注意到有趣的事情是,我們添加了消息之間的延遲。怎麼能調用ReadAsync()?沒有TryDequeue或Dequeue,如果隊列中沒有消息,它就運行null,對嗎?
答案是Channel Reader的「ReadAsync()」方法實際上會「等待」一個消息。因此,不需要在等待消息時執行一些荒謬的循環,也不需要在等待時完全阻塞執行緒。我們將在以後的文章中進一步討論這個問題,但是你要知道你可以使用ReadAsync來等待新的消息,而不是編寫一些自定義的程式碼來做同樣的事情。
接下來是什麼?
現在你已經掌握了基礎知識,下一篇讓我們看看使用Channel一些更高級的場景。
歡迎關注我的公眾號,如果你有喜歡的外文技術文章,可以通過公眾號留言推薦給我。
原文鏈接://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-1-getting-started/