C# 8.0 寶藏好物 Async streams

之前寫《.NET gRPC 核心功能初體驗》,利用gRPC雙向流做了一個打乒乓的Demo,存儲消息的對象是IAsyncEnumerable<T>,這個非同步可枚舉泛型介面支撐了gRPC的實時流式通訊。

本文我將回顧分享

  • foreach/yield return/async await語法糖的本質
  • 如何使用非同步流
  • 消費非同步流時 附加探索

foreach/ yield return/async await的本質

.NET誕生之初,就通過IEnumerable、IEnumerator提供迭代能力,
前者代表具備可枚舉的性質,後者代表可被枚舉的方式。
(看你骨骼驚奇,再送你一本《2021年了,IEnumerableIEnumerator介面還傻傻分不清楚?》)
如果你真的使用強類型IEnumerable/IEnumerator來產生/消費可枚舉類型,會發現要寫很多瑣碎程式碼。

C#推出的yield return迭代器語法糖,簡化了產生可枚舉類型的編寫過程。(編譯器將yield return轉換為狀態機程式碼來實現IEnumerable,IEnumerator)

yield 關鍵字可以執行狀態迭代,並逐個返回枚舉元素,在返回數據時,無需創建臨時集合來存儲數據。

C#foreach語法糖,簡化了消費可枚舉類型的編寫過程。(編譯器將foreach抓換為強類型的方法/屬性調用)

IEnumerable src = ...;
IEnumerator e = src.GetEnumerator();
try
{
  while (e.MoveNext()) Use(e.Current);
}
finally { if (e != null) e.Dispose(); }

.NET Framework4引入Task,.NET Framework 4.5/C#5.0引入了await/async非同步編程語法糖,簡化了非同步編程的編程過程。(編譯器將await/async語法糖轉換為狀態機,產生Task並在內部回調)

☺️以上也看出微軟為幫助我們更快速優雅地編寫程式碼,給了很多糖,編譯器做了很多事情。

C#提供了迭代、非同步的快捷方式,能否將兩者結合?
兩者結合的效果就是: 希望在數據就緒時,接受並處理數據,但不會以阻塞CPU的sing是等待,這在lot流式數據中很常見,

非同步迭代

有一隻爬蟲要通過列表頁上的鏈接,抓取鏈接背後的html內容並顯示。

這是一個[相互獨立的長耗時行為的集合(假設分別耗時5,4,3,2,1s)],
我們使用C#8.0非同步可枚舉類型IAsyncEnumerable,非同步產生/消費枚舉元素。

與同步版本IEmunerable類似,IAsyncEnumerable也有對應的IAsyncEnumerator迭代器,迭代器的實現過程決定了消費的順序。

C#8.0 Asynchronous streams

C#8.0中一個重要的特性是非同步流(async stream), 可以輕鬆創建和消費非同步枚舉。

返回非同步流的方法特徵:

  • async修飾符聲明
  • 返回IAsyncEnumerable<T>對象
  • 方法包含yield return語句,用來非同步持續返回元素
static async Task Main(string[] args)
{
      Console.WriteLine(DateTime.Now + $"\tThreadId:{Thread.CurrentThread.ManagedThreadId}\r\n");

      await foreach (var html in FetchAllHtml())
      {
           Console.WriteLine(DateTime.Now + $"\tThreadId:{Thread.CurrentThread.ManagedThreadId}\t" + $"\toutput:{html}");
      }
      Console.WriteLine("\r\n" + DateTime.Now + $"\tThreadId:{Thread.CurrentThread.ManagedThreadId}\t");
      Console.ReadKey();
 }

 static async IAsyncEnumerable<string> FetchAllHtml()
 {
    for (int i = 5; i >= 1; i--)
    {
        var html = await Task.Delay(i* 1000).ContinueWith((t,i)=> $"html{i}",i);    //  模擬長耗時
        yield return html;
    }
 }

for循環結合yield關鍵字,決定了IAsyncEnymerator的實現;
以上程式碼將使得await foreach消費非同步枚舉時, 採用與for循環一樣的順序,也就是產生非同步任務的先後順序

以上不會等待15s然後一股腦拋出所有數據,而是根據枚舉for循環,一次就緒,依次顯示,總耗時還是15s,只不過每一步都是非同步的。

附加思考:實現一個更有意思的迭代器

☺️ 但是我內心想,能不能按照完成非同步任務的順序,先完成先消費,這難道不是人之常情,交互體驗應該更好。

static async IAsyncEnumerable<string> FetchAllHtml()
{  
    var tasklist= new List<Task<string>>();
    for (int i = 5; i >= 1; i--)
    {
       var t= Task.Delay(i* 1000).ContinueWith((t,i)=>$"html{i}",i);      // 模擬長耗時任務
       tasklist.Add(t);
    }
    while(tasklist.Any())  
    {
      var tFinlish = await Task.WhenAny(tasklist);
      tasklist.Remove(tFinlish); 
      yield return await tFinlish;
    }
}  

上面我先構造了可等待的任務列表,通過Task.WhenAny()按照任務完成的順序 返回迭代。

以上總耗時取決於 耗時最長的那個非同步任務5s.


.NETCore 3.1 已經可以在webapi中使用非同步流,意味著我們可將流式數據返回到HTTP響應。

前端也已經有試驗性的Streams API可以對接消費流式數據。
傳送門: //developer.mozilla.org/en-US/docs/Web/APs_API
瀏覽器兼容列表: //developer.mozilla.org/en-US/docs/Web/API_API#browser_compatibility

對於web應用,這著實能提高 可交互性:
想像之前含多個長耗時行為的列表數據,現在不必等待所有數據,,配以loading,誰家完成誰載入,效果杠杠。