從執行上下文角度重新理解.NET(Core)的多執行緒編程[1]:基於調用鏈的」參數」傳遞
- 2020 年 11 月 25 日
- 筆記
- [02] 編程技巧, AsyncLocal, CallContext, LogicalCallContext, TheadLocal, Thread, ThreadStaticAttribute
執行緒是作業系統能夠進行運算調度的最小單位,作業系統執行緒進一步被封裝成託管的Thread對象,手工創建並管理Thread對象已經成為了所能做到的對執行緒最細粒度的控制了。後來我們有了ThreadPool,可以更加方便地以池化的方式來使用執行緒。最後,Task誕生,它結合async/await關鍵字給與我們完美非同步編程模式。但這一切讓我們的編程體驗越來越好,但是離執行緒的本質越來越遠。被系列文章從「執行上下文傳播」這個令開發者相對熟悉的角度來聊聊重新認識我們似乎已經很熟悉的主題。
目錄
一、ThreadStatic欄位或者ThreadLocal<T>對象
二、CallContext
三、支援跨執行緒傳遞嗎?
四、IllogicalCallContext和LogicalCallContext
五、AsyncLocal<T>
一、ThreadStatic欄位或者ThreadLocal<T>對象
本篇文章旨在解決一個問題:對於一個由多個方法組成的調用鏈,數據如何在上下遊方法之間傳遞。我想很多人首先想到的就是通過方法的參數進行傳遞,但是作為方法簽名重要組成部分的參數列表代表一種「契約」,往往是不能輕易更改的。既然不能通過參數直接進行傳遞,那麼我們需要一個「共享」的數據容器,上遊方法將需要傳遞的數據放到這個容器中,下遊方法在使用的時候從該容器中將所需的數據提取出來。
那麼這個共享的容器可以是一個靜態欄位,當然不行, 因為類型的靜態欄位類似於一個單例對象,它會被多個並發執行的調用鏈共享。雖然普通的靜態欄位不行,但是標註了ThreadStaticAttribute特性的靜態欄位則可以,因為這樣的欄位是執行緒獨享的。為了方便演示,我們定義了如下一個CallStackContext類型來表示基於某個調用鏈的上下文,這是一個字典,用於存放任何需要傳遞的數據。自增的TraceId欄位程式碼當前調用鏈的唯一標識。當前的CallStackContext上下文通過靜態屬性Current獲取,可以看出它返回標註了ThreadStaticAttribute特性的靜態欄位_current。
public class CallStackContext : Dictionary<string, object> { [ThreadStatic] private static CallStackContext _current; private static int _traceId = 0; public static CallStackContext Current { get => _current; set => _current = value; } public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
我們通過如下這個CallStack對象創建一個「邏輯」上的調用鏈。在初始化的時候,CallStack會創建一個CallStackContext對象並將其放進CallContext對象並對靜態欄位_current進行複製。該欄位會在Dispose方法中被置空,此時標誌邏輯調用鏈生命周期的終止。
public class CallStack : IDisposable { public CallStack() => CallStackContext.Current = new CallStackContext(); public void Dispose() => CallStackContext.Current = null; }
我們通過如下的程式來演示針對CallStack和CallStackContext的使用。如程式碼片段所示,我們利用對象池並發調用Call方法。Call方法內部會依次調用Foo、Bar和Baz三個方法,需要傳遞的數據體現為一個Guid,我們將當存放在當前CallStackContext中。整個方法Call方法的操作均在創建Callback的using block中執行。
class Program { static void Main() { for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(_ => Call()); } Console.Read(); } static void Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); Foo(); Bar(); Baz(); } } static void Foo() => Trace(); static void Bar() => Trace(); static void Baz() => Trace(); static void Trace([CallerMemberName] string methodName = null) { var threadId = Thread.CurrentThread.ManagedThreadId; var traceId = CallStackContext.Current?.TraceId; var argument = CallStackContext.Current?["argument"]; Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}"); } }
為了驗證三個方法獲取的數據是否正確,我們讓它們調用同一個Trace方法,該方法會在控制台上列印出當前執行緒ID、調用鏈標識(TraceId)、方法名和獲取到的數據。如下所示的是該演示程式執行後的結果,可以看出置於CallContext中的CallStackContext對象幫助我們很好地完成了針對調用鏈的數據傳遞。
既然我們可以使用ThreadStatic靜態欄位,自然也可以使用ThreadLocal<T>對象來代替。如果希望時候後者,我們只需要將CallStackContext改寫成如下的形式即可。
public class CallStackContext : Dictionary<string, object> { private static ThreadLocal<CallStackContext> _current = new ThreadLocal<CallStackContext>(); private static int _traceId = 0; public static CallStackContext Current { get => _current.Value; set => _current.Value = value; } public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
二、CallContext
除使用ThreadStatic欄位來傳遞調用鏈數據之外,我們還可以使用CallContext。顧名思義,CallContext是專門為調用鏈創建的上下文,我們首先利用它來實現基於調用鏈的數據傳遞。如果採用這種解決方案,上述的CallStack和CallStackContext類型可以改寫成如下的形式。如程式碼片段所示,當前的CallStackContext上下文通過靜態屬性Current獲取,可以看出它是通過調用CallContext的靜態方法GetData提取的,傳入的類型名稱作為存放「插槽」的名稱。在初始化的時候,CallStack會創建一個CallStackContext對象並將其放進CallContext對應存儲插槽中作為當前上下文,該插槽會在Dispose方法中被釋放
public class CallStackContext: Dictionary<string, object> { private static int _traceId = 0; public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
public class CallStack : IDisposable { public CallStack() => CallContext.SetData(nameof(CallStackContext), new CallStackContext()); public void Dispose() => CallContext.FreeNamedDataSlot(nameof(CallStackContext)); }
三、支援跨執行緒傳遞嗎?
對於上面演示的實例來說,調用鏈中的三個方法(Foo、Bar和Baz)均是在同一個執行緒中執行的,如果出現了跨執行緒調用,CallContext是否還能幫助我們實現上下文的快執行緒傳遞嗎?為了驗證CallContext跨執行緒傳遞的能力,我們將Call方法改寫成如下的形式:Call方法直接調用Foo方法,但是Foo方法針對Bar方法的調用,以及Bar方法針對Baz方法的調用均在一個新創建的執行緒中進行的。
static void Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); Foo(); } } static void Foo() { Trace(); new Thread(Bar).Start(); } static void Bar() { Trace(); new Thread(Baz).Start(); } static void Baz() => Trace();
再次執行我們我們的程式,不論是採用基於ThreadStatic靜態欄位,還是採用ThreadLocal<T>對象或者CallContext的解決方法,均會得到如下所示的輸出結果。可以看出設置的數據只能在Foo方法中獲取到,但是並沒有自動傳遞到非同步執行的Bar和Baz方法中。
四、IllogicalCallContext和LogicalCallContext
其實CallContext設置的上下文對象分為IllogicalCallContext和LogicalCallContext兩種類型,調用SetData設置的是IllogicalCallContext,它並不具有跨執行緒傳播的能力。如果希望在進行非同步調用的時候自動傳遞到目標執行緒,必須調用CallContext的LogicalSetData方法設置為LogicalCallContext。所以我們應該將CallStack類型進行如下的改寫。
public class CallStack : IDisposable { public CallStack() => CallContext.LogicalSetData(nameof(CallStackContext), new CallStackContext()); public void Dispose() => CallContext.FreeNamedDataSlot(nameof(CallStackContext)); }
與之相對,獲取LogicalCallContext對象的方法也得換成LogicalGetData,為此我們將CallStackContext改寫成如下的形式。
public class CallStackContext: Dictionary<string, object> { private static int _traceId = 0; public static CallStackContext Current => CallContext.LogicalGetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
再次執行我們程式,依然能夠得到希望的結果。
除了將設置和提取當前CallStackContext的方式進行修改(GetData=>LogicalGet; SetData=>LogicalSetData)之外,我們還有另一個解決方案,那就是讓放存放在CallContext存儲槽的數據類型實現ILogicalThreadAffinative介面。該介面沒有定義任何成員,實現類型對應的對象將自動視為LogicalCallContext。對於我們的演示實例來說,我們只需要讓CallStackContext實現該介面就可以了。
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative { private static int _traceId = 0; public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
五、AsyncLocal<T>
CallContext並沒有被.NET Core繼承下來。也就是,只有.NET Framework才提供針對CallContext的支援,.因為我們有更好的選擇,那就是AsyncLocal<T>。如果使用AsyncLocal<T>作為存放調用鏈上下文的容器,我們的
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative { internal static readonly AsyncLocal<CallStackContext> _contextAccessor = new AsyncLocal<CallStackContext>(); private static int _traceId = 0; public static CallStackContext Current => _contextAccessor.Value; public long TraceId { get; } = Interlocked.Increment(ref _traceId); } public class CallStack : IDisposable { public CallStack() => CallStackContext._contextAccessor.Value = new CallStackContext(); public void Dispose() => CallStackContext._contextAccessor.Value = null; }
既然命名為AsyncLocal<T>,自然是支援非同步調用。它不僅支援上面演示的直接創建執行緒的方式,最主要的是支援我們熟悉的await的方式(如下所示)。
class Program { static async Task Main(string[] args) { for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(_ => Call()); } Console.Read(); Console.Read(); async Task Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); await FooAsync(); await BarAsync(); await BazAsync(); } } } static Task FooAsync() => Task.Run(() => Trace()); static Task BarAsync() => Task.Run(() => Trace()); static Task BazAsync() => Task.Run(() => Trace()); static void Trace([CallerMemberName] string methodName = null) { var threadId = Thread.CurrentThread.ManagedThreadId; var traceId = CallStackContext.Current?.TraceId; var argument = CallStackContext.Current?["argument"]; Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}"); } }