BPF CO-RE 示例程式碼解析
BPF CO-RE 示例程式碼解析
在BPF的可移植性和CO-RE一文的末尾提到了一個名為runqslower的工具,該工具用於展示在CPU run隊列中停留的時間大於某一值的任務。現在以該工具來展示如何使用BPF CO-RE。
環境
本地測試的話,建議採用Ubuntu,其內核本身已經開啟了BTF選項,無需再對內核進行編譯。我用的是Ubuntu 20.10
,內核版本5.8.0
。
# cat /boot//config-$(uname -r)|grep BTF
CONFIG_VIDEO_SONY_BTF_MPX=m
CONFIG_DEBUG_INFO_BTF=y
編譯
僅需要在runqslower目錄下執行make即可。如果用的是自己生成的vmlinux,則需要在Makefile中增加對VMLINUX_BTF
的定義,值為本地編譯的vmlinux
的路徑,如:
VMLINUX_BTF := /root/linux-5.10.5/vmlinux
在BCC和libbpf的轉換一文中可以了解到,BPF CO-RE的基本步驟如下,:
- 生成包含所有內核類型的頭文件
vmlinux.h
; - 使用Clang(版本10或更新版本)將BPF程式的源程式碼編譯為
.o
對象文件; - 從編譯好的BPF對象文件中生成BPF skeleton 頭文件(對應runqslower的BPF對象文件為
runqslower.bpf.o
,也可以通過bpftool gen skeleton runqslower.bpf.o
生成skeleton頭文件) ; - 在用戶空間程式碼中包含生成的BPF skeleton 頭文件(BPF skeleton 頭文件是給用戶空間使用的);
- 最後,編譯用戶空間程式碼,這樣會嵌入BPF對象程式碼,後續就不用發布單獨的文件。
其中第1、3步分別使用bpftool btf dump file
和bpftool gen skeleton
來生成vmliunx.h
和skeleton 頭文件。具體使用方式可以參見runqslower
的Makefile文件。
運行
直接看下最終的效果,運行如下,可以看到該BPF應用其實就是一個普通的ELF可執行文件(無需獨立發布BPF程式和用戶側程式),大小僅為1M左右,如果要在另一台機器運行,直接拷貝過去即可(前提是目標內核開啟了CONFIG_DEBUG_INFO_BTF
選項)。
# ./runqslower 200
Tracing run queue latency higher than 200 us
TIME COMM PID LAT(us)
16:45:16 kworker/u256:1 6007 209
16:45:16 kworker/1:2 6045 1222
16:45:16 sshd 6045 331
16:45:16 swapper/0 6045 2120
使用bpftool prog -p
可以查看安裝的bpf程式:
{
"id": 157,
"type": "tracing",
"name": "handle__sched_w",
"tag": "4eadb7a05d79f434",
"gpl_compatible": true,
"loaded_at": 1611822519,
"uid": 0,
"bytes_xlated": 176,
"jited": true,
"bytes_jited": 121,
"bytes_memlock": 4096,
"map_ids": [71,69
],
"btf_id": 65,
"pids": [{
"pid": 6012,
"comm": "runqslower"
}
]
},{
"id": 158,
"type": "tracing",
"name": "handle__sched_s",
"tag": "36ab461bac5b3a97",
"gpl_compatible": true,
"loaded_at": 1611822519,
"uid": 0,
"bytes_xlated": 584,
"jited": true,
"bytes_jited": 354,
"bytes_memlock": 4096,
"map_ids": [71,69,70
],
"btf_id": 65,
"pids": [{
"pid": 6012,
"comm": "runqslower"
}
]
}
程式碼解析
按照上述編譯中設計的順序,首選應該編寫BFP層的程式碼,然後再編寫用戶空間的程式碼。BPF CO-RE的處理邏輯基本與BCC保持一致。當觸發相關事件時會運行內核空間程式碼,然後在用戶空間接收內核程式碼傳遞的資訊。
下面以程式碼注釋的方式解析BPF CO-RE的一些使用規範,最後會做一個總結。
內核空間(BPF)程式碼
內核空間程式碼通常包含如下頭文件:
#include "vmlinux.h" /* all kernel types */
#include <bpf/bpf_helpers.h> /* most used helpers: SEC, __always_inline, etc */
#include <bpf/bpf_core_read.h> /* for BPF CO-RE helpers */
內核空間的BPF程式碼如下(假設生成的.o文件名為runqslower.bpf.o
):
// SPDX-License-Identifier: GPL-2.0
// Copyright (c) 2019 Facebook
/* BPF程式包含的頭文件,可以看到內容想相當簡潔 */
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include "runqslower.h"
#define TASK_RUNNING 0
#define BPF_F_CURRENT_CPU 0xffffffffULL
/* 在BPF程式碼側,可以使用一個 const volatile 聲明只讀的全局變數,只讀的全局變數,變數最後會存在於runqslower.bpf.o的.rodata只讀段,用戶側可以在BPF程式載入前讀取或修改該只讀段的參數【1】 */
const volatile __u64 min_us = 0;
const volatile pid_t targ_pid = 0;
/* 定義名為 start 的map,類型為 BPF_MAP_TYPE_HASH。容量為10240,key類型為u32,value類型為u64。可以在【1】中查看BPF程式解析出來的.maps段【2】 */
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 10240);
__type(key, u32);
__type(value, u64);
} start SEC(".maps");
/* 由於 PERF_EVENT_ARRAY, STACK_TRACE 和其他特殊的maps(DEVMAP, CPUMAP, etc) 尚不支援key/value類型的BTF類型,因此需要直接指定 key_size/value_size */
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
} events SEC(".maps");
/* record enqueue timestamp */
/* 自定義的輔助函數必須標記為 static __always_inline。該函數用於保存喚醒的任務事件,key為pid,value為喚醒的時間點 */
__always_inline
static int trace_enqueue(u32 tgid, u32 pid)
{
u64 ts;
if (!pid || (targ_pid && targ_pid != pid))
return 0;
ts = bpf_ktime_get_ns();
bpf_map_update_elem(&start, &pid, &ts, 0);
return 0;
}
/* 所有BPF程式提供的功能都需要通過 SEC() (來自 bpf_helpers.h )宏來自定義section名稱【3】。可以在【1】中查看BPF程式解析出來的自定義函數 */
/* 喚醒一個任務,並保存當前時間 */
SEC("tp_btf/sched_wakeup")
int handle__sched_wakeup(u64 *ctx)
{
/* TP_PROTO(struct task_struct *p) */
struct task_struct *p = (void *)ctx[0];
return trace_enqueue(p->tgid, p->pid);
}
/* 喚醒一個新創建的任務,並保存當前時間。BPF的上下文為一個task_struct*結構體 */
SEC("tp_btf/sched_wakeup_new")
int handle__sched_wakeup_new(u64 *ctx)
{
/* TP_PROTO(struct task_struct *p) */
struct task_struct *p = (void *)ctx[0];
return trace_enqueue(p->tgid, p->pid);
}
/* 計算一個任務入run隊列到出隊列的時間 */
SEC("tp_btf/sched_switch")
int handle__sched_switch(u64 *ctx)
{
/* TP_PROTO(bool preempt, struct task_struct *prev,
* struct task_struct *next)
*/
struct task_struct *prev = (struct task_struct *)ctx[1];
struct task_struct *next = (struct task_struct *)ctx[2];
struct event event = {};
u64 *tsp, delta_us;
long state;
u32 pid;
/* ivcsw: treat like an enqueue event and store timestamp */
/* 如果被切換的任務的狀態仍然是TASK_RUNNING,說明其又重新進入run隊列,更新入隊列的時間 */
if (prev->state == TASK_RUNNING)
trace_enqueue(prev->tgid, prev->pid);
/* 獲取下一個任務的PID */
pid = next->pid;
/* fetch timestamp and calculate delta */
/* 如果該任務並沒有被喚醒,則無法正常進行任務切換,返回0即可 */
tsp = bpf_map_lookup_elem(&start, &pid);
if (!tsp)
return 0; /* missed enqueue */
/* 當前切換時間減去該任務的入隊列時間,計算進入run隊列到真正調度的毫秒級時間 */
delta_us = (bpf_ktime_get_ns() - *tsp) / 1000;
if (min_us && delta_us <= min_us)
return 0;
/* 更新events section,以便用戶側讀取 */
event.pid = pid;
event.delta_us = delta_us;
bpf_get_current_comm(&event.task, sizeof(event.task));
/* output */
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU,
&event, sizeof(event));
/* 該任務已經出隊列,刪除map */
bpf_map_delete_elem(&start, &pid);
return 0;
}
char LICENSE[] SEC("license") = "GPL";
【1】:
-
用戶空間可以且只能通過
BPF skeletob
方式來訪問和更新全局變數,更新後的變數會立即反應到BPF側。需要注意的是,全局變數只是BPF側的變數,用戶空間其實是通過.rodata
間接來操作這類變數,意味著如果用戶側也定義了一個相同的變數,則會被視為兩個獨立的變數。 -
用戶空間操作全局變數的一般操作如下:
struct <name> *skel = <name>__open(); if (!skel) /* handle errors */ skel->rodata->my_cfg.feature_enabled = true; skel->rodata->my_cfg.pid_to_filter = 123; if (<name>__load(skel)) /* handle errors */
-
從下面解析的ELF文件的內容可以看到,使用
const volatile
聲明的全局變數min_us
和targ_pid
位於.rodata
(read-only)段,用戶空間的應用可以在載入BPF程式前讀取或更新BPF側的全局變數,runqslower
通過這種方式設置了min_us
的值。# llvm-objdump -t runqslower.bpf.o runqslower.bpf.o: file format elf64-bpf SYMBOL TABLE: 0000000000000050 l .text 0000000000000000 LBB0_3 00000000000000a0 l .text 0000000000000000 LBB0_4 0000000000000100 l .text 0000000000000000 LBB1_3 0000000000000150 l .text 0000000000000000 LBB1_4 00000000000001f8 l .text 0000000000000000 LBB2_4 0000000000000248 l .text 0000000000000000 LBB2_5 00000000000002e0 l .text 0000000000000000 LBB2_8 0000000000000388 l .text 0000000000000000 LBB2_9 0000000000000000 l d .text 0000000000000000 .text 0000000000000000 g O license 0000000000000004 LICENSE 0000000000000020 g O .maps 0000000000000018 events #名為 events 的 maps 0000000000000160 g F .text 0000000000000238 handle__sched_switch #handle__sched_switch 程式碼段 0000000000000000 g F .text 00000000000000b0 handle__sched_wakeup #handle__sched_wakeup 程式碼段 00000000000000b0 g F .text 00000000000000b0 handle__sched_wakeup_new #handle__sched_wakeup_new 程式碼段 0000000000000000 g O .rodata 0000000000000008 min_us #全局變數 min_us 0000000000000000 g O .maps 0000000000000020 start #名為 start 的 maps 0000000000000008 g O .rodata 0000000000000004 targ_pid #全局變數 targ_pid
skel->rodata
用於只讀變數;skel->bss
用於初始值為0的可變數;skel->data
用於初始值非0的可變數。
【2】:
-
通常一個map具有如下屬性:
- 類型
- 最大元素數目
- key的位元組大小
- value的位元組大小
可以使用如下介面對maps進行操作:
bpf_map_operation_elem(&some_map, some, args);
一般常見的介面如下,可以在內核/用戶空間對maps中的元素進行增刪改查操作:
bpf_map_lookup_elem bpf_map_update_elem bpf_map_delete_elem bpf_map_push_elem bpf_map_pop_elem bpf_map_peek_elem
【3】:
-
約定的SEC的命名方式如下,libbpf可以根據SEC欄位自動檢測BPF程式類型,然後關聯特定的BPF程式類型,不同的程式類型決定了BPF程式的第一個入參關聯的上下文。使用bpftool feature可以查看支援不同程式類型的BPF輔助函數。更多參見section_defs。
tp/<category>/<name>
用於Tracepoints;kprobe/<func_name>
用於kprobe ,kretprobe/<func_name>
用於kretprobe;raw_tp/<name>
用於原始Tracepoint;cgroup_skb/ingress
,cgroup_skb/egress
,以及整個cgroup/<subtype>
程式族。
-
tp_btf/sched_wakeup
、tp_btf/sched_wakeup_new
、tp_btf/sched_switch
跟蹤了系統任務上下文切換相關的事件,可以在/sys/kernel/debug/tracing/events/sched
下找到對應的事件定義。 -
像
int handle__sched_wakeup(u64 *ctx)
這樣的用法仍然屬於BCC的使用方式,BPF支援使用BPF_KPROBE
/BPF_KRETPROBE
來像內核函數一樣給BPF程式傳參,主要用於tp_btf
/fentry
/fexit
BPF程式。用法如下(更多方式,參見這裡):SEC("kprobe/xfs_file_open") int BPF_KPROBE(xfs_file_open, struct inode *inode, struct file *file) { ....... }
使用
BPF_KPROBE
時需要保證,第一個參數必須是一個系統調用,由於tp_btf/sched_wakeup
、tp_btf/sched_wakeup_new
、tp_btf/sched_switch
並不是系統調用,而是跟蹤事件,因此不能使用BPF_KPROBE
。
用戶空間程式碼
用戶側程式碼通常包含如下頭文件:
#include <bpf/bpf.h>
#include <bpf/libbpf.h>
#include "path/to/your/skeleton.skel.h"
用戶側的主要程式碼如下:
int libbpf_print_fn(enum libbpf_print_level level,
const char *format, va_list args)
{
if (level == LIBBPF_DEBUG && !env.verbose)
return 0;
return vfprintf(stderr, format, args);
}
static int bump_memlock_rlimit(void)
{
struct rlimit rlim_new = {
.rlim_cur = RLIM_INFINITY,
.rlim_max = RLIM_INFINITY,
};
return setrlimit(RLIMIT_MEMLOCK, &rlim_new);
}
void handle_event(void *ctx, int cpu, void *data, __u32 data_sz)
{
const struct event *e = data;
struct tm *tm;
char ts[32];
time_t t;
time(&t);
tm = localtime(&t);
strftime(ts, sizeof(ts), "%H:%M:%S", tm);
printf("%-8s %-16s %-6d %14llu\n", ts, e->task, e->pid, e->delta_us);
}
void handle_lost_events(void *ctx, int cpu, __u64 lost_cnt)
{
printf("Lost %llu events on CPU #%d!\n", lost_cnt, cpu);
}
int main(int argc, char **argv)
{
static const struct argp argp = {
.options = opts,
.parser = parse_arg,
.doc = argp_program_doc,
};
struct perf_buffer_opts pb_opts;
struct perf_buffer *pb = NULL;
struct runqslower_bpf *obj;
int err;
err = argp_parse(&argp, argc, argv, 0, NULL, NULL);
if (err)
return err;
/* 設置libbpf的日誌列印 */
libbpf_set_print(libbpf_print_fn);
/* BPF的BPF maps以及其他內容使用了locked類型的記憶體, libbpf不會自動設置該值,因此必須手動指定 */
err = bump_memlock_rlimit();
if (err) {
fprintf(stderr, "failed to increase rlimit: %d", err);
return 1;
}
/* 獲取BPF對象,程式被編碼到了bpf_object_skeleton.data中【1】 */
obj = runqslower_bpf__open();
if (!obj) {
fprintf(stderr, "failed to open and/or load BPF object\n");
return 1;
}
/* initialize global data (filtering options) */
/* 通過.rodata段修改全局變數,注意此時並沒有載入BPF程式 */
obj->rodata->targ_pid = env.pid;
obj->rodata->min_us = env.min_us;
/* 將BPF程式(使用mmap方式)載入到記憶體中 */
err = runqslower_bpf__load(obj);
if (err) {
fprintf(stderr, "failed to load BPF object: %d\n", err);
goto cleanup;
}
/* 附加BPF程式,此時runqslower_bpf.links生效【2】 */
err = runqslower_bpf__attach(obj);
if (err) {
fprintf(stderr, "failed to attach BPF programs\n");
goto cleanup;
}
printf("Tracing run queue latency higher than %llu us\n", env.min_us);
printf("%-8s %-16s %-6s %14s\n", "TIME", "COMM", "PID", "LAT(us)");
pb_opts.sample_cb = handle_event;
pb_opts.lost_cb = handle_lost_events;
pb = perf_buffer__new(bpf_map__fd(obj->maps.events), 64, &pb_opts);
err = libbpf_get_error(pb);
if (err) {
pb = NULL;
fprintf(stderr, "failed to open perf buffer: %d\n", err);
goto cleanup;
}
/* 輪詢event事件,並通過掛載的perf鉤子列印輸出 */
while ((err = perf_buffer__poll(pb, 100)) >= 0)
;
printf("Error polling perf buffer: %d\n", err);
cleanup:
perf_buffer__free(pb);
runqslower_bpf__destroy(obj);
return err != 0;
}
【1】
-
用戶空間需要接收內核空間傳遞過來的資訊,使用生成的skeleton頭文件的如下函數操作內核程式:
<name>__open()
– 創建並打開 BPF 應用(例如的runqslower的runqslower_bpf__open()
函數);<name>__load()
– 初始化,載入和校驗BPF 應用部分;<name>__attach()
– 附加所有可附加的BPF程式 (可選,可以直接使用libbpf API作更多控制);<name>__destroy()
– 分離BPF 程式並使用其使用的所有資源。
-
obj = runqslower_bpf__open();
,其中obj
的結構體位於runqslower.skel.h
,是根據BPF程式自動生成的,內容如下:struct runqslower_bpf { struct bpf_object_skeleton *skeleton; struct bpf_object *obj; struct { struct bpf_map *start; struct bpf_map *events; struct bpf_map *rodata; } maps; /* 對應BPF程式中定義的兩個.maps以及一個全局只讀section .rodata */ struct { struct bpf_program *handle__sched_wakeup; struct bpf_program *handle__sched_wakeup_new; struct bpf_program *handle__sched_switch; } progs; /* 對應BPF程式使用SEC()定義的3個BPF程式 */ struct { struct bpf_link *handle__sched_wakeup; struct bpf_link *handle__sched_wakeup_new; struct bpf_link *handle__sched_switch; } links; /* 鏈接到BPF程式的link,可以使用bpftool link命令查看,可以顯示鏈接的BPF程式,進程等資訊 */ struct runqslower_bpf__rodata { __u64 min_us; pid_t targ_pid; } *rodata; /* 對應BPF程式的.rodata section */ };
-
其實整個處理過程簡單歸結為:創建
runqslower_bpf.skeleton
對象,賦值runqslow的資訊(maps,progs,links,rodata),其中skeleton->data
編碼了BPF程式,後續會被解析為Efile對象;然後載入BPF程式,進行初始化和校驗;然後attach之後,BPF程式開始正式運行。
【2】
-
Skeleton 可以用於大部分場景,但有一個例外:perf events。這種情況下,不能使用
struct <name>__bpf
中的links,而應該自定義一個struct bpf_link *links[]
,原因是perf_event
需要在每個CPU上進行操作。例如llcstat.cstatic int open_and_attach_perf_event(__u64 config, int period, struct bpf_program *prog, struct bpf_link *links[]) { struct perf_event_attr attr = { .type = PERF_TYPE_HARDWARE, .freq = 0, .sample_period = period, .config = config, }; int i, fd; for (i = 0; i < nr_cpus; i++) { fd = syscall(__NR_perf_event_open, &attr, -1, i, -1, 0); if (fd < 0) { fprintf(stderr, "failed to init perf sampling: %s\n", strerror(errno)); return -1; } links[i] = bpf_program__attach_perf_event(prog, fd); if (libbpf_get_error(links[i])) { fprintf(stderr, "failed to attach perf event on cpu: " "%d\n", i); links[i] = NULL; close(fd); return -1; } } return 0; }
TIPs
-
非內核5.3以上的版本中的循環都必須添加
#pragma unroll
標誌#pragma unroll for (i = 0; i < 10; i++) { ... }
-
bpf_printk 調試,僅適用於非生產環境
char comm[16]; u64 ts = bpf_ktime_get_ns(); u32 pid = bpf_get_current_pid_tgid(); bpf_get_current_comm(&comm, sizeof(comm)); bpf_printk("ts: %lu, comm: %s, pid: %d\n", ts, comm, pid);
-
BPF涉及到的主要頭文件有:
libbpf.h
: 定義了通用的ebpf ELF對象的載入操作libbpf/include/uapi/linux/bpf.h
: 定義了BPF的各種類型(prog_type,map_type,attach_type以及設計的結構體定義等)libbpf/src/bpf.h
: 定義了通用的eBPF ELF操作bpf_core_read.h
: 定義了讀取內核結構的方法bpf_helpers.h
: 定義了BPF程式用到的宏SEC()
總結
- 首先編寫BPF程式,定義BPF的maps和sections;
- 編譯BPF程式,然後根據編譯出來的.o文件生成對應的skeleton頭文件
- 用戶空間的程式包含skeleton頭文件,可以通過
const volatile
定義的全局變數(在載入BPF程式前)給BPF程式傳遞參數。需要注意的是,全局變數在BPF程式載入後是不可變的,如果要在載入之後給BPF程式傳遞數據,可以使用map(全局變數就是為了節省在給BPF程式傳遞常量的情況下存在的,節省查找map的開銷); - 用戶空間執行
open
->load
->attach
->destroy
來控制BPF程式的生命周期。
下一篇將使用BPF CO-RE方式重寫一個XDP程式。
參考
- bpf(2)
- bpf-helpers(7)
- BCC和libbpf的轉換
- Tips and Tricks for Writing Linux BPF Applications with libbpf
- context for each type of ebpf program