多執行緒合集(二)—非同步的那些事,async和await原理拋析

引言

       在c#中,非同步的async和await原理,以及運行機制,可以說是老生常談,經常在各個群里看到有在討論這個的,而且網上看到的也只是對非同步狀態機的一些講解,甚至很多人說非同步狀態機的時候,他們說的是在運行時去構建狀態機對執行緒狀態進行調度,實際上非同步狀態機是屬於編譯期間,通過生成dll,然後我們使用反編譯工具查看,是可以看到IL構建了非同步狀態機,並且在運行時添加了兩個特性,其中比較重要的是AsyncStateMachine特性這個特性接受的是一個type類型的參數,即指定用的是哪一個非同步狀態機。所以在寫多執行緒的時候,前面第一篇主要寫執行緒方面的一些具體的使用,以及實現自定義的一些操作,接下來的這篇可能會注重原理方面的講解,以及結合一些程式碼實現自定義狀態機。

Part 1

       在c#中,有的關鍵字的使用實際上是由對應的類去進行封裝的,那例如Lock關鍵字,是基於Monitor的Enter和Exit兩個方法進行封裝的,那對應的async和await關鍵字也是有對應的類或者結構體或者介面去進行封裝的,上篇文章中,我們寫了自定義的await,可以看到實際上await關鍵字的限制就是必須繼承ICriticalNotifyCompletion, INotifyCompletion這兩個介面,然後必須實現它介面的方法,這裡有個缺陷就是,await自定義是必須有實現GetResult的方法的這個方法,但是實現那兩個介面是沒有這個方法的,所以GetResult方法必須是自己手動去實現,返回值的話可以根據自己的情況去寫,可以是泛型T  也可以是void類型,然後需要實現一個拓展方法,拓展方法返回類型是你自定義的await,拓展方法是你需要使用await關鍵字的具體類型;那對應的async的關鍵字,也是有一個結構體進行封裝的AsyncTaskMethodBuilder這個結構體是一個泛型,也有一個不是泛型的,這個可以對標你的自定義await 如果你的await是有返回值的是泛型的,那這個builder也必須是泛型,對標你的返回值類型。

        public CustomAwaiter(Func<int, int, string> obj)
        {
            Obj = obj;
        }
        private bool bIsFinesh;
        private Timer Timer { get; set; }
        public bool IsCompleted
        {
            get { return bIsFinesh; }
        }
        private SpinLock SpinLock = new SpinLock();
        private string Result { get; set; }
        public Func<int, int, string> Obj { get; }

        public void OnCompleted(Action continuation)
        {
            Timer = new Timer(s => {
                var action = s as Action;
                var bIsEnter = false;
                SpinLock.TryEnter(ref bIsEnter);
                if (bIsEnter)
                {
                    Result = Obj.Invoke(5, 10);
                    SpinLock.Exit(false);
                }
                Thread.Sleep(5000);
                action?.Invoke();
                bIsFinesh = true;

            }, continuation, 0, int.MaxValue);
        }

        public void UnsafeOnCompleted(Action continuation)
        {
            Timer = new Timer(s => {
                var action = s as Action;
                var bIsEnter = false;
                SpinLock.TryEnter(ref bIsEnter);
                if (bIsEnter)
                {
                    Result = Obj.Invoke(5, 10);
                    SpinLock.Exit(false);
                }
                action?.Invoke();
                bIsFinesh = true;
            }, continuation, 5000, int.MaxValue);
        }
        public string GetResult()
        {
            return Result;
        }
 public static CustomAwaiter GetAwaiter(this Func<int, int, string> obj)
        {
            return new CustomAwaiter(obj);
        }

Part 2

       在第一部分中,我們找到了async 和await對應的結構體以及介面,那我們接下來看看實際上的非同步的運行方式,下面這一段程式碼相信大家看起來很熟悉,感覺似曾相識,實際上非同步方法加上async和await關鍵字的時候生成的IL程式碼轉為c#程式碼基本上就是這個樣子的。

GetResult方法,去調用非同步狀態機

       可以看到我們在這裡定義了一個方法GetResult,這裡面去執行一個非同步狀態機,這裡可以看看自定義狀態機的程式碼,實現了IAsyncStateMachine這個介面,重寫了MoveNext的方法和SetStateMachine的兩個方法,這裡著重講解MoveNext方法,在c#非同步中,都是使用MoveNext方法來進行調度,通過定義的State來判斷執行那一步,結合第一段程式碼片段,可以看到我們剛開始的時候設置的狀態是-1,然後調用了Builder的Start方法,這個方法需要傳入一個狀態機的參數,所以我們傳入我們自定義的狀態機,

  public static Task<string> GetResult()
        {
            CustomAsyncStateMechine customAsyncStateMechine = new CustomAsyncStateMechine();
            customAsyncStateMechine.builder = AsyncTaskMethodBuilder<string>.Create();
            customAsyncStateMechine.State = -1;
            customAsyncStateMechine.builder.Start(ref customAsyncStateMechine);
            return customAsyncStateMechine.builder.Task;
        }

