Linux中斷管理 (3)workqueue工作隊列【轉】

  • 2019 年 10 月 10 日
  • 筆記

轉自:https://www.cnblogs.com/arnoldlu/p/8659988.html

目錄:

Linux中斷管理

Linux中斷管理 (1)Linux中斷管理機制

Linux中斷管理 (2)軟中斷和tasklet

Linux中斷管理 (3)workqueue工作隊列

關鍵詞:

工作隊列的原理是把work(需要推遲執行的函數)交由一個內核執行緒來執行,它總是在進程上下文中執行。

工作隊列的優點是利用進程上下文來執行中斷下半部操作,因此工作隊列允許重新調度和睡眠,是非同步執行的進程上下文,它還能解決軟中斷和tasklet執行時間過長導致系統實時性下降等問題。

當驅動程式或者內核子系統在進程上下文中有非同步執行的工作任務時,可以使用work item來描述工作任務,包括該工作任務的執行回調函數,把work item添加到一個隊列中,然後一個內核執行緒會去執行這個工作任務的回調函數。

這裡work item被稱為工作,隊列被稱為workqueue,即工作隊列,內核執行緒被稱為worker。

CMWQ(Concurrency Managed Workqueues)

執行work item任務的執行緒被稱為worker或者工作執行緒。工作執行緒會串列化地執行掛入到隊列中所有的work item。如果隊列中沒有work,那麼該工作執行緒就會變成idle態。

為了管理眾多工作執行緒,CMWQ提出了工作執行緒池(worker-pool)概念,worker-pool有兩種:

一是bound型,可以理解為Per-CPU類型,每個CPU都有worker-pool;

另一種是unbound型,即不和具體CPU綁定。

這兩種worker-pool都會定義兩個執行緒池,一個給普通優先順序的work使用,另一個給高優先順序的work使用。

1. 初始化工作隊列

1.1 工作、工作隊列、工作執行緒池、工作執行緒數據結構

workqueue機制最小的調度單元是work_struct,即工作任務。

struct work_struct {      atomic_long_t data;---------------低比特位部分是work的標誌位,剩餘比特位通常用於存放上一次運行的worker_pool ID或pool_workqueue的指針。存放的內容由WORK_STRUCT_PWQ標誌位來決定。      struct list_head entry;-----------用於把work掛到其他隊列上。      work_func_t func;-----------------工作任務的處理函數  #ifdef CONFIG_LOCKDEP      struct lockdep_map lockdep_map;  #endif  }

工作隊列由struct workqueue_struct數據結構描述:

struct workqueue_struct {      struct list_head    pwqs;        /* WR: all pwqs of this wq */--------------------該workqueue所在的所有pool_workqueue鏈表      struct list_head    list;        /* PL: list of all workqueues */-----------------系統所有workqueue_struct的全局鏈表        struct mutex        mutex;        /* protects this wq */      int            work_color;    /* WQ: current work color */      int            flush_color;    /* WQ: current flush color */      atomic_t        nr_pwqs_to_flush; /* flush in progress */      struct wq_flusher    *first_flusher;    /* WQ: first flusher */      struct list_head    flusher_queue;    /* WQ: flush waiters */      struct list_head    flusher_overflow; /* WQ: flush overflow list */        struct list_head    maydays;    /* MD: pwqs requesting rescue */-------------------所有rescue狀態下的pool_workqueue數據結構鏈表      struct worker        *rescuer;    /* I: rescue worker */---------------------------rescue內核執行緒,記憶體緊張時創建新的工作執行緒可能會失敗,如果創建workqueue是設置了WQ_MEM_RECLAIM,那麼rescuer執行緒會接管這種情況。        int            nr_drainers;    /* WQ: drain in progress */      int            saved_max_active; /* WQ: saved pwq max_active */        struct workqueue_attrs    *unbound_attrs;    /* WQ: only for unbound wqs */---------UNBOUND類型屬性      struct pool_workqueue    *dfl_pwq;    /* WQ: only for unbound wqs */----------------unbound類型的pool_workqueue    #ifdef CONFIG_SYSFS      struct wq_device    *wq_dev;    /* I: for sysfs interface */  #endif  #ifdef CONFIG_LOCKDEP      struct lockdep_map    lockdep_map;  #endif      char            name[WQ_NAME_LEN]; /* I: workqueue name */--------------------------該workqueue的名字        /* hot fields used during command issue, aligned to cacheline */      unsigned int        flags ____cacheline_aligned; /* WQ: WQ_* flags */---------------經常被不同CUP訪問,因此要和cache line對齊。      struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */---------------------指向per-cpu類型的pool_workqueue      struct pool_workqueue __rcu *numa_pwq_tbl[]; 
/* FR: unbound pwqs indexed by node */  }

運行work_struct的內核執行緒被稱為worker,即工作執行緒。

/*   * The poor guys doing the actual heavy lifting.  All on-duty workers are   * either serving the manager role, on idle list or on busy hash.  For   * details on the locking annotation (L, I, X...), refer to workqueue.c.   *   * Only to be used in workqueue and async.   */  struct worker {      /* on idle list while idle, on busy hash table while busy */      union {          struct list_head    entry;    /* L: while idle */          struct hlist_node    hentry;    /* L: while busy */      };        struct work_struct    *current_work;    /* L: work being processed */----當前正在處理的work      work_func_t        current_func;    /* L: current_work's fn */-----------當前正在執行的work回調函數      struct pool_workqueue    *current_pwq; /* L: current_work's pwq */-------當前work所屬的pool_workqueue      bool            desc_valid;    /* ->desc is valid */      struct list_head    scheduled;    /* L: scheduled works */---------------所有被調度並正準備執行的work_struct都掛入該鏈表中        /* 64 bytes boundary on 64bit, 32 on 32bit */        struct task_struct    *task;        /* I: worker task */-----------------該工作執行緒的task_struct數據結構      struct worker_pool    *pool;        /* I: the associated pool */---------該工作執行緒所屬的worker_pool                          /* L: for rescuers */      struct list_head    node;        /* A: anchored at pool->workers */------可以把該worker掛入到worker_pool->workers鏈表中                          /* A: runs through worker->node */        unsigned long        last_active;    /* L: last active timestamp */      unsigned int        flags;        /* X: flags */      int            id;        /* I: worker id */        /*       * Opaque string set with work_set_desc().  Printed out with task       * dump for debugging - WARN, BUG, panic or sysrq.       */      char            desc[WORKER_DESC_LEN];        /* used only by rescuers to point to the target workqueue */      struct workqueue_struct    *rescue_wq;    /* I: the workqueue to rescue */  }

CMWQ提出了工作執行緒池的概念,struct worker_pool數據結構用於描述工作執行緒池。

worker_pool是per-cpu變數,每個CPU都有worker_pool,而且有兩個worker_pool。

一個用於普通優先順序工作執行緒,另一個用於高優先順序工作執行緒。

struct worker_pool {      spinlock_t        lock;        /* the pool lock */-----------------------用於保護worker_pool的自旋鎖      int            cpu;        /* I: the associated cpu */-------------------對於unbound類型為-1;對於bound類型workqueue表示綁定的CPU ID。      int            node;        /* I: the associated node ID */      int            id;        /* I: pool ID */-------------------------------該worker_pool的ID號      unsigned int        flags;        /* X: flags */        struct list_head    worklist;    /* L: list of pending works */----------掛入pending狀態的work_struct      int            nr_workers;    /* L: total number of workers */-----------工作執行緒的數量        /* nr_idle includes the ones off idle_list for rebinding */      int            nr_idle;    /* L: currently idle ones */------------------處於idle狀態的工作執行緒的數量        struct list_head    idle_list;    /* X: list of idle workers */----------處於idle狀態的工作執行緒鏈表      struct timer_list    idle_timer;    /* L: worker idle timeout */      struct timer_list    mayday_timer;    /* L: SOS timer for workers */        /* a workers is either on busy_hash or idle_list, or the manager */      DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);                          /* L: hash of busy workers */        /* see manage_workers() for details on the two manager mutexes */      struct mutex        manager_arb;    /* manager arbitration */      struct mutex        attach_mutex;    /* attach/detach exclusion */      struct list_head    workers;    /* A: attached workers */---------------該worker_pool管理的工作執行緒鏈表      struct completion    *detach_completion; /* all workers detached */        struct ida        worker_ida;    /* worker IDs for task name */        struct workqueue_attrs    *attrs;        /* I: worker attributes */-----工作執行緒屬性      struct hlist_node    hash_node;    /* PL: unbound_pool_hash node */      int            refcnt;        /* PL: refcnt for unbound pools */        /*       * The current concurrency level.  
As it's likely to be accessed       * from other CPUs during try_to_wake_up(), put it in a separate       * cacheline.       */      atomic_t        nr_running ____cacheline_aligned_in_smp;----------------用於管理worker的創建和銷毀的統計計數,表示運行中的worker數量。該變數可能被多CPU同時訪問,因此獨佔一個快取行,避免多核讀寫造成「顛簸」現象。        /*       * Destruction of pool is sched-RCU protected to allow dereferences       * from get_work_pool().       */      struct rcu_head        rcu;---------------------------------------------RCU鎖  }

