nginx1.17.9源码分析之线程池

  • 2020 年 3 月 31 日
  • 笔记

我们发现事件驱动的软件都得配一个线程池。libuv和nginx都是。因为事件驱动的软件是单线程。但是有些事情总会引起线程阻塞。所以这个事情就不能放到主线程里做。这就是为什么事件驱动都要配一个线程池。把任务交给线程池中的线程。主线程继续执行。任务完成后通知主线程或者执行回调就行。 我们先看一下nginx线程池的架构。然后开始分析。

在这里插入图片描述 线程池模块在nginx里属于核心模块。在nginx初始化的时候。会初始化一个保存线程池配置的结构体(见图)。nginx默认开启四个线程池。

static void *  ngx_thread_pool_create_conf(ngx_cycle_t *cycle)  {      ngx_thread_pool_conf_t  *tcf;        tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));      if (tcf == NULL) {          return NULL;      }        if (ngx_array_init(&tcf->pools, cycle->pool, 4,                         sizeof(ngx_thread_pool_t *))          != NGX_OK)      {          return NULL;      }        return tcf;  }  

上面的函数就是构造出文章开头的那个图的结构。创建了保存配置的结构,nginx开始解析指令。在分析解析指令前,我们先看一下几个工具函数。

// 根据名字查找池子  ngx_thread_pool_t *  ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)  {      ngx_uint_t                i;      ngx_thread_pool_t       **tpp;      ngx_thread_pool_conf_t   *tcf;        tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,                                                    ngx_thread_pool_module);        tpp = tcf->pools.elts;        for (i = 0; i < tcf->pools.nelts; i++) {            if (tpp[i]->name.len == name->len              && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0)          {              return tpp[i];          }      }        return NULL;  }  

nginx每个线程池都有一个名字,这个函数就是从图里面的数组中找到名字对应的线程池。

