面试官让你讲讲Linux内核的竞争与并发,你该如何回答?

@

内核中的并发和竞争简介

  在早期的 Linux内核中,并发的来源相对较少。早期内核不支持对称多处理( symmetric multi processing,SMP),因此,导致并发执行的唯一原因是对硬件中断的服务。这种情况处理起来较为简单,但并不适用于为获得更好的性能而使用更多处理器且强调快速响应事件的系统。

  为了响应现代硬件和应用程序的需求, Linux内核已经发展到同时处理更多事情的时代。Linux系统是个多任务操作系统,会存在多个任务同时访问同一片内存区域的情况,这些任务可能会相互覆盖这段内存中的数据,造成内存数据混乱。针对这个问题必须要做处理,严重的话可能会导致系统崩溃。现在的 Linux系统并发产生的原因很复杂,总结一下有下面几个主要原因:

  1. 多线程并发访问, Linux是多任务(线程)的系统,所以多线程访问是最基本的原因。
  2. 抢占式并发访问,内核代码是可抢占的,因此,我们的驱动程序代码可能在任何时候丢失对处理器的独占
  3. 中断程序并发访问,设备中断是异步事件,也会导致代码的并发执行。
  4. SMP(多核)核间并发访问,现在ARM架构的多核SOC很常见,多核CPU存在核间并发访问。正在运行的多个用户空间进程可能以一种令人惊讶的组合方式访问我们的代码,SMP系统甚至可在不同的处理器上同时执行我们的代码。

  只要我们的程序在运转当中,就有可能发生并发和竞争。比如,当两个执行线程需要访问相同的数据结构(或硬件资源)时,混合的可能性就永远存在。因此在设计自己的驱动程序时,就应该避免资源的共享。如果没有并发的访问,也就不会有竞态的产生。因此,仔细编写的内核代码应该具有最少的共享。这种思想的最明显应用就是避免使用全局变量。如果我们将资源放在多个执行线程都会找到的地方(临界区),则必须有足够的理由。

  如何防止我们的数据被并发访问呢?这个时候就要建立一种保护机制,下面介绍几种内核提供的几种并发和竞争的处理方法。

原子操作

原子操作简介

  原子,在早接触到是在化学概念中。原子指化学反应不可再分的基本微粒。同样的,在内核中所说的原子操作表示这一个访问是一个步骤,必须一次性执行完,不能被打断,不能再进行拆分。
  例如,在多线程访问中,我们的线程一对a进行赋值操作,a=1,线程二也对a进行赋值操作a=2,我们理想的执行顺序是线程一先执行,线程二再执行。但是很有可能在线程一执行的时候被其他操作打断,使得线程一最后的执行结果变为a=2。要解决这个问题,必须保证我们的线程一在对数据访问的过程中不能被其他的操作打断,一次性执行完成。

整型原子操作函数

函数 描述
ATOMIC_INIT(int i) 定义原子变量的时候对其初始化。
int atomic_read(atomic_t*v) 读取 v的值,并且返回
void atomic_set(atomic_t *v, int i) 向 v写入 i值。
void atomic_add(int i, atomic_t *v) 给 v加上 i值。
void atomic_sub(int i, atomic_t *v) 从 v减去 i值。
void atomic_inc(atomic_t *v) 给 v加 1,也就是自增。
void atomic_dec(atomic_t *v) 从 v减 1,也就是自减 。
int atomic_dec_return(atomic_t *v) 从 v减 1,并且返回v的值 。
int atomic_inc_return(atomic_t *v) 给 v加 1,并且返回 v的值。
int atomic_sub_and_test(int i, atomic_t *v) 从 v减 i,如果结果为0就返回真,否则就返回假
int atomic_dec_and_test(atomic_t *v) 从 v减 1,如果结果为0就返回真,否则就返回假
int atomic_inc_and_test(atomic_t *v) 给 v加 1,如果结果为0就返回真,否则就返回假
int atomic_add_negative(int i, atomic_t *v) 给 v加 i,如果结果为负就返回真,否则返回假

注:64位的整型原子操作只是将“atomic_”前缀换成“atomic64_”,将int换成long long。

位原子操作函数

函数 描述
void set_bit(int nr, void *p) 将p地址的nr位置1
void clear_bit(int nr,void *p) 将p地址的nr位清零
void change_bit(int nr, void *p) 将p地址的nr位反转
int test_bit(int nr, void *p) 获取p地址的nr位的值
int test_and_set_bit(int nr, void *p) 将p地址的nr位置1,并且返回nr位原来的值
int test_and_clear_bit(int nr, void *p) 将p地址的nr位清0,并且返回nr位原来的值
int test_and_change_bit(int nr, void *p) 将p地址的nr位翻转,并且返回nr位原来的值