struct pool_workqueue用於鏈接workqueue和worker_pool。

struct pool_workqueue {      struct worker_pool    *pool;        /* I: the associated pool */-----------指向worker_pool結構      struct workqueue_struct *wq;        /* I: the owning workqueue */----------指向workqueue_struct結構      int            work_color;    /* L: current color */      int            flush_color;    /* L: flushing color */      int            refcnt;        /* L: reference count */      int            nr_in_flight[WORK_NR_COLORS];                          /* L: nr of in_flight works */      int            nr_active;    /* L: nr of active works */------------------活躍的work_struct數量      int            max_active;    /* L: max active works */-------------------最大活躍work_struct數量      struct list_head    delayed_works;    /* L: delayed works */--------------延遲執行work_struct鏈表      struct list_head    pwqs_node;    /* WR: node on wq->pwqs */      struct list_head    mayday_node;    /* MD: node on wq->maydays */        /*       * Release of unbound pwq is punted to system_wq.  See put_pwq()       * and pwq_unbound_release_workfn() for details.  pool_workqueue       * itself is also sched-RCU protected so that the first pwq can be       * determined without grabbing wq->mutex.       */      struct work_struct    unbound_release_work;      struct rcu_head        rcu;------------------------------------------------RCU鎖  }

上面幾個數據結構的關係圖?

1.2 初始化工作隊列

首先看一下對創建工作隊列有重要影響的flags。

/*   * Workqueue flags and constants.  For details, please refer to   * Documentation/workqueue.txt.   */  enum {      WQ_UNBOUND        = 1 << 1, /* not bound to any cpu */-----------------不綁定到任何一個CPU執行      WQ_FREEZABLE        = 1 << 2, /* freeze during suspend */--------------在suspend進行進程凍結的時候,需要讓工作執行緒完成當前所有的work才完成進程凍結,並且這個過程不會再新開始一個work的執行,直到進程被解凍。      WQ_MEM_RECLAIM        = 1 << 3, /* may be used for memory reclaim */---在記憶體緊張導致創建新進程失敗,系統通過rescuer內核執行緒去接管這種情況。      WQ_HIGHPRI        = 1 << 4, /* high priority */------------------------屬於高優先順序的worker_pool      WQ_CPU_INTENSIVE    = 1 << 5, /* cpu intensive workqueue */------------屬於特別消耗CPU資源的一類work,這個work執行會得到調度器的監管,排在這類work後的non-CPU-intensive類型work可能會推遲執行      WQ_SYSFS        = 1 << 6, /* visible in sysfs, see wq_sysfs_register() */        WQ_POWER_EFFICIENT    = 1 << 7,-----------------根據wq_power_efficient來決定此類型的工作隊列是bound還是unbound類型,bound型可能導致處於idle的CPU被喚醒,而unbound型則不會必然喚醒idle的CPU。        __WQ_DRAINING        = 1 << 16, /* internal: workqueue is draining */      __WQ_ORDERED        = 1 << 17, /* internal: workqueue is ordered */----表示同一時間只能執行一個work_item。      __WQ_ORDERED_EXPLICIT    = 1 << 19, /* internal: alloc_ordered_workqueue() */        WQ_MAX_ACTIVE        = 512,      /* I like 512, better ideas? */      WQ_MAX_UNBOUND_PER_CPU    = 4,      /* 4 * #cpus for unbound wq */      WQ_DFL_ACTIVE        = WQ_MAX_ACTIVE / 2,  };

內核啟動的時候,調用init_workqueues()創建工作執行緒,同時創建了一些常用的工作隊列。

init_workqueues()由early_initcall(init_workqueues)在early階段調用。

1.2.1 誰?都創建了哪些工作執行緒?

對於4核SMP系統來說,必然創建的工作執行緒有:每個CPU的kworker/x:0、kworker/x:0H、以及unbound類型的kworker/u8:0。

init_workqueues()創建CPU0以及unbound工作執行緒

kworker/0:0和kworker/0:0H以及kworker/u8:0都是由init_workqueues創建的,調用軌跡如下。

kworker/0:0、kworker/0:0H:kernel_init()->kernel_init_freeable()->do_one_initcall()->init_workqueues()->create_worker()

kworker/u8:0:kernel_init()->kernel_init_freeable()->do_one_initcall()->init_workqueues()->__alloc_workqueue_key()->apply_workqueue_attrs()->alloc_unbound_pwq()->create_worker()

對於unbound工作執行緒的創建是因為init_workqueues()中創建了一系列的workqueue,調用alloc_workqueue()->__alloc_workqueue_key()->alloc_and_link_pwqs()->apply_workqueue_attrs()->alloc_unbound_pwq()導致的。

這裡的init_workqueues()為什麼不將CPU1~3的工作執行緒一起創建了?

雖然此處init_workqueues()是在do_one_initcall中執行,但是此處的do_one_initcall較特殊。

static noinline void __init kernel_init_freeable(void)  {  ...      smp_prepare_cpus(setup_max_cpus);        do_pre_smp_initcalls();-------------------------------------此處調用的initcall是在__initcall_start~__initcall0_start之間的函數,也即early_initcall()。所以init_workqueues()在smp_init()之前被調用。      lockup_detector_init();        smp_init();      sched_init_smp();-------------------------------------------smp_init()將剩餘CPU1~3進行up操作。        do_basic_setup();-------------------------------------------執行__initcall0_start之後的initcall函數  ...  }

在初始化pool的時候,是按照possible的CPU來進行初始化的。而在創建工作執行緒的時候是按照online的CPU來創建的。

在init_workqueues()的時刻,CPU1~3還沒有online。所以會先創建kworker/0:0、kworker/0:0H、kworker/u8:0三個工作執行緒。

unbound工作執行緒的pool->id為8也就不難理解了,因為前面4個CPU各有兩個worker_pool,已經分配了0~7。

workqueue_cpu_up_callback()創建了其他CPU工作執行緒

kernel_init()->kernel_init_freeable()->smp_init()->cpu_up()->_cpu_up()->__raw_notifier_call_chain()->workqueue_cpu_up_callback()->create_worker()

在init_workqueues()開頭就註冊了CPU_PRI_WORKQUEUE_UP處理函數,所以在smp_init()->cpu_up()將CPU啟動之後就會為每個CPU創建兩個工作執行緒

1.2.2 init_workqueues()初始化worker_pool、worker、workqueue

static int __init init_workqueues(void)  {      int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };---------------這裡HIGHPRI_NICE_LEVEL為-20,對應的prio為100,是普通進程裡面的最高優先順序。      int i, cpu;        WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));        pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);        cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);--------------跟隨CPU_UP/CPU_DOWN動態創建工作執行緒的介面。      hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);        wq_numa_init();        /* initialize CPU pools */      for_each_possible_cpu(cpu) {------------------------------------------------遍歷每個possible狀態的CPU          struct worker_pool *pool;            i = 0;          for_each_cpu_worker_pool(pool, cpu) {-----------------------------------每個CPU兩個worker_poo,分別對應per-cpu變數cpu_worker_pool[0]和cpu_worker_pool[1]              BUG_ON(init_worker_pool(pool));-------------------------------------初始化worker_pool              pool->cpu = cpu;              cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));              pool->attrs->nice = std_nice[i++];----------------------------------設置nice值              pool->node = cpu_to_node(cpu);                /* alloc pool ID */              mutex_lock(&wq_pool_mutex);              BUG_ON(worker_pool_assign_id(pool));              mutex_unlock(&wq_pool_mutex);          }      }        /* create the initial worker */      for_each_online_cpu(cpu) {--------------------------------------------------遍歷所有online狀態CPU,對於SMP多核CPU,支隊boot cpu創建了工作執行緒。其他CPU工作執行緒稍後再cpu_up中創建。          struct worker_pool *pool;            for_each_cpu_worker_pool(pool, cpu) {-----------------------------------使用create_worker對每個worker_pool創建兩個內核執行緒對應cpu_worker_pool[0]和cpu_worker_pool[1]              pool->flags &= ~POOL_DISASSOCIATED;              BUG_ON(!create_worker(pool));          }      }        /* create default unbound and ordered wq attrs */      for (i = 0; i < NR_STD_WORKER_POOLS; i++) {       
   struct workqueue_attrs *attrs;            BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));          attrs->nice = std_nice[i];          unbound_std_wq_attrs[i] = attrs;---------------------------------------設置Unbound類型workqueue的屬性            /*           * An ordered wq should have only one pwq as ordering is           * guaranteed by max_active which is enforced by pwqs.           * Turn off NUMA so that dfl_pwq is used for all nodes.           */          BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));          attrs->nice = std_nice[i];          attrs->no_numa = true;          ordered_wq_attrs[i] = attrs;-------------------------------------------設置ordered類型workqueue的屬性,ordered類型workqueue同一時刻只能有一個work item在運行。      }        system_wq = alloc_workqueue("events", 0, 0);-------------------------------普通優先順序bound類型工作隊列system_wq      system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);------高優先順序bound類型工作隊列system_highpri_wq      system_long_wq = alloc_workqueue("events_long", 0, 0);---------------------      system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,----------普通優先順序unbound類型工作隊列system_unbound_wq                          WQ_UNBOUND_MAX_ACTIVE);      system_freezable_wq = alloc_workqueue("events_freezable",------------------freezable類型工作隊列system_freezable_wq                            WQ_FREEZABLE, 0);      system_power_efficient_wq = alloc_workqueue("events_power_efficient",------省電類型的工作隊列system_power_efficient_wq                            WQ_POWER_EFFICIENT, 0);      system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",                            WQ_FREEZABLE | WQ_POWER_EFFICIENT,-------------------freezable並且省電類型的工作隊列system_freezable_power_efficient_wq                            0);      BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||             !system_unbound_wq || !system_freezable_wq ||             !system_power_efficient_wq ||             