需要執行的非同步方法

 public async static Task<string> Tests()
        {
            return await  Task.Run(() => {
                return "hELLO";
            });
        }

 

自定義非同步狀態機

  public class CustomAsyncStateMechine : IAsyncStateMachine
    {
        public AsyncTaskMethodBuilder<string> builder;
        public TaskAwaiter<string> awaiter;
        public int State;
        public void MoveNext()
        {
            TaskAwaiter<string> taskAwaiter=default;
            int num = State;
            CustomAsyncStateMechine state;
            string Result = string.Empty;

            switch (num)
            {
                case -1:
                    taskAwaiter = Program.Tests().GetAwaiter();
                    if (!taskAwaiter.IsCompleted)
                    {
                        num = State = 0;
                        awaiter = taskAwaiter;
                        state = this;
                        builder.AwaitUnsafeOnCompleted(ref taskAwaiter, ref state);
                        return;
                    }
                    break;
                case 0:

                    taskAwaiter = awaiter;
                    awaiter = default(TaskAwaiter<string>);
                    num = State = -1;
                    break;

            }
            Result = taskAwaiter.GetResult();
            builder.SetResult(Result);
        }

        public void SetStateMachine(IAsyncStateMachine stateMachine)
        {

        }
    }

 

Start方法

       可以在下面的程式碼段看到Start的方法程式碼,在我們調用了這個方法之後會構建一個用於切換執行緒上下文的對象,然後調用執行緒上下文的方法去進行一些操作,這裡看一下,這個方法調用了狀態機的MoveNext方法,這是第一次執行MoveNext的方法,可以看到我們第一次執行MoveNext方法的時候我們去獲取了一下Tests的GetAwaiter,獲取的時候實際上這個Tests方法已經執行了,然後我們去判斷是否完成,如果沒有完成,我們需要去進行下一步操作,在全局變數定義一個Awaiter,需要將Tests的Awaiter保存起來,然後切換State的狀態推進到下一步,然後我們調用了Builder的AwaitUnsafeOnCompleted這個方法,

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
        {
            // See comment on AsyncMethodBuilderCore.Start
            // AsyncMethodBuilderCore.Start(ref stateMachine);
 
            if (stateMachine == null) throw new ArgumentNullException("stateMachine");
            Contract.EndContractBlock();
 
            // Run the MoveNext method within a copy-on-write ExecutionContext scope.
            // This allows us to undo any ExecutionContext changes made in MoveNext,
            // so that they won't "leak" out of the first await.
 
            ExecutionContextSwitcher ecs = default(ExecutionContextSwitcher);
            RuntimeHelpers.PrepareConstrainedRegions();
            try
            {
                ExecutionContext.EstablishCopyOnWriteScope(ref ecs);
                stateMachine.MoveNext();
            }
            finally
            {
                ecs.Undo();
            }
        }

AwaitUnsafeOnCompleted方法

       可以看到這個方法內部有調用了一個GetCompletionAction方法

 public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
            ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : ICriticalNotifyCompletion
            where TStateMachine : IAsyncStateMachine
        {
            try
            {
                AsyncMethodBuilderCore.MoveNextRunner runnerToInitialize = null;
                var continuation = m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runnerToInitialize);
                Contract.Assert(continuation != null, "GetCompletionAction should always return a valid action.");
 
                // If this is our first await, such that we've not yet boxed the state machine, do so now.
                if (m_coreState.m_stateMachine == null)
                {
                    // Force the Task to be initialized prior to the first suspending await so 
                    // that the original stack-based builder has a reference to the right Task.
                    var builtTask = this.Task;
 
                    // Box the state machine, then tell the boxed instance to call back into its own builder,
                    // so we can cache the boxed reference.
                    Contract.Assert(!Object.ReferenceEquals((object)stateMachine, (object)stateMachine), "Expected an unboxed state machine reference");
                    m_coreState.PostBoxInitialization(stateMachine, runnerToInitialize, builtTask);
                }
 
                awaiter.UnsafeOnCompleted(continuation);
            }
            catch (Exception e)
            {
                AsyncMethodBuilderCore.ThrowAsync(e, targetContext: null);
            }
        }

 