原子操作例程

/* 定义原子变量,初值为1*/
static atomic_t xxx_available = ATOMIC_INIT(1); 
static int xxx_open(struct inode *inode, struct file *filp)
{
 ...
 /* 通过判断原子变量的值来检查LED有没有被别的应用使用 */
 if (!atomic_dec_and_test(&xxx_available)) {
 /*小于0的话就加1,使其原子变量等于0*/
 atomic_inc(&xxx_available);
 /* LED被使用,返回忙*/
 return - EBUSY; 
 }
...
/* 成功 */
 return 0;
static int xxx_release(struct inode *inode, struct file *filp)
{
 /* 关闭驱动文件的时候释放原子变量 */
 atomic_inc(&xxx_available); 
 return 0;
}

自旋锁

  上面我们介绍了原子变量,从它的操作函数可以看出,原子操作只能针对整型变量或者位。假如我们有一个结构体变量需要被线程A所访问,在线程A访问期间不能被其他线程访问,这怎么办呢?自旋锁就可以完成对结构体变量的保护。

自旋锁简介

  自旋锁,顾名思义,我们可以把他理解成厕所门上的一把锁。这个厕所门只有一把钥匙,当我们进去时,把钥匙取下来,进去后反锁。那么当第二个人想进来,必须等我们出去后才可以。当第二个人在外面等待时,可能会一直等待在门口转圈。

  我们的自旋锁也是这样,自旋锁只有锁定和解锁两个状态。当我们进入拿上钥匙进入厕所,这就相当于自旋锁锁定的状态,期间谁也不可以进来。当第二个人想要进来,这相当于线程B想要访问这个共享资源,但是目前不能访问,所以线程B就一直在原地等待,一直查询是否可以访问这个共享资源。当我们从厕所出来后,这个时候就“解锁”了,只有再这个时候线程B才能访问。

  假如,在厕所的人待的时间太长怎么办?外面的人一直等待吗?如果换做是我们,肯定不会这样,简直浪费时间,可能我们会寻找其他方法解决问题。自旋锁也是这样的,如果线程A持有自旋锁时间过长,显然会浪费处理器的时间,降低了系统性能。我们知道CPU最伟大的发明就在于多线程操作,这个时候让线程B在这里傻傻的不知道还要等待多久,显然是不合理的。因此,如果自旋锁只适合短期持有,如果遇到需要长时间持有的情况,我们就要换一种方式了(下文的互斥体)。

自旋锁操作函数

函数 描述
DEFINE_SPINLOCK(spinlock_t lock) 定义并初始化一个自旋变量
int spin_lock_init(spinlock_t *lock) 初始化自旋锁
void spin_lock(spinlock_t *lock) 获取指定的自旋锁,也叫加锁
void spin_unlock(spinlock_t *lock) 释放指定的自旋锁。
int spin_trylock(spinlock_t *lock) 尝试获取指定的锁,如果没有获取到,返回0
int spin_is_locked(spinlock_t *lock) 检查指定的自旋锁是否被获取,如果没有被获取返回非0,否则返回0.

  自旋锁是主要为了多处理器系统设计的。对于单处理器且内核不支持抢占的系统,一旦进入了自旋状态,则会永远自旋下去。因为,没有任何线程可以获取CPU来释放这个锁。因此,在单处理器且内核不支持抢占的系统中,自旋锁会被设置为空操作

  以上列表中的函数适用于SMP或支持抢占的单CPU下线程之间的并发访问,也就是用于线程与线程之间,被自旋锁保护的临界区一定不能调用任何能够引起睡眠和阻塞(其实本质仍然是睡眠)的API函数,否则的话会可能会导致死锁现象的发生。自旋锁会自动禁止抢占,也就说当线程A得到锁以后会暂时禁止内核抢占。如果线程A在持有锁期间进入了休眠状态,那么线程A会自动放弃CPU使用权。线程B开始运行,线程B也想要获取锁,但是此时锁被A线程持有,而且内核抢占还被禁止了!线程B无法被调度岀去,那么线程A就无法运行,锁也就无法释放死锁发生了!

  当线程之间发生并发访问时,如果此时中断也要插一脚,中断也想访问共享资源,那该怎么办呢?首先可以肯定的是,中断里面使用自旋锁,但是在中断里面使用自旋锁的时候,在获取锁之前一定要先禁止本地中断(也就是本CPU中断,对于多核SOC来说会有多个CPU核),否则可能导致锁死现象的发生。看下下面一个例子:

//线程A
spin_lock(&lock);
.......
functionA();
.......
spin_unlock(&lock);

//中断发生,运行线程B
spin_lock(&lock);
.......
functionA();
.......
spin_unlock(&lock);

  线程A先运行,并且获取到了lock这个锁,当线程A运行 functionA函数的时候中断发生了,中断抢走了CPU使用权。下边的中断服务函数也要获取lock这个锁,但是这个锁被线程A占有着,中断就会一直自旋,等待锁有效。但是在中断服务函数执行完之前,线程A是不可能执行的,线程A说“你先放手”,中断说“你先放手”,场面就这么僵持着死锁发生!

  使用了自旋锁之后可以保证临界区不受别的CPU和本CPU内的抢占进程的打扰,但是得到锁的代码在执行临界区的时候,还可能受到中断和底半部的影响,为了防止这种影响,建议使用以下列表中的函数:

函数 描述
void spin_lock_irq(spinlock_t *lock) 禁止本地中断,并获取自旋锁
void spin_unlock_irq(spinlock_t *lock) 激活本地中断,并释放自旋锁
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags) 保存中断状态,禁止本地中断,并获取自旋锁
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)       将中断状态恢复到以前的状态,并且激活本地中断,释放自旋锁

  在多核编程的时候,如果进程和中断可能访问同一片临界资源,我们一般需要在进程上下文中调用spin_ lock irqsave() spin_unlock_irqrestore(),在中断上下文中调用 spin_lock() spin _unlock()。这样,在CPU上,无论是进程上下文,还是中断上下文获得了自旋锁,此后,如果CPU1无论是进程上下文,还是中断上下文,想获得同一自旋锁,都必须忙等待,这避免一切核间并发的可能性。同时,由于每个核的进程上下文持有锁的时候用的是 spin_lock_irgsave(),所以该核上的中断是不可能进入的,这避免了核内并发的可能性。

