pthread 多線程基礎
本文主要介紹如何通過 pthread
庫進行多線程編程,並通過以下例子進行說明。
- 基於萊布尼茲級數計算 \(\pi\) .
- 多線程歸併排序
參考文章:
API 介紹
pthread_create
作用:新建一個線程。
函數原型:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
參數解析:
pthread_t *thread
用於緩存新線程的pid
.const pthread_attr_t *attr
制定新線程的attr
,如果為NULL
,那麼將使用默認的attr
。start_routine
是新線程即將進入的執行函數。arg
向新線程傳遞的某些參數,一般封裝為結構體傳入。
線程的中止可以通過以下方式:
- 調用
pthread_exit(void *retval)
, 其中retval
可以通過pthread_join
獲得。 - 在
start_routine
函數直接return
. - 該線程被取消 (See pthread_cancel) .
- 線程所屬的進程調用了
exit
, 或者該進程的main
函數中執行了return
.
pthread_join
等待某個線程結束。
函數原型:
int pthread_join(pthread_t thread, void **retval);
參數解析:
-
thread
是某個線程的pid
. -
retval
用於獲取線程start_routine
的返回值 .
基本用法請看下面的「雙線程計算 \(\pi\)」,該例子同時能夠回答為什麼 retval
是 void**
類型而不是 void *
類型。
pthread_attr_t
pthread_attr_t
的定義如下:
struct __pthread_attr
{
struct sched_param __schedparam;
void *__stackaddr;
size_t __stacksize;
size_t __guardsize;
enum __pthread_detachstate __detachstate;
enum __pthread_inheritsched __inheritsched;
enum __pthread_contentionscope __contentionscope;
int __schedpolicy;
};
與之相關的 API,請看:
man pthread_attr
Examples
創建線程
下面是一個簡單的多線程例子,用於演示 pthread_create
和 pthread_join
的基本用法。
該例子創建 4 個線程,通過 order[i]
分別標號,線程的工作內容是輸出本線程的標號。
所涵蓋的知識點:
- 如何創建線程
- 如何向線程傳遞參數:通過對
void *arg
進行強制類型轉換實現。 pthread_join
的作用:如果去掉pthread_join
調用,那麼程序很可能是沒有輸出的。因為在進入各個線程的worker
函數時,main
函數已經結束,這時候所有線程都被強制終止。
#include <stdio.h>
#include <pthread.h>
const int N = 4;
void* worker(void *arg)
{
int *pid = (int *)arg;
printf("%d ", *pid);
return NULL;
}
int main()
{
pthread_t pid[N] = {0};
const int order[] = {0, 1, 2, 3};
int i = 0;
for (; i < N; i++)
pthread_create(&pid[i], NULL, worker, (void *)&order[i]);
for (i = 0; i < N; i++)
pthread_join(pid[i], NULL);
return 0;
}
雙線程計算 π
要求:
- 基於萊布尼茲級數:1 – 1/3 + 1/5 – 1/7 + 1/9 – … = PI/4
- 使用主線程 + 輔助線程的方式
涵蓋知識點:
- 如何向不同的線程傳遞不同參數
- 如何獲取線程的結果
代碼實現:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
const int N = 1e8;
typedef struct { int start, end; } param_t;
typedef struct { double value; } result_t;
void *worker(void *arg)
{
param_t *param = (param_t *)arg;
result_t *res = (result_t *)malloc(sizeof(result_t));
int i = param->start;
for (; i <= param->end; i++)
{
if (i % 2) res->value += 1.0 / (2 * i - 1);
else res->value -= 1.0 / (2 * i - 1);
}
return res;
}
double master(void *arg)
{
double res = 0.0;
param_t *param = (param_t *)arg;
int i = param->start;
for (; i <= param->end; i++)
{
if (i % 2) res += 1.0 / (2 * i - 1);
else res -= 1.0 / (2 * i - 1);
}
return res;
}
int main()
{
pthread_t tid = 0;
param_t p1 = {1, N / 2}, p2 = {N / 2 + 1, N};
pthread_create(&tid, NULL, worker, &p2);
double val = master(&p1);
result_t *res = NULL;
pthread_join(tid, (void **)&res);
printf("PI = %f\n", 4 * (val + res->value));
free(res);
}
多線程計算 π
要求:
- 適應 N 核心的 CPU
- 不能使用全局變量,必須通過傳遞參數與
join
獲取返回值實現
代碼實現:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
const int N = 1e3;
const int NR_CPU = 8;
typedef struct { int start, end; } param_t;
typedef struct { double value; } result_t;
void *worker(void *arg)
{
param_t *p = (param_t *)arg;
int i = p->start;
result_t *res = malloc(sizeof(result_t));
for (; i < p->end; i++)
{
if (i % 2) res->value += 1.0 / (2 * i - 1);
else res->value -= 1.0 / (2 * i - 1);
}
return res;
}
int main()
{
param_t params[NR_CPU];
pthread_t pids[NR_CPU] = {0};
const int step = N / NR_CPU;
int i = 0;
for (; i < NR_CPU; i++)
{
params[i].start = i * step + 1;
params[i].end = params[i].start + step;
}
params[NR_CPU - 1].end = N;
for (i = 0; i < NR_CPU; i++) pthread_create(&pids[i], NULL, worker, ¶ms[i]);
result_t *res = NULL;
double pi = 0.0;
for (i = 0; i < NR_CPU; i++)
{
pthread_join(pids[i], (void **)&res);
pi += res->value;
if (res) free(res), res = NULL;
}
pi *= 4;
printf("PI = %f\n", pi);
return 0;
}
多線程歸併排序
要求:
- 把數組分為若干個區間,每個區間單獨通過一個線程排序
- 最後所有區間
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
const int N = 1e6;
const int NR_CPU = 4;
typedef struct
{
int *nums;
int start, end;
} param_t;
int check(const int *nums, int len)
{
int i = 1;
for (; i < len; i++)
if (nums[i] < nums[i - 1])
return 0;
return 1;
}
int cmp(const void *a, const void *b) { return (*(int *)a) - (*(int *)b); }
void *worker(void *arg)
{
param_t *p = (param_t *)arg;
int *start = p->nums + p->start;
int n = p->end - p->start;
qsort(start, n, sizeof(int), cmp);
return NULL;
}
// merge [start, mid) and [mid, end)
void merge(const int *nums, int start, int mid, int end)
{
int *p = malloc(sizeof(int) * (end - start));
const int *p1 = nums + start, *p2 = nums + mid;
int len1 = mid - start, len2 = end - mid;
int idx = 0, i = 0, j = 0;
while (i < len1 && j < len2)
{
if (p1[i] < p2[j]) p[idx++] = p1[i++];
else p[idx++] = p2[j++];
}
while (i < len1) p[idx++] = p1[i++];
while (j < len2) p[idx++] = p2[j++];
memcpy((void *)(nums + start), (void *)p, sizeof(int) * idx);
}
int main()
{
srand(time(NULL));
int nums[N] = {0};
int i = 0;
for (; i < N; i++) nums[i] = random() % N;
param_t params[NR_CPU];
int step = N / NR_CPU;
for (i = 0; i < NR_CPU; i++)
{
params[i].nums = nums;
params[i].start = i * step;
params[i].end = params[i].start + step;
}
params[NR_CPU - 1].end = N;
pthread_t pids[NR_CPU] = {0};
for (i = 0; i < NR_CPU; i++) pthread_create(&pids[i], NULL, worker, ¶ms[i]);
for (i = 0; i < NR_CPU; i++) pthread_join(pids[i], NULL);
while (step < N)
{
int start = 0;
while (start < N)
{
int mid = start + step;
int end = mid + step;
if (mid > N) mid = N;
if (end > N) end = N;
merge(nums, start, mid, end);
start = end;
}
step *= 2;
}
assert(check(nums, N));
}