02-Java中的锁详解
I. 使用Lock接口
只要不涉及到复杂用法,一般采用的是Java的synchronized机制
不过,Lock可以提供一些synchronized不支持的机制
- 非阻塞的获取锁:尝试获取锁,如果能获取马上获取,不能获取马上返回,不会阻塞
- 中断获取锁:当获取锁的线程被中断时,抛出异常,锁被释放
- 超时获取锁:为尝试获取锁设定超时时间
相应API:
- void lock():普通的获取锁
- void lockInterruptibly() throws InterruptedException:可中断的获取锁,锁的获取中可以中断线程
- boolean tryLock():非阻塞获取锁
- boolean tryLock(long time, TimeUnit unit):超时获取锁
- void unlock():释放锁
一般框架:
//不要将lock写进try块,防止无故释放
Lock lock = new ReentrantLock();
lock.lock();
try{
...;
}finally{
lock.unlock();
}
II. 队列同步器AQS
AbstractQueuedSynchronizer:队列同步器,简称AQS,用来构建锁或者其他同步组件的基础框架
使用一个int的成员变量表示同步状态,通过内置的FIFO队列完成资源的排队工作
AQS实现锁可以看作:获取同步状态,成功则加锁成功;失败则加锁失败
调用AQS内部的获取同步状态的API,保证是线程安全的
- getState()
- setState(int newState)
- compareAndSetState(int expect, int update)
1. 自己实现一个Mutex互斥锁
首先要继承一个Lock接口,然后自己实现里面的方法
public class Mutex implements Lock {...}
Lock里面的方法是没有默认实现的,因此都需要重写
一般会实现一个继承于AQS的内部类来执行获取同步状态的实现:加锁相当于获取同步状态
public class Mutex implements Lock {
private static class Syn extends AbstractQueuedSynchronizer{...}
}
可以看到,AQS的方法和锁需要实现的方法是对应的
先实现对应的AQS的几个方法
private static class Syn extends AbstractQueuedSynchronizer{
//判断同步器是否被线程占用
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//获取锁
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread()); //设置占用线程
return true;
}
return false;
}
//释放锁
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); //清空占用线程
setState(0);
return true;
}
}
锁的获取和AQS获取同步状态其实是一个道理
通过代理模式可以像下面这样实现
public class Mutex implements Lock {
private static class Syn extends AbstractQueuedSynchronizer{...}
Syn syn = new Syn();
@Override
public void lock() {
syn.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
syn.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return syn.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return syn.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
syn.release(1);
}
@Override
public Condition newCondition() {
return null;
}
}
2. AQS实现分析
锁实现的本质:信号量机制,互斥锁也就是0和1两个信号量
AQS维护了一个FIFO的队列,线程获取同步状态失败则会加入这个队列,然后阻塞,直到同步状态释放,队列首节点的线程被唤醒
同步队列中的节点保存的信息有:获取同步状态失败的线程引用,等待状态,前驱和后继节点
同步器有一个头节点和尾节点
加入新的阻塞线程:
构造节点,加入队列的尾节点
使用compareAndSetTail()加到尾部,这是一个原子操作
2.1 独占式的获取和释放
获取同步状态:
acquire()方法会调用tryAcquire(),如果获取失败,则开始调用addWaiter()来给尾节点添加新节点,再调用acquireQueued()等待请求调度
addWaiter()的作用是给FIFO队列添加尾节点,并返回这个节点的引用
因为可能会多个线程申请失败,因此需要使用原子操作compareAndSetTail()
enq()的作用是快速添加失败后的反复尝试,直到添加尾节点成功
acquireQueued()用来请求调度
可见等待调度期间是支持中断的
这个请求调度有两个条件:
- 该节点是首节点
- 申请互斥信号量成功
for循环的这个操作被称为自旋
release()释放互斥信号量,根据上文提到的获取信号量,除了tryRelease(),还应该唤醒后继节点
2.2 共享式状态获取和释放
最典型的场景就是读写场景:一个资源允许多个线程进行读取,此时写线程阻塞;而写线程执行时,所有读线程阻塞
共享锁锁也就是资源信号量的应用,主要解决下面问题:只想要有限的线程执行
调用tryAcquireShared()来申请资源信号量
doAcquireShared()是申请失败后,构造节点加入FIFO队列然后自旋的操作
使用releaseShared()来释放
注意:共享式的释放可能有多个线程,需要用CAS操作来实现tryReleaseShared()
3. 自己实现一个TwinsLock共享锁
需要自己实现的:
-
tryAcquiredShared()
-
tryReleaseShared():要保证释放操作的原子性
State()的取值就是资源信号量的取值
public class TwinsLock {
private int count;
TwinsLock(int count){
this.count = count;
}
private final Sync sync = new Sync(count);
private static final class Sync extends AbstractQueuedSynchronizer{
Sync(int count){
if(count < 0) throw new IllegalArgumentException();
setState(count); //设置资源总数
}
@Override
protected int tryAcquireShared(int arg) {
for(;;){
int current = getState();
int newCount = current - arg;
if(newCount<0 || compareAndSetState(current, newCount)){
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for(;;){
int current = getState();
int newCount = current + arg;
if(compareAndSetState(current, newCount)){
return true;
}
}
}
}
public void lock(){
sync.acquireShared(1);
}
public void unlock(){
sync.releaseShared(1);
}
}
III. 可重入锁
可重入锁:支持一个线程对资源反复加锁
synchronized支持可重入
ReentrantLock
是可重入锁的一种实现,支持反复加锁
锁的公平性:
- 公平:先对锁进行获取的请求先被满足
- 不公平:先对锁进行获取的请求不一定先被满足
1. 实现可重入
只需要判断当前线程是否是获取了锁的线程,如果是,则同步状态加一
每次释放同步状态减一,减到0的时候设置获取锁的线程为null,此时允许其他线程获取
接下来来看看ReetrantLock
的实现
2. 公平锁与非公平锁
继续观察nofairTryAcquire()方法,发现只要CAS成功,则线程直接获取到锁
而公平锁需要确定队列中没有前驱节点,即自己就是首节点
公平锁:确保线程的FIFO,先上下文切换开销大
非公平锁:可能造成线程饥饿,但线程切换少,吞吐量更大
IV. 读写锁
读写锁,是一种提供共享式和独占式两种方式的锁
- 支持公平锁和非公平锁
- 支持重进入
- 支持锁降级
一个资源允许多个线程进行读取,此时写线程阻塞;而写线程执行时,所有读线程阻塞
1. 读写锁的实现
读写锁的同步状态是按位切割使用的
维护了一个int型的同步状态,32位
高16为读状态,低16位为写状态
1.1 写锁的获取
w是c与0x0000FFFF做与运算后的值,w=0有两种情况:
- 有读锁,低16位全0
- 无读锁也无写锁,需要后面的条件判断是否为当前线程
1.2 读锁的获取
和写锁的获取类似,需要判断先有没有写锁
不过读锁是共享式的,可以允许多个线程获取读锁
不过读锁也支持重进入,因此不光要维护获取读锁的总状态,还要维护每个线程获取读锁的状态
2. 锁降级
锁降级指:线程先获取写锁,然后再获取读锁,最后释放写锁,实现从写锁降到读锁
目的:保证读写操作的连贯性
使用场景:写操作执行完马上需要读一次,不加读锁的话可能会被其他写线程修改,再读数据可能就变了
V. LockSupport工具
用于阻塞和唤醒线程
VI. Condition接口
Condition接口依赖于Lock对象,用于实现等待-通知模式
核心API就是两个,这两个API的扩展可以增加超时时间,设置中断不敏感等等:
- await()
- signal()
1. 使用Condition实现一个阻塞队列
队列满的时候,填充操作阻塞;队列空的时候,取出操作阻塞
public class BoundedQueue <T>{
private Object[] items;
private int addIndex, revIndex, count;
private ReentrantLock lock = new ReentrantLock();
private Condition empty = lock.newCondition();
private Condition full = lock.newCondition();
public BoundedQueue(int size){
items = new Object[size];
}
/**
* 添加元素
* @param t
*/
public void add(T t) throws InterruptedException {
lock.lock();
try{
while(count == items.length){
System.out.println("已满,请等待消耗");
empty.await();
}
items[addIndex] = t;
if(++addIndex == items.length) addIndex = 0;
count++;
full.signal();
}finally {
lock.unlock();
}
}
/**
* 取出元素
* @return
*/
public T remove() throws InterruptedException {
lock.lock();
try{
while(count == 0){
System.out.println("已空,请等待生产");
full.await();
}
Object temp = items[revIndex];
if(++revIndex == items.length) revIndex = 0;
count--;
empty.signal();
return (T) temp;
}finally {
lock.unlock();
}
}
}
2. Condition的实现分析
每个Condition会维护一个等待队列,一个锁支持支持多个等待队列
获取到锁的线程也就是同步队列的首节点
此时再调用await,则首节点进入等待队列,直到其他线程唤醒
相应的,调用signal则是将等待队列的首节点拆下来放到同步队列,唤醒线程开始自旋
当节点回到同步队列,之前调用的await()中的isOnsyncQueue()会返回true,结束等待,在调用acquireQueued()加入竞争
通过isHeldExclusively判断有没有拿到锁