走进Task(2):Task 的回调执行与 await
前言
本文为系列博客
- 什么是 Task
- Task 的回调执行与 await(本文)
- async 到底干了什么(TODO)
- 总结与常见误区(TODO)
上一篇我们讲了对 Task 的基本定义:
Task 代表一个任务,其具体类型可能是多种多样的,且有时候对我们来说完全是个黑盒。这个任务可以有结果,可以没有结果,我们能知道这个任务什么时候执行完成,并进行相应的后续处理。
Task 生命周期可以分为任务执行和回调执行两个主要的阶段。上回讲了 Task 的执行阶段,这次我们就接着来看下回调执行阶段。
Task 将回调函数维护在 m_continuationObject 字段上,并通过 TrySetResult 等方法对外(这个对外仅限runtime里Task相关的其他代码)暴露回调的触发方式。
由于 Task 的设计过于复杂,我的理解可能有错误,以后的版本可能会和现在有所出入。本文仅供参考学习,希望大家不要太过于纠结细节,了解设计思路比实现细节更重要。
class Task
{
// 保存一个或一组回调
private volatile object? m_continuationObject;
internal bool TrySetResult()
{
// ...
FinishContinuations();
// ...
}
internal void FinishContinuations()
{
// 处理回调的执行
}
}
class Task<TResult> : Task
{
internal bool TrySetResult(TResult result)
{
// ...
this.m_result = result;
// 复用父类的逻辑
FinishContinuations();
// ...
}
}
本文要讨论的其实就是对上述的补充:
- Task 在把回调函数保存到 m_continuationObject 之前,对回调函数进行了什么样的包装处理?
- Task 的 回调函数是在什么时候被触发的,也就是 Task 的完成与回调的执行是如何进行衔接的?
- Task 所保存的回调函数会在哪里执行?
Task.ContinueWith
往一个 Task 注册回调,有两种方式:直接调用 Task 实例的 ContinueWith 方法,或者使用 await 关键词。我们先看一下前者,await 放在后面单独讲。
ContinueWith 的产物:ContinuationTask
调用 ContinueWith 本质上是创建了一个新的 Task(后面简称为 ContinuationTask),而这个 ContinuationTask 的执行时间就是 原Task(后面简称为 AntecedentTask) 完成之后。
作为 Task ContinueWith 的返回值的 Task 的子类有以下四个,分别对应四种用法:
- ContinuationTaskFromTask
向 Task 注册一个回调
Task task = Task.Run(() => Console.WriteLine("Hello"))
.ContinueWith(t => Console.WriteLine("World"));
// System.Threading.Tasks.ContinuationTaskFromTask
Console.WriteLine(task.GetType());
- ContinuationResultTaskFromTask<TResult>
向 Task 注册一个回调,并在回调里返回一个新值作为 新Task 的返回值
Task task = Task.Run(() => Console.WriteLine("Hello"))
.ContinueWith(t => "World");
// System.Threading.Tasks.ContinuationResultTaskFromTask`1[System.String]
Console.WriteLine(task.GetType());
- ContinuationTaskFromResultTask<TAntecedentResult>
向 Task<TResult> 注册一个回调, 并且 Task 获取返回值
Task task = Task.Run(() => "Hello")
.ContinueWith(t => Console.WriteLine($"{t.Result} World"));
// System.Threading.Tasks.ContinuationTaskFromResultTask`1[System.String]
Console.WriteLine(task.GetType());
- ContinuationResultTaskFromResultTask<TAntecedentResult, TResult>
向 Task<TResult> 注册一个回调,并在回调里返回一个新值作为 新Task 的返回值
Task task = Task.Run(() => "Hello")
.ContinueWith(t => $"{t.Result} World");
// System.Threading.Tasks.ContinuationResultTaskFromResultTask`2[System.String,System.String]
Console.WriteLine(task.GetType());
因为 Task.ContinueWith 的结果依旧是一个 Task,这个链式的回调注册可以无限地进行。
Task.Run(() => Console.WriteLine(1))
.ContinueWith(t => Console.WriteLine(2))
.ContinueWith(t => Console.WriteLine(3))
.ContinueWith(t => Console.WriteLine(4));
额外的参数
class Task
{
public Task ContinueWith(
Action<Task> continuationAction,
CancellationToken cancellationToken,
TaskContinuationOptions continuationOptions,
TaskScheduler scheduler)
{
// ...
}
}
我们还可以通过 ContinueWith 的重载向其传入回调函数外的三个参数:
- CancellationToken:协作式取消 Task 的执行,本文暂不展开。
- TaskContinuationOptions:
前一部分和 TaskCreationOptions 的值完全一致。
如果设置的是这一部分的值,就会直接转换为 ContinuationTask 的 TaskCreationOptions。TaskScheduler 识别过后进行相应的处理。
如果设置的是后一部分的值,那么 runtime 在决定把 Task 交给 TaskScheduler 去调度执行前,会根据设置的值做相应的预判逻辑。例如 OnlyOnFaulted 代表在 AntecedentTask 执行过程抛出了异常,runtime 才会去执行 ContinuationTask。
public enum TaskCreationOptions
{
None = 0,
PreferFairness = 1,
LongRunning = 2,
AttachedToParent = 4,
DenyChildAttach = 8,
HideScheduler = 16, // 0x00000010
RunContinuationsAsynchronously = 64, // 0x00000040
}
public enum TaskContinuationOptions
{
None = 0,
PreferFairness = 1,
LongRunning = 2,
AttachedToParent = 4,
DenyChildAttach = 8,
HideScheduler = 16, // 0x00000010
LazyCancellation = 32, // 0x00000020
RunContinuationsAsynchronously = 64, // 0x00000040
// ---------- 分界线 ----------
NotOnRanToCompletion = 65536, // 0x00010000
NotOnFaulted = 131072, // 0x00020000
NotOnCanceled = 262144, // 0x00040000
OnlyOnRanToCompletion = NotOnCanceled | NotOnFaulted, // 0x00060000
OnlyOnFaulted = NotOnCanceled | NotOnRanToCompletion, // 0x00050000
OnlyOnCanceled = NotOnFaulted | NotOnRanToCompletion, // 0x00030000
ExecuteSynchronously = 524288, // 0x00080000
}
- TaskScheduler:可以之指定 TaskScheduler 去调度 Task。
默认是 TaskScheduler.Current,而 TaskScheduler.Current 的默认值是 ThreadPoolTaskScheduler,可以修改成其他实现。
回调的容器:TaskContinuation
我们注意到 m_continuationObject 字段的类型是 object,而 object 类型在数据的存储上有更多的灵活性。
class Task
{
// 保存一个或一组回调
private volatile object m_continuationObject;
}
我们看下下面的代码
var antecedentTask = Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Antecedent Task Completed");
});
PrintContinuationObjectType(antecedentTask);
antecedentTask.ContinueWith(_ => Console.WriteLine("Continuation Task1 Completed"));
PrintContinuationObjectType(antecedentTask);
antecedentTask.ContinueWith(_ => Console.WriteLine("Continuation Task2 Completed"));
PrintContinuationObjectType(antecedentTask);
Console.ReadLine();
void PrintContinuationObjectType(Task task)
{
var continuationObject = typeof(Task)
.GetField("m_continuationObject",
BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(task);
var type = continuationObject?.GetType().FullName ?? "null";
if (continuationObject is IEnumerable enumerable)
{
type += $", Element type: {enumerable.Cast<object>().First().GetType().FullName}";
}
Console.WriteLine(type);
}
执行结果如下
null System.Threading.Tasks.ContinueWithTaskContinuation System.Collections.Generic.List`1[[System.Object, System.Private.CoreLib, >Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], Element type: >System.Threading.Tasks.ContinueWithTaskContinuation Antecedent Task Completed Continuation Task1 Completed Continuation Task2 Completed
随着回调函数注册数量的增加,m_continuationObject 保存的数据类型也在变化
- 没有注册时:null
- 一个回调时:ContinueWithTaskContinuation 实例
- 超过一个回调时:元素类型是 ContinueWithTaskContinuation 的 List<object>
实际上 m_continuationObject 还有别的类型:
class Task
{
private void RunContinuations(object continuationObject) // separated out of FinishContinuations to enable it to be inlined
{
Debug.Assert(continuationObject != null);
TplEventSource log = TplEventSource.Log;
bool etwIsEnabled = log.IsEnabled();
if (etwIsEnabled)
log.TraceSynchronousWorkBegin(this.Id, CausalitySynchronousWork.CompletionNotification);
bool canInlineContinuations =
(m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) == 0 &&
RuntimeHelpers.TryEnsureSufficientExecutionStack();
switch (continuationObject)
{
// Handle the single IAsyncStateMachineBox case. This could be handled as part of the ITaskCompletionAction
// but we want to ensure that inlining is properly handled in the face of schedulers, so its behavior
// needs to be customized ala raw Actions. This is also the most important case, as it represents the
// most common form of continuation, so we check it first.
case IAsyncStateMachineBox stateMachineBox:
AwaitTaskContinuation.RunOrScheduleAction(stateMachineBox, canInlineContinuations);
LogFinishCompletionNotification();
return;
// Handle the single Action case.
case Action action:
AwaitTaskContinuation.RunOrScheduleAction(action, canInlineContinuations);
LogFinishCompletionNotification();
return;
// Handle the single TaskContinuation case.
case TaskContinuation tc:
tc.Run(this, canInlineContinuations);
LogFinishCompletionNotification();
return;
// Handle the single ITaskCompletionAction case.
case ITaskCompletionAction completionAction:
RunOrQueueCompletionAction(completionAction, canInlineContinuations);
LogFinishCompletionNotification();
return;
}
}
ContinueWithTaskContinuation 的父类 TaskContinuation 是一个抽象类。除了 ContinueWithTaskContinuation,还有别的实现。
internal abstract class TaskContinuation
{
internal abstract void Run(Task completedTask, bool canInlineContinuationTask);
}
ContinueWithTaskContinuation 维护着 Task 执行相关的两个核心对象,一个是 Task 本身,另一是 TaskScheduler。真正执行回调之前,需要先调用 TaskContinuation.Run。
internal sealed class ContinueWithTaskContinuation : TaskContinuation
{
internal Task? m_task;
internal readonly TaskContinuationOptions m_options;
private readonly TaskScheduler m_taskScheduler;
internal ContinueWithTaskContinuation(Task task, TaskContinuationOptions options, TaskScheduler scheduler)
{
m_task = task;
m_options = options;
m_taskScheduler = scheduler;
}
internal override void Run(Task completedTask, bool canInlineContinuationTask)
{
// ...
}
}
Task.ContinueWith 回调的生命周期
阶段一 将回调封装进 ContinueWithTaskContinuation
我们向 Task 注册的回调回调最终会以 ContinueWithTaskContinuation 的形式保存在 Task 之中,相关的代码摘录如下。其他 public 的 ContinueWith 可以看做是对这些 private 方法的封装。
class Task
{
private Task ContinueWith(Action<Task> continuationAction, TaskScheduler scheduler,
CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
{
CreationOptionsFromContinuationOptions(continuationOptions, out TaskCreationOptions creationOptions, out InternalTaskOptions internalOptions);
Task continuationTask = new ContinuationTaskFromTask(
this, continuationAction, null,
creationOptions, internalOptions
);
ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions);
return continuationTask;
}
private Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, TaskScheduler scheduler,
CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
{
CreationOptionsFromContinuationOptions(continuationOptions, out TaskCreationOptions creationOptions, out InternalTaskOptions internalOptions);
Task<TResult> continuationTask = new ContinuationResultTaskFromTask<TResult>(
this, continuationFunction, null,
creationOptions, internalOptions
);
ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions);
return continuationTask;
}
internal void ContinueWithCore(Task continuationTask,
TaskScheduler scheduler,
CancellationToken cancellationToken,
TaskContinuationOptions options)
{
// ...
AddTaskContinuation(continuation);
// ...
}
private bool AddTaskContinuation(object tc, bool addBeforeOthers)
{
// ...
AddTaskContinuationComplex(tc, addBeforeOthers);
// ...
}
private bool AddTaskContinuationComplex(object tc)
{
List<object?>? list = m_continuationObject as List<object?>;
// ...
list.Add(tc);
// ...
}
}
internal sealed class ContinuationTaskFromTask : Task
{
private Task? m_antecedent;
public ContinuationTaskFromTask(
Task antecedent, Delegate action, object? state, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions) :
base(action, state, Task.InternalCurrentIfAttached(creationOptions), default, creationOptions, internalOptions, null)
{
m_antecedent = antecedent;
}
internal override void InnerInvoke()
{
if (m_action is Action<Task> action)
{
action(antecedent);
return;
}
if (m_action is Action<Task, object?> actionWithState)
{
actionWithState(antecedent, m_stateObject);
return;
}
}
}
子流程整理如下:
- 将委托包装到具体的 ContinuationTask 实例里(ContinuationTaskFromTask等 Task 的子类实例),
定义 Task 子类的目的是为了将 AntecedentTask 的引用保存起来,以便在执行 ContinuationTask 将 AntecedentTask 作为委托的参数传入。 - 将 ContinuationTask 包装到 ContinueWithTaskContinuation 实例中
- 将 ContinueWithTaskContinuation 添加到 TaskContinuation 列表里(m_continuationObject)
阶段二 回调的触发
这一部分其实就是上回 Task 可以封装任何类型的别的任务 这一节提到的的流程:
- 调度器在执行完 AntecedentTask 之后,会去调用 AntecedentTask.TrySetResult()
- 在 TrySetResult 方法里,最终会去调用 TaskContinuation.Run()
- ContinueWithTaskContinuation 里会把 ContinuationTask 放入 ContinueWithTaskContinuation 里维护的 TaskScheduler 里调度执行。
回调执行真正的决定者:ContinueWithTaskContinuation
在 ContinueWithTaskContinuation 中维护着待执行的 ContinuationTask 以及决定 ContinuationTask 最终执行方式的 TaskContinuationOptions 和 TaskScheduler。
internal sealed class ContinueWithTaskContinuation : TaskContinuation
{
internal Task? m_task;
internal readonly TaskContinuationOptions m_options;
private readonly TaskScheduler m_taskScheduler;
internal ContinueWithTaskContinuation(Task task, TaskContinuationOptions options, TaskScheduler scheduler)
{
m_task = task;
m_options = options;
m_taskScheduler = scheduler;
}
internal override void Run(Task completedTask, bool canInlineContinuationTask)
{
Task? continuationTask = m_task;
m_task = null;
// 检查任务的完成状态,如果不符合 TaskContinuationOptions 的设置,回调就不会被执行
TaskContinuationOptions options = m_options;
bool isRightKind =
completedTask.IsCompletedSuccessfully ?
(options & TaskContinuationOptions.NotOnRanToCompletion) == 0 :
(completedTask.IsCanceled ?
(options & TaskContinuationOptions.NotOnCanceled) == 0 :
(options & TaskContinuationOptions.NotOnFaulted) == 0);
// 任务完成状态符合要求,回调执行。
if (isRightKind)
{
continuationTask.m_taskScheduler = m_taskScheduler;
// 直接执行回调或将其排队等待执行,具体取决于是否需要同步或异步执行。
// 默认执行路径,上层传的是 true。
if (canInlineContinuationTask && // 调用Run方法的内部方法传了允许内联
(options & TaskContinuationOptions.ExecuteSynchronously) != 0) // 注册回调的实际用户设置了同步执行
{
InlineIfPossibleOrElseQueue(continuationTask, needsProtection: true);
}
else
{
try { continuationTask.ScheduleAndStart(needsProtection: true); }
catch (TaskSchedulerException)
{
// 如果 Task 执行失败了,ScheduleAndStart 方法会将 Task 标记为失败,
// 这里是runtime设计的时候保证不会有意外的错误发生,仅做catch,不做处理
}
}
}
else
{
Task.ContingentProperties? cp = continuationTask.m_contingentProperties;
if (cp is null || cp.m_cancellationToken == default)
{
continuationTask.InternalCancelContinueWithInitialState();
}
else
{
continuationTask.InternalCancel();
}
}
}
}
所谓的 Inline 是指在触发回调的线程中直接执行回调。
像 Task.Run 创建的 Task(由 ThreadPoolTaskScheduler 调度,也就是由线程池调度) 的回调如果是 Inline 执行的话,那执行回调的线程和执行传给 Task.Run 的委托的线程,就会是同一个线程池线程。因为线程池在执行完委托之后,就会触发回调执行。
我们注册的 TaskScheduler 可以选择是否只是 Inline。
public abstract class TaskScheduler
{
// 如果不是 Inline 执行,就是走这个方法执行回调
// 如果没有传
protected internal abstract void QueueTask(Task task);
// 如果返回 false,就算参数要求 Inline ,也会走 QueueTask 执行回调
protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
// 获取所有调度到该 TaskScheduler 的 Task
protected abstract IEnumerable<Task>? GetScheduledTasks();
}
执行回调的线程
根据上文的吻戏 Task.ContinueWith 的回调最终在哪执行取决于 TaskContinuationOptions 和 TaskScheduler。
下面是几个典型的例子:
- Inline
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine($"Task Run, ThreadId: {Environment.CurrentManagedThreadId}");
})
.ContinueWith(t => Console.WriteLine($"Task OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}"),
TaskContinuationOptions.ExecuteSynchronously);
Console.ReadKey();
前后线程永远不会发生变化
Task Run, ThreadId: 6 Task OnCompleted, ThreadId: 6
- 调度到 ThreadPool 本地队列
下面的例子里,也就是调度到执行前一个执行前一个委托的线程池线程的本地队列里
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine($"Task Run, ThreadId: {Environment.CurrentManagedThreadId}");
})
.ContinueWith(t => Console.WriteLine($"Task OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}")); // 默认是 TaskContinuationOptions.None
Console.ReadKey();
有可能前后是一个线程,也有可能不是,可以多执行几次看看。
更多说明请看 ThreadPool 的博客中偷窃机制。
- 调度到 ThreadPool 全局队列
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine($"Task Run, ThreadId: {Environment.CurrentManagedThreadId}");
})
.ContinueWith(t => Console.WriteLine($"Task OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}"),
TaskContinuationOptions.PreferFairness);
将回调调度到全局队列,等待线程池线程领取并执行。
Task 与 await
与 ContinueWith 相比,await 给我们提供了更加简单的 Task 的使用方式。
Task.Run(() => "Hello")
.ContinueWith(t => Console.WriteLine($"{t.Result} World"));
// 等效于
var result = await Task.Run(() => "Hello");
Console.WriteLine($"{result} World");
Awaiter
我们可以通过 Task.GetAwaiter 从 Task 实例上获取到 Task 对应的 TaskAwaiter 对象。并且可以通过 TaskAwaiter.OnCompleted 方法注册回调,其执行结果与 Task.ContinueWith 一致。
TaskAwaiter awaiter1 = Task.Run(()=> Console.WriteLine("Hello")).GetAwaiter();
awaiter1.OnCompleted(()=> Console.WriteLine("World"));
TaskAwaiter<string> awaiter2 = Task.Run(()=> "Hello").GetAwaiter();
awaiter2.OnCompleted(()=> Console.WriteLine($"{awaiter2.GetResult()} World"));
Console.ReadKey();
Hello World Hello World
注意:直接调用 TaskAwaiter.GetResult 会阻塞调用线程直至 Task 执行完成。
TaskAwaiter 本质上可以理解成在 await 语法糖编译成的代码中,为了解耦 Task 和状态机,而创建的一个隔离层,内部对 Task 进行了包装。
public class Task<TResult>
{
public TaskAwaiter<TResult> GetAwaiter() => new TaskAwaiter<TResult>(this);
internal void SetContinuationForAwait(
Action continuationAction, bool continueOnCapturedContext, bool flowExecutionContext)
{
TaskContinuation? tc = null;
if (continueOnCapturedContext)
{
SynchronizationContext? syncCtx = SynchronizationContext.Current;
if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext))
{
tc = new SynchronizationContextAwaitTaskContinuation(syncCtx, continuationAction, flowExecutionContext);
}
else
{
TaskScheduler? scheduler = TaskScheduler.InternalCurrent;
if (scheduler != null && scheduler != TaskScheduler.Default)
{
tc = new TaskSchedulerAwaitTaskContinuation(scheduler, continuationAction, flowExecutionContext);
}
}
}
if (tc == null && flowExecutionContext)
{
tc = new AwaitTaskContinuation(continuationAction, flowExecutionContext: true);
}
if (tc != null)
{
if (!AddTaskContinuation(tc, addBeforeOthers: false))
tc.Run(this, canInlineContinuationTask: false);
}
else
{
if (!AddTaskContinuation(continuationAction, addBeforeOthers: false))
AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
}
}
}
public readonly struct TaskAwaiter<TResult> :
ICriticalNotifyCompletion,
INotifyCompletion,
ITaskAwaiter
{
private readonly Task<TResult> m_task;
internal TaskAwaiter(Task task)
{
m_task = task;
}
public bool IsCompleted => m_task.IsCompleted;
public void OnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true,
flowExecutionContext: true);
}
public void UnsafeOnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true,
flowExecutionContext: false);
}
[StackTraceHidden]
public TResult GetResult()
{
TaskAwaiter.ValidateEnd((Task)this.m_task);
return this.m_task.ResultOnSuccess;
}
internal static void OnCompletedInternal(
Task task,
Action continuation,
bool continueOnCapturedContext,
bool flowExecutionContext)
{
task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
}
}
可以看到 TaskAwaiter.OnCompleted 就是往 Task 注册回调,而 await 关键词的本质就是把 await 后面的代码变成了回调并注册到了 Task 上。
Task.Run(() => "Hello")
.ContinueWith(t => Console.WriteLine($"{t.Result} World"));
// 等效于
var result = await Task.Run(() => "Hello");
Console.WriteLine($"{result} World");
// 等效于
Task.Run(()=> "Hello").GetAwaiter().OnCompleted(()=> Console.WriteLine("World"));
至于 TaskAwaiter.UnsafeOnCompleted 我们稍后解释。
await Anything
C# 编译器并没有限制 await 关键词只能用在 Task 上。例如 Task.Yield()
的返回值 YieldAwaitable,既不是 Task 也不是 Task 的子类。
public readonly struct YieldAwaitable
{
public YieldAwaitable.YieldAwaiter GetAwaiter() => new YieldAwaitable.YieldAwaiter();
public readonly struct YieldAwaiter :
ICriticalNotifyCompletion,
INotifyCompletion
{
public bool IsCompleted => false;
public void OnCompleted(Action continuation) => YieldAwaitable.YieldAwaiter.QueueContinuation(continuation, true);
public void UnsafeOnCompleted(Action continuation) => YieldAwaitable.YieldAwaiter.QueueContinuation(continuation, false);
public void GetResult()
{
}
}
}
Task 和 YieldAwaitable 都提供了一个 GetAwaiter 方法。
返回的 XXXAwaiter 需满足以下两个条件:
- ICriticalNotifyCompletion,INotifyCompletion 这两个接口。而 ICriticalNotifyCompletion 是 INotifyCompletion 的子接口。
public interface INotifyCompletion
{
void OnCompleted(Action continuation);
}
public interface ICriticalNotifyCompletion : INotifyCompletion
{
void UnsafeOnCompleted(Action continuation);
}
- 提供 IsCompleted 属性 和 void GetResult() / TResult GetResult() 方法。GetResult 方法是否有返回值取决于 await XXXAwaitable 是否想提供返回值。
实际上,我们自己想要实现一个 Awaitable 的话,Awaiter 只需要实现 INotifyCompletion 接口或者 ICriticalNotifyCompletion 就可以了。
首先,我们需要准备好一个 Awaitable。
class FooAwaitable<TResult>
{
// 回调,简化起见,未将其包裹到 TaskContinuation 这样的容器里
private Action _continuation;
private TResult _result;
private volatile bool _completed;
public bool IsCompleted => _completed;
// Awaitable 中的关键部分,提供 GetAwaiter 方法
public FooAwaiter<TResult> GetAwaiter() => new FooAwaiter<TResult>(this);
public void Run(Func<TResult> func)
{
new Thread(() =>
{
var result = func();
TrySetResult(result);
})
{
IsBackground = true
}.Start();
}
private bool AddFooContinuation(Action action)
{
if (_completed)
{
return false;
}
_continuation += action;
return true;
}
private void TrySetResult(TResult result)
{
_result = result;
_completed = true;
_continuation?.Invoke();
}
// TODO: 实现一个 FooAwaiter 作为 FooAwaitable 内部类
// public struct FooAwaiter<TResult> : INotifyCompletion Or ICriticalNotifyCompletion
// {
// }
}
实现 INotifyCompletion 接口的 Awaiter 示例
var fooAwaitable = new FooAwaitable<string>();
fooAwaitable.Run(() =>
{
// 可以把Sleep去掉看看
Thread.Sleep(100);
Console.WriteLine("Hello");
return "World";
});
var x = await fooAwaitable;
Console.WriteLine(x);
Console.ReadKey();
class FooAwaitable<TResult>
{
// ...
// 上面所展示的 FooAwaitable 里的代码,此处省略
// ...
// 1. 实现 INotifyCompletion
public struct FooAwaiter<TResult> : INotifyCompletion
{
private readonly FooAwaitable<TResult> _fooAwaitable;
// 2. 实现 IsCompleted 属性
public bool IsCompleted => _fooAwaitable.IsCompleted;
public FooAwaiter(FooAwaitable<TResult> fooAwaitable)
{
_fooAwaitable = fooAwaitable;
}
public void OnCompleted(Action continuation)
{
Console.WriteLine("FooAwaiter.OnCompleted");
if (_fooAwaitable.AddFooContinuation(continuation))
{
Console.WriteLine("FooAwaiter.OnCompleted: added continuation");
}
else
{
// 试着把上面的 Thread.Sleep(100) 删掉看看,就有可能会执行到这里
// 也就是回调的注册时间有可能晚于任务完成的时间
Console.WriteLine("FooAwaiter.OnCompleted: already completed, invoking continuation");
continuation();
}
}
// 3. 实现 GetResult 方法
public TResult GetResult()
{
Console.WriteLine("FooAwaiter.GetResult");
return _fooAwaitable._result;
}
}
}
执行结果如下:
FooAwaiter.OnCompleted FooAwaiter.OnCompleted: added continuation Hello FooAwaiter.GetResult World
实现 ICriticalNotifyCompletion 接口的 Awaiter 示例
var fooAwaitable = new FooAwaitable<string>();
fooAwaitable.Run(() =>
{
Thread.Sleep(100);
Console.WriteLine("Hello");
return "World";
});
var x = await fooAwaitable;
Console.WriteLine(x);
Console.ReadKey();
class FooAwaitable<TResult>
{
// ...
// 上面所展示的 FooAwaitable 里的代码,此处省略
// ...
// 1 实现 ICriticalNotifyCompletion
public struct FooAwaiter<TResult> : ICriticalNotifyCompletion
{
private readonly FooAwaitable<TResult> _fooAwaitable;
// 2 实现 IsCompleted 属性
public bool IsCompleted => _fooAwaitable.IsCompleted;
public FooAwaiter(FooAwaitable<TResult> fooAwaitable)
{
_fooAwaitable = fooAwaitable;
}
public void OnCompleted(Action continuation)
{
Console.WriteLine("FooAwaiter.OnCompleted");
if (_fooAwaitable.AddFooContinuation(continuation))
{
Console.WriteLine("FooAwaiter.OnCompleted: added continuation");
}
else
{
Console.WriteLine("FooAwaiter.OnCompleted: already completed, invoking continuation");
continuation();
}
}
public void UnsafeOnCompleted(Action continuation)
{
Console.WriteLine("FooAwaiter.UnsafeOnCompleted");
if (_fooAwaitable.AddFooContinuation(continuation))
{
Console.WriteLine("FooAwaiter.UnsafeOnCompleted: added continuation");
}
else
{
Console.WriteLine("FooAwaiter.UnsafeOnCompleted: already completed, invoking continuation");
continuation();
}
}
// 3. 实现 GetResult 方法
public TResult GetResult()
{
Console.WriteLine("FooAwaiter.GetResult");
return _fooAwaitable._result;
}
}
}
执行结果如下:
FooAwaiter.UnsafeOnCompleted FooAwaiter.UnsafeOnCompleted: added continuation Hello FooAwaiter.GetResult World
一旦实现了 ICriticalNotifyCompletion(INotifyCompletion 的子接口),注册回调走的是 UnsafeOnCompleted 方法。如果同时实现两个方法,也还是以ICriticalNotifyCompletion 的规则优先。
INotifyCompletion VS ICriticalNotifyCompletion
既然实现 Awaitable 只要实现两个接口之一,那为什么要区分出这两个接口呢。
我们来看看 TaskAwaiter 里的实现是什么样。
public readonly struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion
{
private readonly Task<TResult> m_task;
internal TaskAwaiter(Task<TResult> task)
{
m_task = task;
}
// ...
public void OnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: true);
}
public void UnsafeOnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: false);
}
internal static void OnCompletedInternal(
Task task,
Action continuation,
bool continueOnCapturedContext,
bool flowExecutionContext)
{
m_task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
}
// ...
}
OnCompleted 和 UnsafeOnCompleted 的唯一区别是在调用 TaskAwaiter.OnCompletedInternal 时,flowExecutionContext 这个参数有所不同。
ExecutionContext 的本质是一个线程私有变量,维护着我们常用 AsyncLocal 的数据,例如 Thread.CurrentThread.CurrentCulture 其实就是一个 AsyncLocal 变量。
runtime 中会在发生线程切换的地方,将 ExecutionContext 从前一个线程拷贝到后一个线程。那么第二个线程里也就可以拿到在第一个线程里设置好的 AsyncLocal 变量。
就算线程没有发生切换,runtime 里有的地方也会通过清空 ExecutionContext 来阻止其往后传播。
更多 ExcutionContext 和 AsyncLocal 的解析,请参考我之前的一篇博客:
//www.cnblogs.com/eventhorizon/p/12240767.html
也就是说 OnCompleted 会保证 ExecutionContext 往后传播。而 UnsafeOnCompleted 则不会。我们来看下面这个示例。
class Program
{
private static readonly AsyncLocal<string> AsyncLocal = new AsyncLocal<string>();
static void Main(string[] args)
{
AsyncLocal.Value = "Hello World";
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine(
$"Task1 Run, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}");
})
.GetAwaiter()
.OnCompleted(() =>
Console.WriteLine(
$"Task1 OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}"));
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine(
$"Task2 Run, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}");
})
.GetAwaiter()
.UnsafeOnCompleted(() =>
Console.WriteLine(
$"Task2 UnsafeOnCompleted, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}"));
Console.ReadKey();
}
}
Task1 Run, ThreadId: 6, AsyncLocal: Hello World
Task2 Run, ThreadId: 7, AsyncLocal: Hello World
Task1 OnCompleted, ThreadId: 6, AsyncLocal: Hello World
Task2 UnsafeOnCompleted, ThreadId: 7, AsyncLocal:
如果使用了 UnsafeOnCompleted 注册回调,也就是 flowExecutionContext: false
,则 ExecutionContext 不会往后继续传播。
同一个 Task 回调执行前后线程没变是因为 TaskSchedulerAwaitTaskContinuation 里优先 Inline 执行回调,暂不展开。
AsyncTaskMethodBuilder 是状态机的一个重要组成部分,负责 状态机与 awaiter 的衔接工作,更详细的功能我们下篇博客再叙述,这边只简单提一下。
AsyncTaskMethodBuilder 主要负责以下功能:
- 将 async 方法内部的返回值封装到 async 方法的最终所返回的 Task 中,并作为这个 Task 的返回值。
- 将 async 方法内部发生的异常 封装到 async 方法的最终所返回的 Task 中。
- 将状态机待执行的动作作为回调 向 awaiter 注册(awaiter 内部再向 Task 注册)。
我们可以给 async 方法内部的状态机自己绑定 AsyncMethodBuilder。在自定义的 AsyncTaskMethodBuilder 里可以决定要不要往后传 ExecutionContext.UnsafeOnCompleted 这个方法的存在意义就是为了在我们不像往后传 ExecutionContext 的时候使用。
async 方法 内的 AsyncMethodBuilder 和 async 方法的返回值有关,AsyncMethodBuilder 绑定在作为返回值的 Awaitable 上,下篇再讲。
就目前 .NET 6 的代码来说, async Task FooAsync(){}
这样的以 Task 作为返回值的 async 方法中的状态机来说,Task 方法所绑定的 AsyncMethodBuilder 内并没有调用 TaskAwaiter.UnsafeOnCompleted 方法,而是通过其他方式注册的回调,大致的流程和使用 TaskAwaiter.UnsafeOnCompleted 进行注册时类似的。
如果像上文那样自己实现 Awaitable,会调用 TaskAwaiter.OnCompleted 或者 TaskAwaiter.OnCompleted 方法。这个和 AsyncMethodBuilder 内部的实现有关。(手动狗头,设计的太复杂了)
有限元状态机
下面是摘自百度百科的关于状态机的说明:
状态机可归纳为4个要素,即现态、条件、动作、次态。这样的归纳,主要是出于对状态机的内在因果关系的考虑。“现态”和“条件”是因,“动作”和“次态”是果。详解如下:
- 现态:是指当前所处的状态。
- 条件:又称为“事件”,当一个条件被满足,将会触发一个动作,或者执行一次状态的迁移。
- 动作:条件满足后执行的动作。动作执行完毕后,可以迁移到新的状态,也可以仍旧保持原状态。动作不是必需的,当条件满足后,也可以不执行任何动作,直接迁移到新状态。
- 次态:条件满足后要迁往的新状态。“次态”是相对于“现态”而言的,“次态”一旦被激活,就转变成新的“现态”了。
而有限元状态机的有限是指状态的有限。
观察下面这么一个常见的 await 使用场景,可以将 FooAsync 方法内部的逻辑分为三种状态(即 三个阶段):
- 初始化状态
- 等待 BarAsync 执行完成的状态
- 执行结束状态
class Program
{
static async Task Main(string[] args)
{
var a = 1;
Console.WriteLine(await FooAsync(a));
}
static async Task<int> FooAsync(int a)
{
int b = 2;
int c = await BarAsync();
return a + b + c;
}
static async Task<int> BarAsync()
{
await Task.Delay(100);
return 3;
}
}
由 FooAsync 编译成的 IL 代码经整理后的等效 C# 代码如下:
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var a = 1;
Console.WriteLine(await FooAsync(a));
}
static Task<int> FooAsync(int a)
{
var stateMachine = new FooStateMachine
{
_asyncTaskMethodBuilder = AsyncTaskMethodBuilder<int>.Create(),
_state = -1, // 初始化状态
_a = a // 将实参拷贝到状态机字段
};
// 开始执行状态机
stateMachine._asyncTaskMethodBuilder.Start(ref stateMachine);
return stateMachine._asyncTaskMethodBuilder.Task;
}
static async Task<int> BarAsync()
{
await Task.Delay(100);
return 3;
}
public class FooStateMachine : IAsyncStateMachine
{
// 方法的参数和局部变量被编译会字段
public int _a;
public AsyncTaskMethodBuilder<int> _asyncTaskMethodBuilder;
private int _b;
private int _c;
// -1: 初始化状态
// 0: 等到 Task 执行完成
// -2: 状态机执行完成
public int _state;
private TaskAwaiter<int> _taskAwaiter;
public void MoveNext()
{
var result = 0;
TaskAwaiter<int> taskAwaiter;
try
{
// 状态不是0,代表 Task 未完成
if (_state != 0)
{
// 初始化局部变量
_b = 2;
taskAwaiter = Program.BarAsync().GetAwaiter();
if (!taskAwaiter.IsCompleted)
{
// state: -1 => 0,异步等待 Task 完成
_state = 0;
_taskAwaiter = taskAwaiter;
var stateMachine = this;
// 内部会调用 将 stateMachine.MoveNext 注册为 Task 的回调
_asyncTaskMethodBuilder.AwaitUnsafeOnCompleted(ref taskAwaiter, ref stateMachine);
return;
}
}
else
{
taskAwaiter = _taskAwaiter;
// TaskAwaiter 是个结构体,这边相当于是个清空 _taskAwaiter 字段的操作
_taskAwaiter = new TaskAwaiter<int>();
// state: 0 => -1,状态机恢复到初始化状态
_state = -1;
}
_c = taskAwaiter.GetResult();
result = _a + _b + _c;
}
catch (Exception e)
{
// state: any => -2,状态机执行完成
_state = -2;
_asyncTaskMethodBuilder.SetException(e);
return;
}
// state: -1 => -2,状态机执行完成
_state = -2;
// 将 result 设置为 FooAsync 方法的返回值
_asyncTaskMethodBuilder.SetResult(result);
}
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
}
编译器在 Program 中创建了一个内部类,也就是 FooStateMachine 这个状态机,而 FooAsync
方法则变成了对这个状态机的使用。
AsyncTaskMethodBuilder 的作用解释放到下一篇文章再解释,这边简单理解成 AsyncTaskMethodBuilder.SetResult 就是 FooAsync
return 返回值,AsyncTaskMethodBuilder.SetException 就是 FooAsync
内部往外扔异常。
完整的流程如下图所示:
一个方法中就算有个 await,这个方法也只会有一个对应的状态机。就.NET 6 SDK 的编译结果来看,state 会出现 -1 => 0(等待第一个Task异步执行完成) => -1 => 0(等待第二个Task异步执行完成)这样的流程。
AsyncStateMachineBox
前文讲过 awaiter 往 Task 注册回调的逻辑里,可能不会直接传递 ExcutionContext。
而这个 AsyncStateMachineBox 是对 AsyncStateMachine 和 ExcutionContext 的包装,这边通过这样的方式往后传递 ExcutionContext。
await Task 的回调在哪执行
回忆一下上文 Task.ContinueWith 讲回调最终封装到了 ContinueWithTaskContinuation。
返回值是 Task 的情况下状态机所绑定的 AsyncTaskMethodBuilder 的所会调用 Task.UnSafeSetContinuationForAwait 实例方法。里面会根据不同的条件创建不同的 TaskContinuation。
UnSafeSetContinuationForAwait 中的逻辑和后续回调执行流程大致如下:
同步上下文(SynchronizationContext)导致的死锁问题与 Task.ConfigureAwait(continueOnCapturedContext:false)
如果存在 SynchronizationContext,回调会优先在 SynchronizationContext 上执行。而 SynchronizationContext 也是一种任务调度器,其存在时间应该是早于 Task 的。
在 .NET Framework 时代的 WPF、Windows Form、Asp.NET Web Form 这些框架里,都有 SynchronizationContext 的存在。
下面是一个 SynchronizationContext 的实现示例:
class SingleThreadedSynchronizationContext : SynchronizationContext
{
private readonly BlockingCollection<(SendOrPostCallback Callback, object State)> _queue = new BlockingCollection<(SendOrPostCallback Callback, object State)>();
public override void Send(SendOrPostCallback d, object state) // Sync operations
{
throw new NotSupportedException($"{nameof(SingleThreadedSynchronizationContext)} does not support synchronous operations.");
}
public override void Post(SendOrPostCallback d, object? state) // Async operations
{
_queue.Add((d, state));
}
public static void Run(Action action)
{
var previous = Current;
var context = new SingleThreadedSynchronizationContext();
SetSynchronizationContext(context);
try
{
Console.WriteLine("Executing first action, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
action();
while (context._queue.TryTake(out var item))
{
Console.WriteLine("Executing callback, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
item.Callback(item.State);
}
}
finally
{
context._queue.CompleteAdding();
SetSynchronizationContext(previous);
}
}
}
WPF 这些框架里,UI 只允许 UI 线程去更新。这些 SynchronizationContext 有个特点,就是一次只允许一个任务执行。
class Program
{
private static void Main(string[] args)
{
new Thread(() =>
{
Console.WriteLine("Thread started, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
SingleThreadedSynchronizationContext.Run(Test);
})
{
IsBackground = true
}.Start();
Console.ReadKey();
}
private static void Test()
{
Console.WriteLine("Test: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"Test.SynchronizationContext1: {SynchronizationContext.Current}");
// 时间点一:这里把唯一的执行线程给阻塞住了,会导致死锁
DoSthAsync().GetAwaiter().GetResult();
Console.WriteLine($"Test.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("Test: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
private static async Task DoSthAsync()
{
Console.WriteLine("DoSthAsync: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"DoSthAsync.SynchronizationContext1: {SynchronizationContext.Current}");
// await 后面的代码作为 Task.Delay 的回调,
// 等待 Task.Delay 结束后会由 MaxConcurrencySynchronizationContext 进行调度执行
await Task.Delay(100);
// 时间点二:MaxConcurrencySynchronizationContext 唯一的线程已经被阻塞住了,死锁开始
Console.WriteLine($"DoSthAsync.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("DoSthAsync: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
}
执行结果如下:
Thread started, CurrentThreadId: 10 Executing first action, CurrentThreadId: 10 Test: START, CurrentThreadId: 10 Test.SynchronizationContext1: SingleThreadedSynchronizationContext DoSthAsync: START, CurrentThreadId: 10 DoSthAsync.SynchronizationContext1: SingleThreadedSynchronizationContext
await Task.Delay(100)
的回调将无法被执行。
那么如何在这些 UI 框架里避免死锁呢?我们只需要将 await Task.Delay(100)
改为 await Task.Delay(100).ConfigureAwait(continueOnCapturedContext:false)
class Program
{
private static void Main(string[] args)
{
new Thread(() =>
{
Console.WriteLine("Thread started, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
SingleThreadedSynchronizationContext.Run(Test);
})
{
IsBackground = true
}.Start();
Console.ReadKey();
}
private static void Test()
{
Console.WriteLine("Test: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"Test.SynchronizationContext1: {SynchronizationContext.Current}");
// 时间点一:这里把唯一的执行线程给阻塞住了,但不会导致死锁
DoSthAsync().GetAwaiter().GetResult();
Console.WriteLine($"Test.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("Test: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
private static async Task DoSthAsync()
{
Console.WriteLine("DoSthAsync: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"DoSthAsync.SynchronizationContext1: {SynchronizationContext.Current}");
// await 后面的代码作为 Task.Delay 的回调,
// 等待 Task.Delay 结束后会由 线程池 进行调度执行
await Task.Delay(100).ConfigureAwait(false);
// 时间点二:线程池执行回调,这边已经不存在 SynchronizationContext 了
Console.WriteLine($"DoSthAsync.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("DoSthAsync: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
}
执行修改后的代码:
Test: START, CurrentThreadId: 10 Test.SynchronizationContext1: SingleThreadedSynchronizationContext DoSthAsync: START, CurrentThreadId: 10 DoSthAsync.SynchronizationContext1: SingleThreadedSynchronizationContext DoSthAsync.SynchronizationContext2: DoSthAsync: END, CurrentThreadId: 6 Test.SynchronizationContext2: SingleThreadedSynchronizationContext Test: END, CurrentThreadId: 10
ConfigureAwait 方法返回了一个 ConfiguredTaskAwaitable 对象,对原有的 Task 进行了包装,后续创建 TaskContinuation 的流程里会走 continueOnCapturedContext: false 的分支。
class Task
{
public ConfiguredTaskAwaitable ConfigureAwait(bool continueOnCapturedContext)
{
return new ConfiguredTaskAwaitable(this, continueOnCapturedContext);
}
}
为什么没有同步上下文也会死锁
我们的 Web Api 项目中,默认是不存在 SynchronizationContext 的。那为什么有的同学还会遇到死锁问题呢,而且主要是高并发的情况下,本地可能没办法复现。
这个和 ThreadPool 中的 Starvation Avoidance 机制有关。
DoSthAsync().GetAwaiter().GetResult()
会阻塞线程池线。.NET 6之前极端情况导致线程池无可用线程,导致所谓的“死锁”。
总结
- TaskContinuation:维护回调和调度回调。
- Awaiter:对 Awaitable 进行封装,负责与状态机进行交互。
- 状态机:由编译器生成,每个 async 方法 有且仅有一个,await 后面的代码会被编译到 状态机 的 MoveNext 方法中,注册为 Task 的回调。
- AsyncMethodBuilder:状态机的重要组成部分,async 方法内外沟通的桥梁,和 async 方法的返回值类型绑定。
- 无论何时,都谨慎使用
DoSthAsync().GetAwaiter().GetResult()
这样的代码。
参考资料
//devblogs.microsoft.com/pfxteam/whats-new-for-parallelism-in-net-4-5-beta/
//devblogs.microsoft.com/dotnet/configureawait-faq/