!system_freezable_power_efficient_wq);      return 0;  }
static int workqueue_cpu_up_callback(struct notifier_block *nfb,                             unsigned long action,                             void *hcpu)  {      int cpu = (unsigned long)hcpu;      struct worker_pool *pool;      struct workqueue_struct *wq;      int pi;        switch (action & ~CPU_TASKS_FROZEN) {      case CPU_UP_PREPARE:          for_each_cpu_worker_pool(pool, cpu) {              if (pool->nr_workers)                  continue;              if (!create_worker(pool))                  return NOTIFY_BAD;          }          break;        case CPU_DOWN_FAILED:      case CPU_ONLINE:          mutex_lock(&wq_pool_mutex);            for_each_pool(pool, pi) {              mutex_lock(&pool->attach_mutex);                if (pool->cpu == cpu)                  rebind_workers(pool);              else if (pool->cpu < 0)                  restore_unbound_workers_cpumask(pool, cpu);                mutex_unlock(&pool->attach_mutex);          }            /* update NUMA affinity of unbound workqueues */          list_for_each_entry(wq, &workqueues, list)              wq_update_unbound_numa(wq, cpu, true);            mutex_unlock(&wq_pool_mutex);          break;      }      return NOTIFY_OK;  }    static int workqueue_cpu_down_callback(struct notifier_block *nfb,                           unsigned long action,                           void *hcpu)  {      int cpu = (unsigned long)hcpu;      struct work_struct unbind_work;      struct workqueue_struct *wq;        switch (action & ~CPU_TASKS_FROZEN) {      case CPU_DOWN_PREPARE:          /* unbinding per-cpu workers should happen on the local CPU */          INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);          queue_work_on(cpu, system_highpri_wq, &unbind_work);            /* update NUMA affinity of unbound workqueues */          mutex_lock(&wq_pool_mutex);          list_for_each_entry(wq, &workqueues, list)              wq_update_unbound_numa(wq, cpu, false);          mutex_unlock(&wq_pool_mutex);            /* 
wait for per-cpu unbinding to finish */          flush_work(&unbind_work);          destroy_work_on_stack(&unbind_work);          break;      }      return NOTIFY_OK;  }

init_worker_pool()初始化一個worker_pool。

static int init_worker_pool(struct worker_pool *pool)  {      spin_lock_init(&pool->lock);      pool->id = -1;      pool->cpu = -1;---------------------------------------------初始值-1表示當前worker_pool是unbound型的      pool->node = NUMA_NO_NODE;      pool->flags |= POOL_DISASSOCIATED;      INIT_LIST_HEAD(&pool->worklist);      INIT_LIST_HEAD(&pool->idle_list);      hash_init(pool->busy_hash);        init_timer_deferrable(&pool->idle_timer);      pool->idle_timer.function = idle_worker_timeout;-------------銷毀多餘worker,每IDLE_WORKER_TIMEOUT(300秒)執行一次。      pool->idle_timer.data = (unsigned long)pool;        setup_timer(&pool->mayday_timer, pool_mayday_timeout,              (unsigned long)pool);--------------------------------設置mayday_timer,周期為MAYDAY_INTERVAL,即100ms。        mutex_init(&pool->manager_arb);      mutex_init(&pool->attach_mutex);      INIT_LIST_HEAD(&pool->workers);        ida_init(&pool->worker_ida);      INIT_HLIST_NODE(&pool->hash_node);      pool->refcnt = 1;        /* shouldn't fail above this point */      pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);      if (!pool->attrs)          return -ENOMEM;      return 0;  }

create_worker()創建內核的工作執行緒。

static struct worker *create_worker(struct worker_pool *pool)  {      struct worker *worker = NULL;      int id = -1;      char id_buf[16];        /* ID is needed to determine kthread name */      id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);----------------從當前worker_pool->worker_ida獲取一個空閑id。      if (id < 0)          goto fail;        worker = alloc_worker(pool->node);---------------------------------------分配一個woker結構體      if (!worker)          goto fail;        worker->pool = pool;-----------------------------------------------------woker_pool關聯到worker      worker->id = id;---------------------------------------------------------遞增的id        if (pool->cpu >= 0)------------------------------------------------------初始值為-1表示unbound,當>=0的時候就指定了cpu,說明是bound型的。          snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,               pool->attrs->nice < 0  ? "H" : "");-----------------------------nice為0表示普通優先順序,nice為-20是高優先順序。      else          snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);        worker->task = kthread_create_on_node(worker_thread, worker, pool->node,                            "kworker/%s", id_buf);----------------------------woker和創建的內核工作執行緒關聯上,執行緒處理函數是worker_thread。      if (IS_ERR(worker->task))          goto fail;        set_user_nice(worker->task, pool->attrs->nice);-------------------------設置內核工作執行緒的優先順序相關        /* prevent userland from meddling with cpumask of workqueue workers */      worker->task->flags |= PF_NO_SETAFFINITY;-------------------------------阻止用戶修改其CPU親和性        /* successful, attach the worker to the pool */      worker_attach_to_pool(worker, pool);------------------------------------將worker附著到worker_pool上        /* start the newly created worker */      spin_lock_irq(&pool->lock);      worker->pool->nr_workers++;---------------------------------------------統計當前worker對應worker_pool中工作執行緒數目      worker_enter_idle(worker);----------------------------------------------讓該工作執行緒進入idle狀態。      
wake_up_process(worker->task);------------------------------------------喚醒剛創建的工作執行緒      spin_unlock_irq(&pool->lock);        return worker;    fail:      if (id >= 0)          ida_simple_remove(&pool->worker_ida, id);      kfree(worker);      return NULL;  }

worker_attach_to_pool()主要是將worker工作執行緒加入到worker_pool->workers鏈表中。

static void worker_attach_to_pool(struct worker *worker,                     struct worker_pool *pool)  {      mutex_lock(&pool->attach_mutex);        set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);      if (pool->flags & POOL_DISASSOCIATED)------------------表示worker_pool沒有綁定到某個CPU上,所以worker也不會綁定到某個CPU。          worker->flags |= WORKER_UNBOUND;        list_add_tail(&worker->node, &pool->workers);----------將當前worker加入到worker_pool末尾。        mutex_unlock(&pool->attach_mutex);  }

1.2.3 工作執行緒執行函數

worker_thread()是工作執行緒的處理函數,不管其所在worker_pool是bound還是unbound型。

worker_thread()處理了大部分work_item,除了屬於rescuer的work_item由rescuer_thread()進行處理。

通過worker找到對應的worker_pool,然後遍歷worker_pool中的work_struct。

