brpc的bthread解读

一、Bthread的简单使用

std::vector<bthread_t> bids;  for (int i = 0; i < FLAGS_thread_num; ++i) {      if (bthread_start_background(&bids[i], NULL, myfunc, &myarg) != 0) {          LOG(ERROR) << "Fail to create bthread";          return -1;       }  }    for (int i = 0; i < FLAGS_thread_num; ++i) {       bthread_join(bids[i], NULL);  }
  • bthread_start_background 相当于pthread_create,此外还有bthread_start_urgent,这个urgent函数用于代码更需要紧急执行的场景,他会把旧的线程置入调度队列里。
  • bthread_join 相当于pthread_join吗

二、bthread的原理

bthread是brpc使用的M:N线程库,M个bthread会映射至N个pthread。

在我们调用一次bthread_start_background / bthread_start_urgent会依次触发TaskControl和TaskGroup的相应接口。bthread主要的类有两个TaskControl和TaskGroup,下面就来探究下TaskControl和TaskGroup如何实现M:N的线程模型。

图1、TaskControl和TaskGroup的调用链

2.1、 bthread进入TaskControl

首先Task Control使用单例模式。

get_or_new_task_control 获取到Task Control实例,并且确保了唯一Task Control的实例,在改函数中该实例严格的生成用atomic保证,并且使用内存memory_order_consume保证了代码顺序不被编译器优化,确保了在多线程环境下的执行顺序。

2.2、 TaskControl

TaskControl采用单例模式,对TaskGroup进行管理。

  • Task Control可以创建多个Task Group,这里用concurrency并发度表示多个Task Gruop。代码中检验是否的TaskControl创建并初始化成功的依据是判butil::atomic<int> _concurrency是一个有效的值。默认个数是9
  • Task Control还会创建一个定时器线程。brpc采用condition_variable的唤醒方式+墙上时钟+小顶堆的方式实现其定时器。
  • Timerthread负责执行定时器唤醒并执行定时任务

那么Task Control管理了什么?以下图示是task control的接口:

接口主要分为两大类:一类是创建并管理TaskGroup,一类是统计TaskGroup相关的数据:

  • 创建管理Task Group,包括add_worker,choose_one_group,stop_and_join,delete_task_group
  • 统计taskgroup的数据包括get_cumulated_worker_time,get_cumulated_switch_count,get_cumulated_signal_count,print_rq_sizes_in_the_tc
  • steal_worker(TaskGroup在自身队列中没有任务情况下,抢占其他task group的内核pthread资源)
  • signal_task,唤醒没有任务在等待的TaskGroup处理任务
  • TaskControl管辖的Pthread之外的Pthread创建的Bthread,由TaskControl 随机选一个TaskGroup进行bthread投递。
图2 TaskControl的接口

2.3、TaskGroup

  • TaskGroup则1:1对应pthread
  • 他是1:N的 bthread调度器。一个TaskGroup是一个pthread,但是可以管理多个归属于这个TaskGroup的所有bthread Task,
  • 一个task_group维护者一个run_queue和一个remote_queue。Remote_queue:用于存放非TaskControl中线程创建的Bthread

备份二级队列, 向队列中提交不在btrhead中创建的任务

2.3.1 入口函数run_main_task()

TaskGroup::run_main_task() 这个是TaskGroup的main入口。

while(Wait_task()) {          //按照顺序先从自身的run_queue,然后去remote_queue找任务执行,或者steal的方式调度任务          TaskGroup::sched_to(&dummy,tid);  }