ngx_thread_pool_t *  ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name)  {      ngx_thread_pool_t       *tp, **tpp;      ngx_thread_pool_conf_t  *tcf;      // 没有名字则取默认值      if (name == NULL) {          name = &ngx_thread_pool_default;      }      // 已存在直接返回      tp = ngx_thread_pool_get(cf->cycle, name);        if (tp) {          return tp;      }      // 分配一个新的池子      tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t));      if (tp == NULL) {          return NULL;      }        tp->name = *name;      tp->file = cf->conf_file->file.name.data;      tp->line = cf->conf_file->line;      // 拿到一开始时创建的,用于保存配置的结构体      tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,                                                    ngx_thread_pool_module);      // push进数组,数组会自动扩容      tpp = ngx_array_push(&tcf->pools);      if (tpp == NULL) {          return NULL;      }        *tpp = tp;        return tp;  }  

上面的函数就是往数组中追加一个元素(表示线程池的结构体)。如果已经存在则报错。 我们看一下,nginx如何解析指令的。配置线程池的指令是

thread_pool name threads=number [max_queue=number]  

解析到这个指令的时候,nginx会执行ngx_thread_pool。

static char *  ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)  {      ngx_str_t          *value;      ngx_uint_t          i;      ngx_thread_pool_t  *tp;      // thread_pool指令后的参数      value = cf->args->elts;      // 根据名字(没有则取默认名字)新建一个结构体      tp = ngx_thread_pool_add(cf, &value[1]);        // threads有值说明之前已经配置过这个名字      if (tp->threads) {          ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,                             "duplicate thread pool "%V"", &tp->name);          return NGX_CONF_ERROR;      }        tp->max_queue = 65536;      // 解析剩下的参数      for (i = 2; i < cf->args->nelts; i++) {            if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {              // 设置线程数              tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);              continue;          }            if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {              // 设置任务个数上限              tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);              continue;          }      }      // 等于0说明指令里没有配置threads参数,报错max_queue可以不配      if (tp->threads == 0) {          ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,                             ""%V" must have "threads" parameter",                             &cmd->name);          return NGX_CONF_ERROR;      }        return NGX_CONF_OK;  }  

上面的代码主要构造文章开始那个图中的结构。根据nginx的流程 1 创建保存配置的结构 2 解析配置 3 校验和补偿处理配置 解析完配置后,nginx接着校验和补偿处理。

// 处理完用户的配置后,可能需要做补偿处理  static char *  ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)  {      ngx_thread_pool_conf_t *tcf = conf;        ngx_uint_t           i;      ngx_thread_pool_t  **tpp;        tpp = tcf->pools.elts;      // 图中那个数组      for (i = 0; i < tcf->pools.nelts; i++) {          // 用户已经配置了线程数          if (tpp[i]->threads) {              continue;          }          // 没有配置线程数,但是取了默认名字,则其他信息也设置为默认值          if (              tpp[i]->name.len == ngx_thread_pool_default.len              &&              ngx_strncmp(               tpp[i]->name.data,               ngx_thread_pool_default.data,               ngx_thread_pool_default.len)               == 0              )          {              tpp[i]->threads = 32;              tpp[i]->max_queue = 65536;              continue;          }          // 配置了名字但是没有配置线程数,报错          ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,                        "unknown thread pool "%V" in %s:%ui",                        &tpp[i]->name, tpp[i]->file, tpp[i]->line);            return NGX_CONF_ERROR;      }        return NGX_CONF_OK;  }  

到此,关于线程池的数据结构已经处理完毕。接下就是创建线程和初始化线程池的数据了。在每个worker初始化的时候,会根据线程池的配置,创建对应的线程。

static ngx_int_t  ngx_thread_pool_init_worker(ngx_cycle_t *cycle)  {      ngx_uint_t                i;      ngx_thread_pool_t       **tpp;      ngx_thread_pool_conf_t   *tcf;      // 线程池只用于worker进程      if (ngx_process != NGX_PROCESS_WORKER          && ngx_process != NGX_PROCESS_SINGLE)      {          return NGX_OK;      }        tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,                                                    ngx_thread_pool_module);        if (tcf == NULL) {          return NGX_OK;      }      // 初始化队列(已完成的任务)      ngx_thread_pool_queue_init(&ngx_thread_pool_done);      // 线程池结构体数组      tpp = tcf->pools.elts;      // 每个worker启动一个或多个线程池      for (i = 0; i < tcf->pools.nelts; i++) {          if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {              return NGX_ERROR;          }      }        return NGX_OK;  }  

上面的代码遍历线程池结构体数组。针对每一个线程池结构体创建多个线程。

static ngx_int_t  ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)  {      int             err;      pthread_t       tid;      ngx_uint_t      n;      pthread_attr_t  attr;        ngx_thread_pool_queue_init(&tp->queue);      // 初始化互斥变量      if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {          return NGX_ERROR;      }      // 初始化条件变量      if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {          (void) ngx_thread_mutex_destroy(&tp->mtx, log);          return NGX_ERROR;      }        tp->log = log;      // 初始化线程属性      err = pthread_attr_init(&attr);        // 设置状态为分离,线程退出时资源马上被回收,不需要等待父线程回收      err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);        // 创建n个线程,工作函数是ngx_thread_pool_cycle,入参是tp      for (n = 0; n < tp->threads; n++) {          err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);      }      // 用完销毁      (void) pthread_attr_destroy(&attr);        return NGX_OK;  }  

这时候,多个线程就被创建了。然后每个线程执行自己的工作函数。

// 处理任务  static void *  ngx_thread_pool_cycle(void *data)  {      ngx_thread_pool_t *tp = data;        int                 err;      sigset_t            set;      ngx_thread_task_t  *task;        // 全置1      sigfillset(&set);      // 下面几个信号清零      sigdelset(&set, SIGILL);      sigdelset(&set, SIGFPE);      sigdelset(&set, SIGSEGV);      sigdelset(&set, SIGBUS);      // 屏蔽除了上面几个之外的信号      err = pthread_sigmask(SIG_BLOCK, &set, NULL);        for ( ;; ) {          // 加锁访问队列          if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {              return NULL;          }            // 摘下一个任务          tp->waiting--;            while (tp->queue.first == NULL) {              // 没有任务,等待条件满足时被唤醒              if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)                  != NGX_OK)              {                  (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);                  return NULL;              }          }          // 摘下第一个任务          task = tp->queue.first;          // 更新头指针          tp->queue.first = task->next;          // 没有任务了,更新尾指针指向头指针的地址,回到初始化状态          if (tp->queue.first == NULL) {              tp->queue.last = &tp->queue.first;          }          // 摘完节点,解锁          if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {              return NULL;          }          // 执行任务          task->handler(task->ctx, tp->log);            task->next = NULL;            ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);          // 执行完插入done队列尾部,done队列是所有线程池公用的,任务队列是每个线程池私有的          *ngx_thread_pool_done.last = task;          // 指向最后一个节点的next域的地址          ngx_thread_pool_done.last = &task->next;            ngx_memory_barrier();            ngx_unlock(&ngx_thread_pool_done_lock);          // 有任务完成,发通知          (void) ngx_notify(ngx_thread_pool_handler);      }  }  

线程池维护了一个任务队列,池中的线程互斥访问队列,从中摘下任务执行。任务执行完后把已完成的任务放到完成队列中(所有线程池共享)。并且通知负责处理完成任务节点的函数。

static void  ngx_thread_pool_handler(ngx_event_t *ev)  {      ngx_event_t        *event;      ngx_thread_task_t  *task;        // 加锁访问队列      ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);      // 指向整个done队列的节点,先保存下来,而不是直接遍历first指针,否则回调里一直加任务导致死循环      task = ngx_thread_pool_done.first;      // 重置头尾指针      ngx_thread_pool_done.first = NULL;      ngx_thread_pool_done.last = &ngx_thread_pool_done.first;        ngx_memory_barrier();        ngx_unlock(&ngx_thread_pool_done_lock);        while (task) {            event = &task->event;          // 指向下一个节点          task = task->next;          // 设置完成标记          event->complete = 1;          event->active = 0;            event->handler(event);      }  }  

这就是nginx线程池的原理。和大部分的线程池实现类似,代码看起来很多,但是逻辑还是比较清晰的。