static int worker_thread(void *__worker)  {      struct worker *worker = __worker;      struct worker_pool *pool = worker->pool;        /* tell the scheduler that this is a workqueue worker */      worker->task->flags |= PF_WQ_WORKER;------------------PF_WQ_WORKER告訴調度器這是一個worker類型的執行緒。  woke_up:      spin_lock_irq(&pool->lock);        /* am I supposed to die? */      if (unlikely(worker->flags & WORKER_DIE)) {-----------WORKER_DIE表示此工作執行緒將要被銷毀。          spin_unlock_irq(&pool->lock);          WARN_ON_ONCE(!list_empty(&worker->entry));          worker->task->flags &= ~PF_WQ_WORKER;            set_task_comm(worker->task, "kworker/dying");          ida_simple_remove(&pool->worker_ida, worker->id);          worker_detach_from_pool(worker, pool);          kfree(worker);          return 0;      }        worker_leave_idle(worker);----------------------------清除WORKER_IDLE標誌位,並退出idle狀態鏈表  recheck:      /* no more worker necessary? */      if (!need_more_worker(pool))--------------------------如果當前worker_pool->worklist中有pending任務,並且當前pool中沒有正在運行的執行緒,need_more_worker()返回true;否則不需要更多工作執行緒,跳轉到sleep進入睡眠。          goto sleep;        /* do we need to manage? */      if (unlikely(!may_start_working(pool)) && manage_workers(worker))------may_start_working()判斷pool中是否有idle狀態工作執行緒。如果沒有,那麼manage_workers()創建一些工作執行緒。          goto recheck;------------------------------------------------------manage_workers()創建新工作執行緒之後,還需要跳轉到recheck標籤處再檢查一遍,有可能在創建工作執行緒過程中整個執行緒池發生了變化。        /*       * ->scheduled list can only be filled while a worker is       * preparing to process a work or actually processing it.       * Make sure nobody diddled with it while I was sleeping.       */      WARN_ON_ONCE(!list_empty(&worker->scheduled));-------------------------scheduled鏈表表示工作執行緒準備處理一個work或者正在執行一個work時才會有work添加到該鏈表中。        /*       * Finish PREP stage.  We're guaranteed to have at least one idle       * worker or that someone else has already assumed the manager       * role.  
This is where @worker starts participating in concurrency       * management if applicable and concurrency management is restored       * after being rebound.  See rebind_workers() for details.       */      worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);--------因為馬上就要開始執行work的回調函數了,對於bound類型增加worker_pool->nr_running計數        do {-----------------------------------------------------------遍歷當前worker_pool->worklist中的工作,調用process_one_work()進行處理。          struct work_struct *work =              list_first_entry(&pool->worklist,                       struct work_struct, entry);            if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {              /* optimization path, not strictly necessary */              process_one_work(worker, work);------------------------單獨處理一個work              if (unlikely(!list_empty(&worker->scheduled)))                  process_scheduled_works(worker);-------------------處理worker->scheduled鏈表上的work_struct。          } else {---------------------------------------------------如果當前work_struct置位WORK_STRUCT_LINKED表示work後面還串上其它work,把這些work遷移到worker->scheduled中,然後一併再用process_one_work()函數處理。              move_linked_works(work, &worker->scheduled, NULL);              process_scheduled_works(worker);          }      } while (keep_working(pool));----------------------------------判斷當前worker_pool->worklist不為空,且工作執行緒池活躍執行緒小於等於1,那麼保持當前工作執行緒繼續工作,以防止工作執行緒泛濫。        worker_set_flags(worker, WORKER_PREP);  sleep:      /*       * pool->lock is held and there's no work to process and no need to       * manage, sleep.  Workers are woken up only while holding       * pool->lock or from local cpu, so setting the current state       * before releasing pool->lock is enough to prevent losing any       * event.       */      worker_enter_idle(worker);      __set_current_state(TASK_INTERRUPTIBLE);      spin_unlock_irq(&pool->lock);      schedule();      goto woke_up;  }

manage_workers()是動態管理、創建工作執行緒的函數。

maybe_create_worker()函數中while首先調用create_worker()來創建新的工作執行緒。

static bool manage_workers(struct worker *worker)  {      struct worker_pool *pool = worker->pool;        if (pool->flags & POOL_MANAGER_ACTIVE)          return false;        pool->flags |= POOL_MANAGER_ACTIVE;      pool->manager = worker;        maybe_create_worker(pool);        pool->manager = NULL;      pool->flags &= ~POOL_MANAGER_ACTIVE;      wake_up(&wq_manager_wait);      return true;  }    static void maybe_create_worker(struct worker_pool *pool)  __releases(&pool->lock)  __acquires(&pool->lock)  {  restart:      spin_unlock_irq(&pool->lock);        /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */      mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);        while (true) {          if (create_worker(pool) || !need_to_create_worker(pool))-----------------create_worker()創建成功則退出while循環;或者通過need_to_create_worker()判斷是否需要繼續創建新執行緒。              break;            schedule_timeout_interruptible(CREATE_COOLDOWN);            if (!need_to_create_worker(pool))----------------------------------------再次判斷是否需要繼續創建新執行緒。              break;      }        del_timer_sync(&pool->mayday_timer);      spin_lock_irq(&pool->lock);      /*       * This is necessary even after a new worker was just successfully       * created as @pool->lock was dropped and the new worker might have       * already become busy.       */      if (need_to_create_worker(pool))          goto restart;  }

process_scheduled_works()專門處理worker->scheduled上面的工作,具體處理還是交給process_one_work()。