GetCompletionAction方法

       這裡我們著重看一下runner.run方法可以看到Run方法裡面不管是怎麼去進行操作,最後都是要去執行MoveNext方法,接下來看一下上面的AwaitUnsafeOnCompleted方法,還記得上一篇文章中,我賣了一個關子,詢問大家OnCompleted和UnsafeOnCompleted方法裡面的Action是哪一個方法,現在已經很明了了,這個Action執行的是狀態機的MoveNext方法,它是在Task完成之後,去執行OnCompleted和UnSafeOnCompleted方法的,這裡為了方便大家理解,需要結合上一篇文章中自定義任務調度TaskScheduler去給大家演示,最好是希望閱讀文章後去下載最新的程式碼進行調試就會很明白,

 internal Action GetCompletionAction(Task taskForTracing, ref MoveNextRunner runnerToInitialize)
        {
            Contract.Assert(m_defaultContextAction == null || m_stateMachine != null,
                "Expected non-null m_stateMachine on non-null m_defaultContextAction");
 
            // Alert a listening debugger that we can't make forward progress unless it slips threads.
            // If we don't do this, and a method that uses "await foo;" is invoked through funceval,
            // we could end up hooking up a callback to push forward the async method's state machine,
            // the debugger would then abort the funceval after it takes too long, and then continuing
            // execution could result in another callback being hooked up.  At that point we have
            // multiple callbacks registered to push the state machine, which could result in bad behavior.
            Debugger.NotifyOfCrossThreadDependency();
 
            // The builder needs to flow ExecutionContext, so capture it.
            var capturedContext = ExecutionContext.FastCapture(); // ok to use FastCapture as we haven't made any permission demands/asserts
 
            // If the ExecutionContext is the default context, try to use a cached delegate, creating one if necessary.
            Action action;
            MoveNextRunner runner;
            if (capturedContext != null && capturedContext.IsPreAllocatedDefault)
            {
                // Get the cached delegate, and if it's non-null, return it.
                action = m_defaultContextAction;
                if (action != null)
                {
                    Contract.Assert(m_stateMachine != null, "If the delegate was set, the state machine should have been as well.");
                    return action;
                }
 
                // There wasn't a cached delegate, so create one and cache it.
                // The delegate won't be usable until we set the MoveNextRunner's target state machine.
                runner = new MoveNextRunner(capturedContext, m_stateMachine);
 
                action = new Action(runner.Run);
                if (taskForTracing != null)
                {
                    m_defaultContextAction = action = OutputAsyncCausalityEvents(taskForTracing, action);
                }
                else
                {
                    m_defaultContextAction = action;
                }
            }
            // Otherwise, create an Action that flows this context.  The context may be null.
            // The delegate won't be usable until we set the MoveNextRunner's target state machine.
            else
            {
                runner = new MoveNextRunner(capturedContext, m_stateMachine);
                action = new Action(runner.Run);
 
                if (taskForTracing != null)
                {
                    action = OutputAsyncCausalityEvents(taskForTracing, action);
                }
 
                // NOTE: If capturedContext is null, we could create the Action to point directly
                // to m_stateMachine.MoveNext.  However, that follows a much more expensive
                // delegate creation path.
            }
 
            if (m_stateMachine == null)
                runnerToInitialize = runner;
 
            return action;
        }
     internal void Run()
            {
                Contract.Assert(m_stateMachine != null, "The state machine must have been set before calling Run.");
 
                if (m_context != null)
                {
                    try
                    {
                        // Get the callback, lazily initializing it as necessary
                        ContextCallback callback = s_invokeMoveNext;
                        if (callback == null) { s_invokeMoveNext = callback = InvokeMoveNext; }
 
                        // Use the context and callback to invoke m_stateMachine.MoveNext.
                        ExecutionContext.Run(m_context, callback, m_stateMachine, preserveSyncCtx: true);
                    }
                    finally { m_context.Dispose(); }
                }
                else
                {
                    m_stateMachine.MoveNext();
                }
            }
 
            /// <summary>Cached delegate used with ExecutionContext.Run.</summary>
            [SecurityCritical]
            private static ContextCallback s_invokeMoveNext; // lazily-initialized due to SecurityCritical attribution
 
            /// <summary>Invokes the MoveNext method on the supplied IAsyncStateMachine.</summary>
            /// <param name="stateMachine">The IAsyncStateMachine machine instance.</param>
            [SecurityCritical] // necessary for ContextCallback in CoreCLR
            private static void InvokeMoveNext(object stateMachine)
            {
                ((IAsyncStateMachine)stateMachine).MoveNext();
            }