DEFINE_SPINLOCK(lock) /* 定义并初始化一个锁 */ 
/* 线程A */
void functionA (){ 
unsigned long flags; /* 中断状态 */
 spin_lock_irqsave(&lock, flags) /* 获取锁 */ 
  /* 临界区 */ 
spin_unlock_irqrestore(&lock, flags) /* 释放锁 */ 
} 
 /* 中断服务函数 */
 void irq() { 
 spin_lock(&lock) /* 获取锁 */ 
   /* 临界区 */ 
 spin_unlock(&lock) /* 释放锁 */ 
}

自旋锁例程

static int xxx_open(struct inode *inode, struct file *filp)
{
...
	spinlock(&xxx_lock);
	if (xxx_count) {/* 已经打开*/
	spin_unlock(&xxx_lock);
	return -EBUSY;
 }
	 xxx_count++;/* 增加使用计数*/
 	spin_unlock(&xxx_lock);
 ...
	 return 0;/* 成功 */
}

static int xxx_release(struct inode *inode, struct file *filp)
{
	 ...
	 spinlock(&xxx_lock);
	 xxx_count--;/* 减少使用计数*/
	 spin_unlock(&xxx_lock);
 	return 0;
}

读写自旋锁

  当临界区的一个文件可以被同时读取,但是并不能被同时读和写。如果一个线程在读,另一个线程在写,那么很可能会读取到错误的不完整的数据。读写自旋锁是可以允许对临界区的共享资源进行并发读操作的。但是并不允许多个线程并发读写操作。如果想要并发读写,就要用到了顺序锁。
  读写自旋锁的读操作函数如下所示:

函数 描述
DEFINE_RWLOCK(rwlock_t lock) 定义并初始化读写锁
void rwlock_init(rwlock_t *lock) 初始化读写锁
void read_lock(rwlock_t *lock) 获取读锁
void read_unlock(rwlock_t *lock 释放读锁
void read_unlock_irq(rwlock_t *lock) 打开本地中断,并且释放读锁
void read_lock_irqsave(rwlock_t *lock,unsigned long flags) 保存中断状态,禁止本地中断,并获取读锁
void read_unlock_irqrestore(rwlock_t *lock,unsigned long flags) 将中断状态恢复到以前的状态,并且激活本地中断,释放读锁
void read_lock_bh(rwlock_t *lock) 关闭下半部,并获取读锁
void read_unlock_bh(rwlock_t *lock) 打开下半部,并释放读锁

  读写自旋锁的写操作函数如下所示:

函数 描述
void write_lock(rwlock_t *lock) 获取写锁
void write_unlock(rwlock_t *lock) 释放写锁
void write_lock_irq(rwlock_t *lock) 禁止本地中断,并且获取写锁。
void write_unlock_irq(rwlock_t *lock) 打开本地中断,并且释放写锁
void write_lock_irqsave(rwlock_t *lock,unsigned long flags) 保存中断状态,禁止本地中断,并获取写锁
void write_unlock_irqrestore(rwlock_t *lock,unsigned long flags) 将中断状态恢复到以前的状态,并且激活本地中断,释放写锁
void write_lock_bh(rwlock_t *lock) 关闭下半部,并获取写锁
void write_unlock_bh(rwlock_t *lock) 打开下半部,并释放写锁

读写锁例程

rwlock_t lock; /* 定义rwlock */
rwlock_init(&lock); /* 初始化rwlock */
/* 读时获取锁*/
read_lock(&lock);
... /* 临界资源 */
read_unlock(&lock);
/* 写时获取锁*/
write_lock_irqsave(&lock, flags);
... /* 临界资源 */
write_unlock_irqrestore(&lock, flags);

顺序锁

  顺序锁是读写锁的优化版本,读写锁不允许同时读写,而使用顺序锁可以完成同时进行读和写的操作但并不允许同时的写。虽然顺序锁可以同时进行读写操作,但并不建议这样,读取的过程并不能保证数据的完整性。

顺序锁操作函数

  顺序锁的读操作函数如下所示:

函数 描述
DEFINE_SEQLOCK(seqlock_t sl) 定义并初始化顺序锁
void seqlock_ini seqlock_t *sl) 初始化顺序锁
void write_seqlock(seqlock_t *sl) 顺序锁写操作
void write_sequnlock(seqlock_t *sl) 获取写顺序锁
void write_seqlock_irq(seqlock_t *sl) 禁止本地中断,并且获取写顺序锁
void write_sequnlock_irq(seqlock_t *sl) 打开本地中断,并且释放写顺序锁
void write_seqlock_irqsave(seqlock_t *sl,unsigned long flags) 保存中断状态,禁止本地中断,并获取写顺序
void write_sequnlock_irqrestore(seqlock_t *sl,unsigned long flags) 将中断状态恢复到以前的状态,并且激活本地中断,释放写顺序锁
void write_seqlock_bh(seqlock_t *sl) 关闭下半部,并获取写读锁
void write_sequnlock_bh(seqlock_t *sl) 打开下半部,并释放写读锁

  顺序锁的写操作函数如下所示:

函数 描述
DEFINE_RWLOCK(rwlock_t lock) 读单元访问共享资源的时候调用此函数,此函数会返回顺序锁的顺序号
unsigned read_seqretry(const seqlock_t *sl,unsigned start) 读结束以后调用此函数检查在读的过程中有没有对资源进行写操作,如果有的话就要重读

自旋锁使用注意事项

  1. 因为在等待自旋锁的时候处于“自旋”状态,因此锁的持有时间不能太长,一定要短,否则的话会降低系统性能。如果临界区比较大,运行时间比较长的话要选择其他的并发处理方式,比如稍后要讲的信号量和互斥体。
  2. 自旋锁保护的临界区内不能调用任何可能导致线程休眠的API函数,比如copy_from_user()、copy_to_user()、kmalloc()和msleep()等函数,否则的话可能导致死锁。
  3. 不能递归申请自旋锁,因为一旦通过递归的方式申请一个你正在持有的锁,那么你就必须“自旋”,等待锁被释放,然而你正处于“自旋”状态,根本没法释放锁。结果就是自己把自己锁死了
  4. 在编写驱动程序的时候我们必须考虑到驱动的可移植性,因此不管你用的是单核的还是多核的SOC,都将其当做多核SOC来编写驱动程序。

copy_from_user的使用是结合进程上下文的,因为他们要访问“user”的内存空间,这个“user”必须是某个特定的进程。如果在驱动中使用这两个函数,必须是在实现系统调用的函数中使用,不可在实现中断处理的函数中使用。如果在中断上下文中使用了,那代码就很可能操作了根本不相关的进程地址空间。其次由于操作的页面可能被换出,这两个函数可能会休眠,所以同样不可在中断上下文中使用。

信号量

信号量简介

  信号量和自旋锁有些相似,不同的是信号量会发出一个信号告诉你还需要等多久。因此,不会出现傻傻等待的情况。比如,有100个停车位的停车场,门口电子显示屏上实时更新的停车数量就是一个信号量。这个停车的数量就是一个信号量,他告诉我们是否可以停车进去。当有车开进去,信号量加一,当有车开出来,信号量减一。
  比如,厕所一次只能让一个人进去,当A在里面的时候,B想进去,如果是自旋锁,那么B就会一直在门口傻傻等待。如果是信号量,A就会给B一个信号,你先回去吧,我出来了叫你。这就是一个信号量的例子,B听到A发出的信号后,可以先回去睡觉,等待A出来。
  因此,信号量显然可以提高系统的执行效率,避免了许多无用功。信号量具有以下特点:

  1. 因为信号量可以使等待资源线程进入休眠状态,因此适用于那些占用资源比较久的场合。
  2. 因此信号量不能用于中断中,因为信号量会引起休眠,中断不能休眠
  3. 如果共享资源的持有时间比较短,那就不适合使用信号量了,因为频繁的休眠、切换线程引起的开销要远大于信号量带来的那点优势

信号量操作函数

函数 描述
DEFINE_SEAMPHORE(name) 定义一个信号量,并且设置信号量的值为1
void sema_init(struct semaphore *sem, int val) 初始化信号量sem,设置信号量值为val
void down(struct semaphore *sem) 获取信号量,因为会导致休眠,因此不能在中断中使用
int down_trylock(struct semaphore *sem); 尝试获取信号量,如果能获取到信号量就获取,并且返回0.如果不能就返回非0,并且不会进入休眠
int down_interruptible(struct semaphore                       获取信号量,和down类似,只是使用dow进入休眠状态的线程不能被信号打断。而使用此函数进入休眠以后是可以被信号打断的
void up(struct semaphore *sem) 释放信号量

信号量例程

struct semaphore sem; /* 定义信号量 */ 
sema_init(&sem, 1); /* 初始化信号量 表示只能由一个线程同时访问这块资源 */
 down(&sem); /* 申请信号量 */
  /* 临界区 */ 
 up(&sem); /* 释放信号量 */

互斥体

互斥体简介

  互斥体表示一次只有一个线程访问共享资源,不可以递归申请互斥体
  信号量也可以用于互斥体,当信号量用于互斥时(即避免多个进程同时在一个临界区中运行),信号量的值应初始化为1.这种信号量在任何给定时刻只能由单个进程或线程拥有。在这种使用模式下,一个信号量有时也称为一个“互斥体( mutex)”,它是互斥( mutual exclusion)的简称。Linux内核中几平所有的信号量均用于互斥

互斥体操作函数

函数 描述
DEFINE_MUTEX(name) 定义并初始化一个 mutex变量
void mutex_init(mutex *lock) 初始化 mutex
void mutex_lock(struct mutex *lock) 获取 mutex,也就是给 mutex上锁。如果获取不到就进休眠
void mutex_unlock(struct mutex *lock) 释放 mutex,也就给 mutex解锁
int mutex_trylock(struct mutex *lock)                       判断 mutex是否被获取,如果是的话就返回,否则返回0
int mutex_lock_interruptible(struct mutex *lock) 使用此函数获取信号量失败进入休眠以后可以被信号打断

互斥体例程

struct mutex lock; /* 定义一个互斥体 */ 
mutex_init(&lock); /* 初始化互斥体 */ 
mutex_lock(&lock); /* 上锁 */ 
/* 临界区 */
mutex_unlock(&lock); /* 解锁*/

互斥体与自旋锁

  互斥体和自旋锁都是解决互斥问题的一种手段。互斥体是进程级别的,互斥体在使用的时候会发生进程间的切换,因此,使用互斥体资源开销比较大。自旋锁可以节省上下文切换的时间,如果持有锁的时间不长,使用自旋锁是比较好的选择,如果持有锁时间比较长,互斥体显然是更好的选择。

互斥体使用注意事项

  1. 当锁不能被获取到时,使用互斥体的开销是进程上下文切换时间,使用自旋锁的开销是等待获取自旋锁(由临界区执行时间决定)。若临界区比较小,宜使用自旋锁,若临界区很大,应使用互斥体。
  2. 互斥体所保护的临界区可包含可能引起阻塞的代码,而自旋锁则绝对要避免用来保护包含这样代码的临界区。因为阻塞意味着要进行进程的切换,如果进程被切换岀去后,另一个进程企图获取本自旋锁,死锁就会发生。
  3. 互斥体存在于进程上下文。因此,如果被保护的共享资源需要在中断或软中断情况下使用,则在互斥体和自旋锁之间只能选择自旋锁。当然,如果一定要使用互斥体,则只能通过mutex trylock()方式进行,不能获取就立即返回以避免阻塞。

  大家的鼓励是我继续创作的动力,如果觉得写的不错,欢迎关注,点赞,收藏,转发,谢谢!

有任何问题,均可通过公告中的二维码联系我