static void process_scheduled_works(struct worker *worker)  {      while (!list_empty(&worker->scheduled)) {          struct work_struct *work = list_first_entry(&worker->scheduled,                          struct work_struct, entry);          process_one_work(worker, work);      }  }      static void process_one_work(struct worker *worker, struct work_struct *work)  __releases(&pool->lock)  __acquires(&pool->lock)  {      struct pool_workqueue *pwq = get_work_pwq(work);      struct worker_pool *pool = worker->pool;      bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;----------------判斷當前的workqueue是否是CPU_INTENSIVE,會對其所在工作執行緒進行特殊設置。      int work_color;      struct worker *collision;  #ifdef CONFIG_LOCKDEP      /*       * It is permissible to free the struct work_struct from       * inside the function that is called from it, this we need to       * take into account for lockdep too.  To avoid bogus "held       * lock freed" warnings as well as problems when looking into       * work->lockdep_map, make a copy and use that here.       */      struct lockdep_map lockdep_map;        lockdep_copy_map(&lockdep_map, &work->lockdep_map);  #endif      /* ensure we're on the correct CPU */      WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&               raw_smp_processor_id() != pool->cpu);        /*       * A single work shouldn't be executed concurrently by       * multiple workers on a single cpu.  Check whether anyone is       * already processing the work.  If so, defer the work to the       * currently executing one.       
*/      collision = find_worker_executing_work(pool, work);--------------------查詢當前work是否在worker_pool->busy_hash表中正在運行,如果在就移到當前work正在執行的worker->scheduled並退出當前處理。      if (unlikely(collision)) {          move_linked_works(work, &collision->scheduled, NULL);          return;      }        /* claim and dequeue */      debug_work_deactivate(work);      hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);      worker->current_work = work;      worker->current_func = work->func;      worker->current_pwq = pwq;      work_color = get_work_color(work);        list_del_init(&work->entry);        /*       * CPU intensive works don't participate in concurrency management.       * They're the scheduler's responsibility.  This takes @worker out       * of concurrency management and the next code block will chain       * execution of the pending work items.       */      if (unlikely(cpu_intensive))          worker_set_flags(worker, WORKER_CPU_INTENSIVE);--------------------設置當前工作執行緒flags,調度器就知道內核執行緒屬性了,但實際上調度器暫時並沒有做特殊處理。        /*       * Wake up another worker if necessary.  The condition is always       * false for normal per-cpu workers since nr_running would always       * be >= 1 at this point.  This is used to chain execution of the       * pending work items for WORKER_NOT_RUNNING workers such as the       * UNBOUND and CPU_INTENSIVE ones.       */      if (need_more_worker(pool))-----------------------判斷是否需要喚醒更多工作執行緒,wake_up_worker()去喚醒worker_pool中第一個idle執行緒。對於bound型worker_pool此時一般nr_running>=1,所以條件不成立。          wake_up_worker(pool);        /*       * Record the last pool and clear PENDING which should be the last       * update to @work.  Also, do this inside @pool->lock so that       * PENDING and queued state changes happen together while IRQ is       * disabled.       
*/      set_work_pool_and_clear_pending(work, pool->id);---------------清除struct worker中data成員pending標誌位,裡面使用了smp_wmb保證了pending之前的寫操作完成之後才清除pending。        spin_unlock_irq(&pool->lock);        lock_map_acquire_read(&pwq->wq->lockdep_map);      lock_map_acquire(&lockdep_map);      trace_workqueue_execute_start(work);      worker->current_func(work);------------------------------------真正執行work的回調函數      /*       * While we must be careful to not use "work" after this, the trace       * point will only record its address.       */      trace_workqueue_execute_end(work);      lock_map_release(&lockdep_map);      lock_map_release(&pwq->wq->lockdep_map);        if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {          pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%dn"                 "     last function: %pfn",                 current->comm, preempt_count(), task_pid_nr(current),                 worker->current_func);          debug_show_held_locks(current);          dump_stack();      }        /*       * The following prevents a kworker from hogging CPU on !PREEMPT       * kernels, where a requeueing work item waiting for something to       * happen could deadlock with stop_machine as such work item could       * indefinitely requeue itself while all other CPUs are trapped in       * stop_machine. At the same time, report a quiescent RCU state so       * the same condition doesn't freeze RCU.       */      cond_resched_rcu_qs();        spin_lock_irq(&pool->lock);        /* clear cpu intensive status */      if (unlikely(cpu_intensive))          worker_clr_flags(worker, WORKER_CPU_INTENSIVE);        /* we're done with it, release */      hash_del(&worker->hentry);-----------------------------------work回調函數執行完成後的清理工作      worker->current_work = NULL;      worker->current_func = NULL;      worker->current_pwq = NULL;      worker->desc_valid = false;      pwq_dec_nr_in_flight(pwq, work_color);  }

2 創建工作隊列

2.1 各種創建工作隊列API和flags

創建工作隊列的API有很多,但最終都通過__alloc_workqueue_key()去實現。不同API之間的主要區別在於使用了不同的flag。

所以看一下這些flag,同時max_active決定每個CPU最多可有多少個work掛入一個工作隊列。

對於bound類型工作隊列,max_active最大可以是512;如果max_active為0,則表示使用默認值256。

如果需要嚴格串列執行工作隊列,使用max_active=1和WQ_UNBOUND組合。

/*   * Workqueue flags and constants.  For details, please refer to   * Documentation/workqueue.txt.   */  enum {      WQ_NON_REENTRANT    = 1 << 0, /* guarantee non-reentrance */-----------確保工作在多個CPU上是不可重入的。      WQ_UNBOUND        = 1 << 1, /* not bound to any cpu */-----------------工作任務會加入unbound工作隊列中,unbound類型work不需要額外的同步管理,unbound工作執行緒池會嘗試儘快執行它的work。      WQ_FREEZABLE        = 1 << 2, /* freeze during suspend */--------------此標記工作隊列會參與到系統suspend過程中,會讓工作執行緒處理完成所有的work才完成進程凍結,並且這個過程不會再新開始一個work執行,直到進程被解凍。      WQ_MEM_RECLAIM        = 1 << 3, /* may be used for memory reclaim */---當記憶體緊張時,創建新的工作執行緒可能會失敗,系統還有一個recuer內核執行緒會去接管這種情況。      WQ_HIGHPRI        = 1 << 4, /* high priority */------------------------工作隊列的任務對應高優先順序的worker_pool,即較低nice值。      WQ_CPU_INTENSIVE    = 1 << 5, /* cpu instensive workqueue */-----------屬於特別消耗CPU資源一類work,這類work會得到系統進程調度器的監管,排在這類work後面的non-cpu intensive類型work可能會推遲執行。

最常見的形式是alloc_workqueue(),其它都是對某些flag的封裝。

#define alloc_workqueue(fmt, flags, max_active, args...)              __alloc_workqueue_key((fmt), (flags), (max_active),                            NULL, NULL, ##args)  #define alloc_ordered_workqueue(fmt, flags, args...)                  alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)    #define create_workqueue(name)                              alloc_workqueue("%s", WQ_MEM_RECLAIM, 1, (name))  #define create_freezable_workqueue(name)                      alloc_workqueue("%s", WQ_FREEZABLE | WQ_UNBOUND | WQ_MEM_RECLAIM,               1, (name))  #define create_singlethread_workqueue(name)                      alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)

2.2 __alloc_workqueue_key()

__alloc_workqueue_key分配一個workqueue_struct數據結構並進行初始化,和pool_workqueue進行關聯等操作。

struct workqueue_struct *__alloc_workqueue_key(const char *fmt,                             unsigned int flags,                             int max_active,                             struct lock_class_key *key,                             const char *lock_name, ...)  {      size_t tbl_size = 0;      va_list args;      struct workqueue_struct *wq;      struct pool_workqueue *pwq;        /* see the comment above the definition of WQ_POWER_EFFICIENT */      if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)----------設置unbound類型workqueue後,究竟選擇哪個cpu上喚醒交由進程調度器決定。如果是bound類型就會讓idle狀態的CPU從idle狀態喚醒,從而增加了功耗。          flags |= WQ_UNBOUND;        /* allocate wq and format name */      if (flags & WQ_UNBOUND)          tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);        wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);      if (!wq)          return NULL;        if (flags & WQ_UNBOUND) {          wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);          if (!wq->unbound_attrs)              goto err_free_wq;      }        va_start(args, lock_name);      vsnprintf(wq->name, sizeof(wq->name), fmt, args);      va_end(args);        max_active = max_active ?: WQ_DFL_ACTIVE;      max_active = wq_clamp_max_active(max_active, flags, wq->name);        /* init wq */      wq->flags = flags;      wq->saved_max_active = max_active;      mutex_init(&wq->mutex);      atomic_set(&wq->nr_pwqs_to_flush, 0);      INIT_LIST_HEAD(&wq->pwqs);      INIT_LIST_HEAD(&wq->flusher_queue);      INIT_LIST_HEAD(&wq->flusher_overflow);      INIT_LIST_HEAD(&wq->maydays);        lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);      INIT_LIST_HEAD(&wq->list);        if (alloc_and_link_pwqs(wq) < 0)---------------------分配一個workqueue_struct數據結構並初始化          goto err_free_wq;        /*       * Workqueues which may be used during memory reclaim should       * have a rescuer to guarantee forward progress.       
*/      if (flags & WQ_MEM_RECLAIM) {          struct worker *rescuer;            rescuer = alloc_worker(NUMA_NO_NODE);          if (!rescuer)              goto err_destroy;            rescuer->rescue_wq = wq;          rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",                             wq->name);          if (IS_ERR(rescuer->task)) {              kfree(rescuer);              goto err_destroy;          }            wq->rescuer = rescuer;          rescuer->task->flags |= PF_NO_SETAFFINITY;          wake_up_process(rescuer->task);      }        if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))          goto err_destroy;        /*       * wq_pool_mutex protects global freeze state and workqueues list.       * Grab it, adjust max_active and add the new @wq to workqueues       * list.       */      mutex_lock(&wq_pool_mutex);        mutex_lock(&wq->mutex);      for_each_pwq(pwq, wq)          pwq_adjust_max_active(pwq);      mutex_unlock(&wq->mutex);        list_add(&wq->list, &workqueues);        mutex_unlock(&wq_pool_mutex);        return wq;    err_free_wq:      free_workqueue_attrs(wq->unbound_attrs);      kfree(wq);      return NULL;  err_destroy:      destroy_workqueue(wq);      return NULL;  }
static int alloc_and_link_pwqs(struct workqueue_struct *wq)  {      bool highpri = wq->flags & WQ_HIGHPRI;      int cpu, ret;        if (!(wq->flags & WQ_UNBOUND)) {------------------------處理bound類型workqueue          wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);-cpu_pwqs是一個per-cpu類型,為每個cpu分配一個pool_workqueue數據結構,是動態分配的。cpu_worker_pools是靜態定義的per-cpu類型worker_pool數據結構。          if (!wq->cpu_pwqs)              return -ENOMEM;            for_each_possible_cpu(cpu) {              struct pool_workqueue *pwq =                  per_cpu_ptr(wq->cpu_pwqs, cpu);              struct worker_pool *cpu_pools =                  per_cpu(cpu_worker_pools, cpu);                init_pwq(pwq, wq, &cpu_pools[highpri]);--------init_pwq()將動態分配的cpu_pwqs和靜態定義的cpu_worker_pools關聯起來。                mutex_lock(&wq->mutex);              link_pwq(pwq);---------------------------------把pool_workqueue添加到workqueue_struct->pwqs鏈表中。              mutex_unlock(&wq->mutex);          }          return 0;      } else if (wq->flags & __WQ_ORDERED) {-----------------處理ordered類型workqueue          ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);          /* there should only be single pwq for ordering guarantee */          WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||                    wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),               "ordering guarantee broken for workqueue %sn", wq->name);          return ret;      } else {-----------------------------------------------處理unbound類型workqueue          return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);      }  }
int apply_workqueue_attrs(struct workqueue_struct *wq,                const struct workqueue_attrs *attrs)  {      struct workqueue_attrs *new_attrs, *tmp_attrs;      struct pool_workqueue **pwq_tbl, *dfl_pwq;      int node, ret;        /* only unbound workqueues can change attributes */      if (WARN_ON(!(wq->flags & WQ_UNBOUND)))          return -EINVAL;        /* creating multiple pwqs breaks ordering guarantee */      if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))          return -EINVAL;        pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);      new_attrs = alloc_workqueue_attrs(GFP_KERNEL);      tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);      if (!pwq_tbl || !new_attrs || !tmp_attrs)          goto enomem;        /* make a copy of @attrs and sanitize it */      copy_workqueue_attrs(new_attrs, attrs);      cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);        /*       * We may create multiple pwqs with differing cpumasks.  Make a       * copy of @new_attrs which will be modified and used to obtain       * pools.       */      copy_workqueue_attrs(tmp_attrs, new_attrs);        /*       * CPUs should stay stable across pwq creations and installations.       * Pin CPUs, determine the target cpumask for each node and create       * pwqs accordingly.       */      get_online_cpus();        mutex_lock(&wq_pool_mutex);        /*       * If something goes wrong during CPU up/down, we'll fall back to       * the default pwq covering whole @attrs->cpumask.  Always create       * it even if we don't use it immediately.       
*/      dfl_pwq = alloc_unbound_pwq(wq, new_attrs);---------------------分配一個pool_workqueue數據結構      if (!dfl_pwq)          goto enomem_pwq;        for_each_node(node) {          if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {              pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);-------查找或者新建一個pool_workqueue              if (!pwq_tbl[node])                  goto enomem_pwq;          } else {              dfl_pwq->refcnt++;              pwq_tbl[node] = dfl_pwq;          }      }        mutex_unlock(&wq_pool_mutex);        /* all pwqs have been created successfully, let's install'em */      mutex_lock(&wq->mutex);        copy_workqueue_attrs(wq->unbound_attrs, new_attrs);        /* save the previous pwq and install the new one */      for_each_node(node)          pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);        /* @dfl_pwq might not have been used, ensure it's linked */      link_pwq(dfl_pwq);      swap(wq->dfl_pwq, dfl_pwq);        mutex_unlock(&wq->mutex);        /* put the old pwqs */      for_each_node(node)          put_pwq_unlocked(pwq_tbl[node]);      put_pwq_unlocked(dfl_pwq);        put_online_cpus();      ret = 0;      /* fall through */  out_free:      free_workqueue_attrs(tmp_attrs);      free_workqueue_attrs(new_attrs);      kfree(pwq_tbl);      return ret;    enomem_pwq:      free_unbound_pwq(dfl_pwq);      for_each_node(node)          if (pwq_tbl && pwq_tbl[node] != dfl_pwq)              free_unbound_pwq(pwq_tbl[node]);      mutex_unlock(&wq_pool_mutex);      put_online_cpus();  enomem:      ret = -ENOMEM;      goto out_free;  }    static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,                      const struct workqueue_attrs *attrs)  {      struct worker_pool *pool;      struct pool_workqueue *pwq;        lockdep_assert_held(&wq_pool_mutex);        pool = get_unbound_pool(attrs);----------------------------首先查找一個worker_pool,如果沒有則創建一個新的worker_pool。      if (!pool)          
return NULL;        pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);-----分配一個pool_workqueue數據結構      if (!pwq) {          put_unbound_pool(pool);          return NULL;      }        init_pwq(pwq, wq, pool);      return pwq;  }    static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)  {      u32 hash = wqattrs_hash(attrs);      struct worker_pool *pool;      int node;        lockdep_assert_held(&wq_pool_mutex);        /* do we already have a matching pool? */      hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {------系統定義了一個哈希表unbound_pool_hash,用於管理所有的unbound類型worker_pool          if (wqattrs_equal(pool->attrs, attrs)) {----------------------------通過wqattrs_equal()判斷系統中是否已經有個類型相關worker_pool              pool->refcnt++;              return pool;          }      }        /* nope, create a new one */      pool = kzalloc(sizeof(*pool), GFP_KERNEL);------------------------------如果沒有找到,重新分配和初始化一個worker_pool      if (!pool || init_worker_pool(pool) < 0)          goto fail;  ...      /* create and start the initial worker */      if (!create_worker(pool))          goto fail;        /* install */      hash_add(unbound_pool_hash, &pool->hash_node, hash);        return pool;  ...  
}    static void put_pwq(struct pool_workqueue *pwq)  {      lockdep_assert_held(&pwq->pool->lock);      if (likely(--pwq->refcnt))          return;      if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))          return;      schedule_work(&pwq->unbound_release_work);  }    static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,               struct worker_pool *pool)  {      BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);        memset(pwq, 0, sizeof(*pwq));        pwq->pool = pool;-------------------------------------------pwq->pool指向worker_pool      pwq->wq = wq;-----------------------------------------------pwq->wq指向workqueue_struct      pwq->flush_color = -1;      pwq->refcnt = 1;      INIT_LIST_HEAD(&pwq->delayed_works);      INIT_LIST_HEAD(&pwq->pwqs_node);      INIT_LIST_HEAD(&pwq->mayday_node);      INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);------------用於釋放pool_workqueue  }      static void pwq_unbound_release_workfn(struct work_struct *work)  {      struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,                            unbound_release_work);-----------從work找到pool_workqueue數據結構指針pwq      struct workqueue_struct *wq = pwq->wq;      struct worker_pool *pool = pwq->pool;      bool is_last;        if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))          return;        mutex_lock(&wq->mutex);      list_del_rcu(&pwq->pwqs_node);      is_last = list_empty(&wq->pwqs);      mutex_unlock(&wq->mutex);        mutex_lock(&wq_pool_mutex);      put_unbound_pool(pool);      mutex_unlock(&wq_pool_mutex);        call_rcu_sched(&pwq->rcu, rcu_free_pwq);        /*       * If we're the last pwq going away, @wq is already dead and no one       * is gonna access it anymore.  Free it.       */      if (is_last) {          free_workqueue_attrs(wq->unbound_attrs);          kfree(wq);      }  }