CustomScheduler 和CustomAwaiter 以及自定義狀態機的結合使用,

 foreach (var item in Enumerable.Range(0, 1))
                {
                   await Task.Run(async () =>
                    {
                        var i = item;
                        var ts = new Func<int, int, string>((s, b) =>
                        {
                            return Guid.NewGuid().ToString();
                        });
                        //var t= await ts;
                        var tash = new TaskCustomScheduler();
                        var factory = new TaskFactory(tash);
                        await factory.StartNew(async () =>
                         {
                             var state = new CustomAsyncStateMechines();
                             state.State = -1;
                             state.awaiter = ts.GetAwaiter();
                             state.builder = AsyncTaskMethodBuilder<string>.Create();
                             state.builder.Start(ref state);
                             var result = await state.builder.Task;
                             Console.WriteLine(result);
                         });
                    });
                }

       在上一篇文章中,我們講解了自定義調度的幾個比較重要的方法,我們在使用factory去進行指定了調度器之後,調用了StartNew方法,去執行一段程式碼,這裡的是,實際上在StartNew執行之前,他會先咋自定義任務調度裡面添加Task,他會走到QueueTask將Task添加到自己定義的任務池裡面去,然後再去RunWork,去通過ThreadPool去執行Task,TryExecuteTask是抽象類提供且內部實現的一個方法,是去執行Task,然後Task執行結束後,我們把它從任務調度池裡面移除,那Task結束之後,就會走到自定義Awaiter裡面UnSafeOnCompleted方法裡面去,然後在這裡面再去寫執行完成的回調,將狀態機向前推進,然後在Movenext方法裡面,我們在去獲取awaiter的結果,這裡就是剛開始所說的就是自定義Awaiter需要自己寫的GetResult方法,然後獲取到結果之後,我們需要將結果賦值到Task中,就需要調用builder的SetResult方法,實際上對於Task的異常處理也是有SetException方法去進行設置異常的,就需要在MoveNext方法中添加Try Catch  然後捕獲之後去迪奧用SetException方法設置異常,這就是async和await非同步執行的相關過程,對於內部更深層次的,我目前也是一知半解,但是大體意思都是知道。

 

    public class TaskCustomScheduler : TaskScheduler
    {
        private SpinLock SpinLock = new SpinLock();
        public TaskCustomScheduler()
        {

        }
        private ConcurrentQueue<Task> Tasks = new ConcurrentQueue<Task>();
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return Tasks.ToList();
        }

        protected override void QueueTask(Task task)
        {
            Tasks.Enqueue(task);
            RunWork();
        }

        protected override bool TryDequeue(Task task)
        {
           return Tasks.TryDequeue(out task);
        }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return TryExecuteTask(task);
        }
        private void RunWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                try
                {
                    foreach (var item in Tasks)
                    {
                        var task = item;
                        var isEnter = false;
                        SpinLock.TryEnter(ref isEnter);
                        TryExecuteTask(task);
                        if (isEnter)
                        {
                            Tasks.TryDequeue(out task);
                            SpinLock.Exit(false);
                        }
                    }
                }
                finally {  }
            }, null);
        }
    }

 

Part 2

       c#中,實際上所有的Task都是基於ThreadPoolScheduler去進行運行的,這個類開發者是沒有辦法去new的,但是在TaskScheduler中有一個屬性Default實際上它返回的就是這個類,然後Task的時候都是運行在這個類上面,由這個類去進行調度,至於有的人說非同步多執行緒,有的時候非同步是多執行緒有的時候不是多執行緒,在這裡,可以肯定的是async和await的非同步是多執行緒的,但是對於一些類提供的Begin開頭的非同步,這種的 ,我的觀點是,不是多執行緒的,如果我說的不對的話,希望各位大佬能夠進行指正,程式碼的話,我會放在Gitee裡面去,家裡的網路上不去Github。抱歉,

總結

       多執行緒方面的文章就講解到這裡,後續可能會出一些,winform方面自繪或者Net Core自定義配置結合Options進行的自定義,敬請各位大佬進行關注,如果對文章或者程式碼有不懂的地方,可以看自己所在的群里有沒有叫四川觀察的,那基本上就是我了,或者加QQ群6406277,找我也可以,在這裡,謝謝大家的支援,以後會多發表開發方面的知識,大家一起學習,一起進步。

 

Tags: