brpc的bthread解读
- 2020 年 4 月 7 日
- 笔记
一、Bthread的简单使用
std::vector<bthread_t> bids; for (int i = 0; i < FLAGS_thread_num; ++i) { if (bthread_start_background(&bids[i], NULL, myfunc, &myarg) != 0) { LOG(ERROR) << "Fail to create bthread"; return -1; } } for (int i = 0; i < FLAGS_thread_num; ++i) { bthread_join(bids[i], NULL); }
- bthread_start_background 相当于pthread_create,此外还有bthread_start_urgent,这个urgent函数用于代码更需要紧急执行的场景,他会把旧的线程置入调度队列里。
- bthread_join 相当于pthread_join吗
二、bthread的原理
bthread是brpc使用的M:N线程库,M个bthread会映射至N个pthread。
在我们调用一次bthread_start_background / bthread_start_urgent会依次触发TaskControl和TaskGroup的相应接口。bthread主要的类有两个TaskControl和TaskGroup,下面就来探究下TaskControl和TaskGroup如何实现M:N的线程模型。

2.1、 bthread进入TaskControl
首先Task Control使用单例模式。
get_or_new_task_control 获取到Task Control实例,并且确保了唯一Task Control的实例,在改函数中该实例严格的生成用atomic保证,并且使用内存memory_order_consume保证了代码顺序不被编译器优化,确保了在多线程环境下的执行顺序。
2.2、 TaskControl
TaskControl采用单例模式,对TaskGroup进行管理。
- Task Control可以创建多个Task Group,这里用concurrency并发度表示多个Task Gruop。代码中检验是否的TaskControl创建并初始化成功的依据是判butil::atomic<int> _concurrency是一个有效的值。默认个数是9
- Task Control还会创建一个定时器线程。brpc采用condition_variable的唤醒方式+墙上时钟+小顶堆的方式实现其定时器。
- Timerthread负责执行定时器唤醒并执行定时任务
那么Task Control管理了什么?以下图示是task control的接口:
接口主要分为两大类:一类是创建并管理TaskGroup,一类是统计TaskGroup相关的数据:
- 创建管理Task Group,包括add_worker,choose_one_group,stop_and_join,delete_task_group
- 统计taskgroup的数据包括get_cumulated_worker_time,get_cumulated_switch_count,get_cumulated_signal_count,print_rq_sizes_in_the_tc
- steal_worker(TaskGroup在自身队列中没有任务情况下,抢占其他task group的内核pthread资源)
- signal_task,唤醒没有任务在等待的TaskGroup处理任务
- TaskControl管辖的Pthread之外的Pthread创建的Bthread,由TaskControl 随机选一个TaskGroup进行bthread投递。

2.3、TaskGroup
- TaskGroup则1:1对应pthread
- 他是1:N的 bthread调度器。一个TaskGroup是一个pthread,但是可以管理多个归属于这个TaskGroup的所有bthread Task,
- 一个task_group维护者一个run_queue和一个remote_queue。Remote_queue:用于存放非TaskControl中线程创建的Bthread
备份二级队列, 向队列中提交不在btrhead中创建的任务
2.3.1 入口函数run_main_task()
TaskGroup::run_main_task() 这个是TaskGroup的main入口。
while(Wait_task()) { //按照顺序先从自身的run_queue,然后去remote_queue找任务执行,或者steal的方式调度任务 TaskGroup::sched_to(&dummy,tid); }