3. 調度一個work

一般情況下使用默認的workqueue,首先需要初始化一個work,然後使用schedule_work()把work掛入默認的workqueue中。

3.1 初始化一個work

初始化一個work的API有各種不同形式,但最終都調用__INIT_WORK()。


當data欄位包含WORK_STRUCT_PWQ_BIT標誌位時,高位存放上一次pool_workqueue指針,低8位存放標誌位;沒有包含時,高比特位存放上次worker_pool的ID號,低5位存放標誌位。

常見標誌位如下:

enum {      WORK_STRUCT_PENDING_BIT    = 0,    /* work item is pending execution */----表示該work正在pending執行。      WORK_STRUCT_DELAYED_BIT    = 1,    /* work item is delayed */--------------表示該work被延遲執行了。      WORK_STRUCT_PWQ_BIT    = 2,    /* data points to pwq */      WORK_STRUCT_LINKED_BIT    = 3,    /* next work is linked to this one */----表示一個work連接到該work上。  #ifdef CONFIG_DEBUG_OBJECTS_WORK      WORK_STRUCT_STATIC_BIT    = 4,    /* static initializer (debugobjects) */      WORK_STRUCT_COLOR_SHIFT    = 5,    /* color for workqueue flushing */  #else      WORK_STRUCT_COLOR_SHIFT    = 4,    /* color for workqueue flushing */  #endif      WORK_STRUCT_COLOR_BITS    = 4,  ...  }

3.2 schedule_work

在初始化完work之後,調用schedule_work()函數把work掛入系統默認workqueue中。

schedule_work()的默認的工作隊列是system_wq,最終將工作交給__queue_work()。

static inline bool schedule_work(struct work_struct *work)  {      return queue_work(system_wq, work);  }    static inline bool queue_work(struct workqueue_struct *wq,                    struct work_struct *work)  {      return queue_work_on(WORK_CPU_UNBOUND, wq, work);------------WORK_CPU_UNBOUND不是表示unbound類型,而是CPU。  }    bool queue_work_on(int cpu, struct workqueue_struct *wq,             struct work_struct *work)  {      bool ret = false;      unsigned long flags;        local_irq_save(flags);---------------------------------------把work加入工作隊列是在關本地中斷下運行的。        if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {------------設置WORK_STRUCT_PENDING_BIT並返回舊值。          __queue_work(cpu, wq, work);          ret = true;      }        local_irq_restore(flags);      return ret;  }    static void __queue_work(int cpu, struct workqueue_struct *wq,               struct work_struct *work)  {      struct pool_workqueue *pwq;      struct worker_pool *last_pool;      struct list_head *worklist;      unsigned int work_flags;      unsigned int req_cpu = cpu;        WARN_ON_ONCE(!irqs_disabled());----------------------------是否處於關中斷狀態        debug_work_activate(work);        /* if draining, only works from the same workqueue are allowed */      if (unlikely(wq->flags & __WQ_DRAINING) &&          WARN_ON_ONCE(!is_chained_work(wq)))--------------------__WQ_DRAINING表示要銷毀workqueue,那麼掛入workqueue中所有的work都要處理完畢才能把這個workqueue銷毀。在銷毀過程中,一般不允許再有新的work加入隊列中。有一種特殊例外是正在清空work時觸發了一個queue work操作,這種情況被稱為chained work。          return;  retry:      if (req_cpu == WORK_CPU_UNBOUND)          cpu = raw_smp_processor_id();        /* pwq which will be used unless @work is executing elsewhere */      if (!(wq->flags & WQ_UNBOUND))          pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);-----------------對於bound型的workqueue,直接使用本地CPU對應pool_workqueue。      else          pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));------對於unbound型,調用unbound_pwq_by_node()尋找本地node節點對應的unbound類型的pool_workqueue。        
/*       * If @work was previously on a different pool, it might still be       * running there, in which case the work needs to be queued on that       * pool to guarantee non-reentrancy.       */      last_pool = get_work_pool(work);--------------------------通過work_struct的成員data查詢該work上一次是在哪個worker_pool中運行的。      if (last_pool && last_pool != pwq->pool) {----------------如果上次運行的worker_pool和本次不一致          struct worker *worker;            spin_lock(&last_pool->lock);            worker = find_worker_executing_work(last_pool, work);--判斷一個work是否正在last_pool上運行,也即不在當前worker_pool運行,如果是返回這個正在執行的工作執行緒worker            if (worker && worker->current_pwq->wq == wq) {              pwq = worker->current_pwq;-------------------------利用當前work正在執行的pool_workqueue,利用快取熱度,不進行調度。          } else {              /* meh... not running there, queue here */              spin_unlock(&last_pool->lock);              spin_lock(&pwq->pool->lock);          }      } else {          spin_lock(&pwq->pool->lock);      }        if (unlikely(!pwq->refcnt)) {          if (wq->flags & WQ_UNBOUND) {-------------------對unbound類型pool_workqueue釋放是非同步的,當refcnt減少到0時,說明該pool_workqueue已經被釋放,那麼需要跳轉到retry出重新選擇pool_workqueue。              spin_unlock(&pwq->pool->lock);              cpu_relax();              goto retry;          }          /* oops */          WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",                wq->name, cpu);      }        /* pwq determined, queue */      trace_workqueue_queue_work(req_cpu, pwq, work);        if (WARN_ON(!list_empty(&work->entry))) {          spin_unlock(&pwq->pool->lock);          return;      }        pwq->nr_in_flight[pwq->work_color]++;      work_flags = work_color_to_flags(pwq->work_color);        if (likely(pwq->nr_active < pwq->max_active)) {-------判斷當前pool_workqueue的work活躍數量,如果少於最高限值,就加入pending狀態鏈表worker_pool->worklist,否則加入delayed_works鏈表中。          trace_workqueue_activate_work(work);          pwq->nr_active++;          worklist = 
&pwq->pool->worklist;      } else {          work_flags |= WORK_STRUCT_DELAYED;          worklist = &pwq->delayed_works;      }        insert_work(pwq, work, worklist, work_flags);---------將當前work加入到pool_workqueue->worklist尾部。        spin_unlock(&pwq->pool->lock);  }

