面试八股文:你写过自定义任务调度器吗?

最近入职了新公司,尝试阅读祖传代码,记录并更新最近的编程认知。

思绪由Q1引发,后续Q2、Q3基于Q1的发散探究

Q1. Task.Run、Task.Factory.StartNew 的区别?

我们常使用Task.RunTask.Factory.StartNew创建并启动任务,但是他们的区别在哪里?在哪种场景下使用前后者?

官方推荐使用Task.Run方法启动基于计算的任务,
当需要对长时间运行、基于计算的任务做精细化控制时使用Task.Factory.StartNew。

Task.Factory提供了自定义选项、自定义调度器的能力,这也说明Task.Run是Task.Factory.StartNew的一个特例,Task.Run 只是提供了一个无参、默认的任务创建和调度方式。

当你在Task.Run传递委托

Task.Run(someAction);

实际上等价于

Task.Factory.StartNew(someAction, 
    CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

一个长时间运行的任务,如果使用Task.Run特定会使用线程池线程,可能构成滥用线程池线程,这个时候最好在独立线程中执行任务。

Q2. 既然说到Task.Run使用线程池线程,线程池线程有哪些特征? 为什么有自定义调度器一说?

github: TaskScheduler 251行显示TaskSchedule.Dafult确实是线程池任务调度器。

线程池线程的特征:
① 池中线程都是后台线程
② 线程可重用,一旦线程池中的线程完成任务,将返回到等待线程队列中, 避免了创建线程的开销
③ 池中预热了工作者线程、IO线程,

  • 线程池最大线程数:线程池线程都忙碌,后续任务将排队等待空闲线程;
  • 最小值:线程池根据需要提供 工作线程/IO完成线程, 直到达到某最小值; 达到某最小值,线程池可以创建或者等待。

我启动一个脚手架项目: 默认最大工作者线程32767,最大IO线程1000 ; 默认最小工作线程数、最小IO线程数均为8个

github: ThreadPoolTaskScheduler 显示线程池任务调度器是这样调度任务的:

/// <summary>
/// Schedules a task to the ThreadPool.
/// </summary>
/// <param name="task">The task to schedule.</param>
protected internal override void QueueTask(Task task)
{
     TaskCreationOptions options = task.Options;
     if ((options & TaskCreationOptions.LongRunning) != 0)
     {
          // Run LongRunning tasks on their own dedicated thread.
          Thread thread = new Thread(s_longRunningThreadWork);
          thread.IsBackground = true; // Keep this thread from blocking process shutdown
          thread.Start(task);
    }
    else
    {
         // Normal handling for non-LongRunning tasks.
        bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0);
        ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal);
    }
}

注意8-14行:若上层使用者将LongRunning任务应用到默认的任务调度器(也即线程池任务调度器),线程池任务调度器会有一个兜底方案,会将任务放在独立线程上执行。

何时不使用线程池线程

有几种应用场景,其中适合创建并管理自己的线程,而非使用线程池线程:

  • 需要一个前台线程。
  • 需要具有特定优先级的线程。
  • 拥有会导致线程长时间阻塞的任务。 线程池具有最大线程数,因此大量被阻塞的线程池线程可能会阻止任务启动。
  • 需将线程放入单线程单元。 所有 ThreadPool 线程均位于多线程单元中。
  • 需具有与线程关联的稳定标识,或需将一个线程专用于一项任务。

Q3. 既然要自定义调度器,那我们就来自定义一下?

实现TaskScheduler 抽象类,其中的抓手是“调度”,也就是 QueueTask 方法,之后你自由定义数据结构, 从数据结构中调度出线程来执行任务。

public sealed class CustomTaskScheduler : TaskScheduler, IDisposable
    {
        private BlockingCollection<Task> tasksCollection = new BlockingCollection<Task>();
        private readonly Thread mainThread = null;
        public CustomTaskScheduler()
        {
            mainThread = new Thread(new ThreadStart(Execute));
            if (!mainThread.IsAlive)
            {
                mainThread.Start();
            }
        }
        private void Execute()
        {
            foreach (var task in tasksCollection.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        }
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return tasksCollection.ToArray();
        }
        protected override void QueueTask(Task task)
        {
            if (task != null)
                tasksCollection.Add(task);           
        }
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        private void Dispose(bool disposing)
        {
            if (!disposing) return;
            tasksCollection.CompleteAdding();
            tasksCollection.Dispose();
        }
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }

应用我们的自定义任务调度器:

CustomTaskScheduler taskScheduler = new CustomTaskScheduler();
Task.Factory.StartNew(() => SomeMethod(), CancellationToken.None, TaskCreationOptions.None, taskScheduler);