get_work_pool()通過work_struct找到該work上一次在哪個worker_pool中運行。

static struct worker_pool *get_work_pool(struct work_struct *work)  {      unsigned long data = atomic_long_read(&work->data);      int pool_id;        assert_rcu_or_pool_mutex();        if (data & WORK_STRUCT_PWQ)----------------------------如果定義了WORK_STRUCT_PWQ,那麼直接得到pool_workqueue地址,進而找到worker_pool。          return ((struct pool_workqueue *)              (data & WORK_STRUCT_WQ_DATA_MASK))->pool;        pool_id = data >> WORK_OFFQ_POOL_SHIFT;----------------如果沒定義WORK_STRUCT_PWQ,那麼可以得到對應的pool_id。      if (pool_id == WORK_OFFQ_POOL_NONE)          return NULL;        return idr_find(&worker_pool_idr, pool_id);------------根據pool_id從worker_pool_idr中找到對應的worker_pool。  }

insert_work()將work加入到worker_pool的列表中。

static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,              struct list_head *head, unsigned int extra_flags)  {      struct worker_pool *pool = pwq->pool;        /* we own @work, set data and link */      set_work_pwq(work, pwq, extra_flags);------------把pool_workqueue指針的值和一些flag設置到data成員中,方便下次調用queue_work()知道本次使用哪個pool_workqueue()。      list_add_tail(&work->entry, head);---------------將work加入到worker_pool->worklist尾部。      get_pwq(pwq);------------------------------------增加pool_workqueue->refcnt成員引用計數。        /*       * Ensure either wq_worker_sleeping() sees the above       * list_add_tail() or we see zero nr_running to avoid workers lying       * around lazily while there are works to be processed.       */      smp_mb();----------------------------------------保證wake_up_worker()喚醒worker時,在__schedule()->wq_worker_sleeping()時,這裡的list_add_tail()已經完成。同時保證下面__need_more_worker()讀取nr_running時list_add_tail()鏈表已經完成。        if (__need_more_worker(pool))--------------------如果當前nr_running為0,表示當前worker可能並沒有處於運行狀態。那麼需要wake_up_worker()強行喚醒一次。          wake_up_worker(pool);  }      static void wake_up_worker(struct worker_pool *pool)  {      struct worker *worker = first_idle_worker(pool);        if (likely(worker))          wake_up_process(worker->task);  }

調用schedule_work()只是把work加入到workqueue中,但並沒有開始實質的調度工作。

  • 加入workqueue的pending鏈表是關中斷環境下進行的
  • 設置work->data成員的WORK_STRUCT_PENDING_BIT標誌位
  • 尋找合適的pool_workqueue,優先選擇本地CPU對應的pool_workqueue;如果該work正在另一個CPU工作執行緒池中運行,則優先選擇此工作執行緒池。
  • 找到pool_workqueue,就找到了對應的worker_pool和對應的pending鏈表

那麼work真正執行的地方在哪裡呢?參見worker_thread()。

其它基於system_wq的變種還包括如下系列,_on表示指定某個CPU,_delayed表示延時工作。

int schedule_work_on(int cpu, struct work_struct *work)  {      return queue_work_on(cpu, system_wq, work);  }    int schedule_delayed_work(struct delayed_work *dwork,                      unsigned long delay)  {      return queue_delayed_work(system_wq, dwork, delay);  }    int schedule_delayed_work_on(int cpu,              struct delayed_work *dwork, unsigned long delay)  {      return queue_delayed_work_on(cpu, system_wq, dwork, delay);  }

3.3 其它系統默認workqueue

上面介紹了schedule_work(),其默認將work放入system_wq上。

系統還有其它很多默認workqueue,這些workqueue也都是通過queue_work()將work放入其上。

下面介紹一些其它系統全局workqueue的使用。

system_highpri_wq 和system_wq的區別在於WQ_HIGHPRI,這些work對應的工作執行緒位於cpu_worker_pools[1]中。工作執行緒的nice為-20,比system_wq對應的工作執行緒優先順序高。

system_long_wq和system_wq類似,但是一般system_long_wq用於執行時間較長的work,而system_wq放執行較短的work。

這兩個workqueue沒有明顯的區別,更多的是靠使用者自覺。

system_nrt_wq相對於system_wq使用了WQ_NON_REENTRANT。默認情況下工作隊列只是確保在同一CPU不可重入,即工作在同一CPU上不會被多個工作執行緒並發執行,但容許在多個CPU上並發執行。

該標誌表明在多個CPU上也是不可重入的,工作將在不可重入workqueue上排隊,並確保在整個系統範圍內至多由一個工作執行緒執行。

system_unbound_wq相對於system_wq的區別是被設置為WQ_UNBOUND,沒有並發管理,且work最大活躍數不超過WQ_UNBOUND_MAX_ACTIVE,一般為WQ_MAX_ACTIVE=512。

system_unbound_wq對應的工作執行緒不會被綁定到特定CPU,所有排隊的work會被立即執行,只要資源足夠並且不超過最大活躍數。

system_freezable_wq 相對於system_wq多了WQ_FREEZABLE標誌,表示可以凍結workqueue參與系統的暫停操作,該workqueue的工作將被暫停,除非被喚醒,否則沒有新的work被執行。

system_power_efficient_wq相對於system_wq多了WQ_POWER_EFFICIENT標誌,將工作隊列標示為unbound以達到節省功耗的目的,並且還需要wq_power_efficient打開。否則和system_wq沒啥區別。

system_freezable_power_efficient_wq兼具system_freezable_wq的freezable和system_power_efficient_wq的power efficient兩個特性。

4. 取消一個work

取消一個work的介面是cancel_work_sync(),該函數通常會取消一個work,但會等待該work執行完畢。

bool cancel_work_sync(struct work_struct *work)  {      return __cancel_work_timer(work, false);  }      static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)  {      static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);---------------------等待隊列cancel_waitq      unsigned long flags;      int ret;        do {          ret = try_to_grab_pending(work, is_dwork, &flags);------------判斷當前work的狀態,需要特殊處理-ENOENT情況。            if (unlikely(ret == -ENOENT)) {              struct cwt_wait cwait;                init_wait(&cwait.wait);              cwait.wait.func = cwt_wakefn;              cwait.work = work;                prepare_to_wait_exclusive(&cancel_waitq, &cwait.wait,                            TASK_UNINTERRUPTIBLE);              if (work_is_canceling(work))                  schedule();              finish_wait(&cancel_waitq, &cwait.wait);          }      } while (unlikely(ret < 0));        /* tell other tasks trying to grab @work to back off */      mark_work_canceling(work);      local_irq_restore(flags);        flush_work(work);-------------------------------------------------會去等待work執行完成      clear_work_data(work);--------------------------------------------清除work標誌位        /*       * Paired with prepare_to_wait() above so that either       * waitqueue_active() is visible here or !work_is_canceling() is       * visible there.       */      smp_mb();      if (waitqueue_active(&cancel_waitq))          __wake_up(&cancel_waitq, TASK_NORMAL, 1, work);        return ret;  }

try_to_grab_pending()判斷當前的work可否被取消,返回不同狀態。__cancel_work_timer()根據不同狀態採取不同操作。

static int try_to_grab_pending(struct work_struct *work, bool is_dwork,                     unsigned long *flags)  {      struct worker_pool *pool;      struct pool_workqueue *pwq;        local_irq_save(*flags);-----------------------------------------------關本地中斷,主要工作都在關中斷下進行。        /* try to steal the timer if it exists */      if (is_dwork) {          struct delayed_work *dwork = to_delayed_work(work);            if (likely(del_timer(&dwork->timer)))              return 1;      }        /* try to claim PENDING the normal way */      if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))----如果PENDING_BIT為0,說明該work處於idle狀態,那麼可以輕鬆的把work取出來。此處重新設置PENDING_BIT位,後續還需要等待該work執行完成。          return 0;        /*       * The queueing is in progress, or it is already queued. Try to       * steal it from ->worklist without clearing WORK_STRUCT_PENDING.       */----------------------------------------------------------------------下面的情況說明work正在被執行或者已經在worklist鏈表中,那麼嘗試去工作池中把work偷出來,成功後返回1.      
pool = get_work_pool(work);      if (!pool)          goto fail;        spin_lock(&pool->lock);        pwq = get_work_pwq(work);      if (pwq && pwq->pool == pool) {          debug_work_deactivate(work);            if (*work_data_bits(work) & WORK_STRUCT_DELAYED)              pwq_activate_delayed_work(work);            list_del_init(&work->entry);-----------------------------------------將當前work從worker_pool->worklist中移除          pwq_dec_nr_in_flight(pwq, get_work_color(work));            /* work->data points to pwq iff queued, point to pool */          set_work_pool_and_keep_pending(work, pool->id);            spin_unlock(&pool->lock);          return 1;      }      spin_unlock(&pool->lock);  fail:      local_irq_restore(*flags);      if (work_is_canceling(work))--------------------------通過該work->data判斷該work正在被取消,返回-ENOENT。__cancel_work_timer()會睡眠等待並繼續完成。          return -ENOENT;      cpu_relax();      return -EAGAIN;---------------------------------------返回__cancel_work_timer()重試  }

flush_work()等待work執行完成,返回false表示當前work並沒有處於執行狀態;返回true表示等到work執行完成。

bool flush_work(struct work_struct *work)  {      struct wq_barrier barr;        lock_map_acquire(&work->lockdep_map);      lock_map_release(&work->lockdep_map);        if (start_flush_work(work, &barr)) {          wait_for_completion(&barr.done);          destroy_work_on_stack(&barr.work);          return true;      } else {          return false;      }  }    static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)  {      struct worker *worker = NULL;      struct worker_pool *pool;      struct pool_workqueue *pwq;        might_sleep();        local_irq_disable();      pool = get_work_pool(work);-----------------由work_struct找到worker_pool      if (!pool) {          local_irq_enable();          return false;      }        spin_lock(&pool->lock);      /* see the comment in try_to_grab_pending() with the same code */      pwq = get_work_pwq(work);-------------------由work_struct找到pool_workqueue      if (pwq) {          if (unlikely(pwq->pool != pool))--------表示當前work已經被執行完              goto already_gone;      } else {          worker = find_worker_executing_work(pool, work);-----------返回正在執行work的worker,如果沒有則返回NULL,表示已經被執行完畢。          if (!worker)              goto already_gone;          pwq = worker->current_pwq;      }        insert_wq_barrier(pwq, barr, work, worker);      spin_unlock_irq(&pool->lock);        /*       * If @max_active is 1 or rescuer is in use, flushing another work       * item on the same workqueue may lead to deadlock.  Make sure the       * flusher is not running on the same workqueue by verifying write       * access.       
*/      if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)          lock_map_acquire(&pwq->wq->lockdep_map);      else          lock_map_acquire_read(&pwq->wq->lockdep_map);      lock_map_release(&pwq->wq->lockdep_map);        return true;  already_gone:      spin_unlock_irq(&pool->lock);      return false;  }    static void insert_wq_barrier(struct pool_workqueue *pwq,                    struct wq_barrier *barr,                    struct work_struct *target, struct worker *worker)  {      struct list_head *head;      unsigned int linked = 0;        /*       * debugobject calls are safe here even with pool->lock locked       * as we know for sure that this will not trigger any of the       * checks and call back into the fixup functions where we       * might deadlock.       */      INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);-----------------初始化一個新的barr->work,執行函數是wq_barrier_func,裡面complete完成量barr->done。      __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));      init_completion(&barr->done);------------------------------------初始化barr->done完成量        /*       * If @target is currently being executed, schedule the       * barrier to the worker; otherwise, put it after @target.       */      if (worker)------------------------------------------------------當前work正在被執行,放在worker->scheduled.next之後          head = worker->scheduled.next;      else {          unsigned long *bits = work_data_bits(target);----------------否則放在target->entry.next            head = target->entry.next;          /* there can already be other linked works, inherit and set */          linked = *bits & WORK_STRUCT_LINKED;          __set_bit(WORK_STRUCT_LINKED_BIT, bits);      }        debug_work_activate(&barr->work);      insert_work(pwq, &barr->work, head,              work_color_to_flags(WORK_NO_COLOR) | linked);------------將barr->work加入到head後  }

關於PENDING_BIT何時被設置以及被清0:

  • 當一個work已經加入到workqueue隊列中,schedule_work()->queue_work()->queue_work_on()時被設置。
  • 當一個work在工作執行緒里馬上要執行,worker_thread()->process_one_work()->set_work_pool_and_clear_pending()時被清0。
  • 上述設置和清0都是在關閉本地中斷情況下執行的。

5. 和調度器的交互

假設某個work回調函數執行了睡眠操作,在wait_event_interruptible()中設置當前進程state為TASK_INTERRUPTIBLE,然後執行schedule()進行進程切換,調用軌跡是schedule()->__schedule()。

/*
 * __schedule - scheduler core (excerpt)
 *
 * Only the part relevant to workqueues is reproduced: what happens when
 * the task being switched out is a workqueue worker that is about to
 * sleep.  Sections marked "omitted" are not part of this excerpt.
 */
static void __sched __schedule(void)
{
	struct task_struct *prev, *next;
	unsigned long *switch_count;
	struct rq *rq;
	int cpu;

	preempt_disable();
	cpu = smp_processor_id();
	rq = cpu_rq(cpu);
	rcu_note_context_switch();
	/*
	 * prev is the current task.  In the scenario discussed it is the
	 * worker thread running the work callback, with its state set to
	 * TASK_INTERRUPTIBLE.
	 */
	prev = rq->curr;

	/* ... code omitted in this excerpt ... */

	/*
	 * prev->state is non-zero and PREEMPT_ACTIVE is not set: the task
	 * called schedule() voluntarily from the work callback rather
	 * than being preempted.
	 */
	if (prev->state && !(preempt_count() & PREEMPT_ACTIVE)) {
		if (unlikely(signal_pending_state(prev->state, prev))) {
			prev->state = TASK_RUNNING;
		} else {
			deactivate_task(rq, prev, DEQUEUE_SLEEP);
			prev->on_rq = 0;

			/*
			 * If a worker went to sleep, notify and ask workqueue
			 * whether it wants to wake up a task to maintain
			 * concurrency.
			 */
			if (prev->flags & PF_WQ_WORKER) {
				struct task_struct *to_wakeup;

				/*
				 * Ask the workqueue code whether another
				 * worker of the same pool should be woken
				 * now that this one is being switched out.
				 */
				to_wakeup = wq_worker_sleeping(prev, cpu);
				if (to_wakeup)
					try_to_wake_up_local(to_wakeup);
			}
		}
		switch_count = &prev->nvcsw;
	}

	/* ... code omitted in this excerpt ... */
}

wq_worker_sleeping()在當前工作執行緒即將睡眠時被調用:它遞減worker_pool->nr_running,如果減到0並且worklist中還有待處理的work,就從idle_list中取出第一個空閒的worker並返回其task_struct;否則返回NULL。

在wq_worker_sleeping()返回不為NULL的情況下,調用try_to_wake_up_local()。

try_to_wake_up_local()是執行喚醒進程的操作。

struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)  {      struct worker *worker = kthread_data(task), *to_wakeup = NULL;      struct worker_pool *pool;        /*       * Rescuers, which may not have all the fields set up like normal       * workers, also reach here, let's not access anything before       * checking NOT_RUNNING.       */      if (worker->flags & WORKER_NOT_RUNNING)          return NULL;        pool = worker->pool;        /* this can only happen on the local cpu */      if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu))          return NULL;        /*       * The counterpart of the following dec_and_test, implied mb,       * worklist not empty test sequence is in insert_work().       * Please read comment there.       *       * NOT_RUNNING is clear.  This means that we're bound to and       * running on the local cpu w/ rq lock held and preemption       * disabled, which in turn means that none else could be       * manipulating idle_list, so dereferencing idle_list without pool       * lock is safe.       */      if (atomic_dec_and_test(&pool->nr_running) &&          !list_empty(&pool->worklist))          to_wakeup = first_idle_worker(pool);-------------------從worker_pool->idle_list中找到第一個worker工作執行緒。      return to_wakeup ? 
to_wakeup->task : NULL;  }    static struct worker *first_idle_worker(struct worker_pool *pool)  {      if (unlikely(list_empty(&pool->idle_list)))          return NULL;        return list_first_entry(&pool->idle_list, struct worker, entry);  }      static void try_to_wake_up_local(struct task_struct *p)  {      struct rq *rq = task_rq(p);        if (WARN_ON_ONCE(rq != this_rq()) ||          WARN_ON_ONCE(p == current))          return;        lockdep_assert_held(&rq->lock);        if (!raw_spin_trylock(&p->pi_lock)) {          raw_spin_unlock(&rq->lock);          raw_spin_lock(&p->pi_lock);          raw_spin_lock(&rq->lock);      }        if (!(p->state & TASK_NORMAL))          goto out;        if (!task_on_rq_queued(p))          ttwu_activate(rq, p, ENQUEUE_WAKEUP);        ttwu_do_wakeup(rq, p, 0);------------------------設置進程轉改為TASK_RUNNING,並且調用sched_class->task_woken執行進程喚醒搶佔操作。      ttwu_stat(p, smp_processor_id(), 0);  out:      raw_spin_unlock(&p->pi_lock);  }      static void ttwu_activate(struct rq *rq, struct task_struct *p, int en_flags)  {      activate_task(rq, p, en_flags);      p->on_rq = TASK_ON_RQ_QUEUED;        /* if a worker is waking up, notify workqueue */      if (p->flags & PF_WQ_WORKER)          wq_worker_waking_up(p, cpu_of(rq));---------------增加nr_running技術,表示有一個工作執行緒馬上就會被喚醒。  }

6. 小結

聯繫方式:[email protected]