02-Java中的锁详解

I. 使用Lock接口

只要不涉及到复杂用法,一般采用的是Java的synchronized机制

不过,Lock可以提供一些synchronized不支持的机制

  • 非阻塞的获取锁:尝试获取锁,如果能获取马上获取,不能获取马上返回,不会阻塞
  • 中断获取锁:当获取锁的线程被中断时,抛出异常,锁被释放
  • 超时获取锁:为尝试获取锁设定超时时间

相应API:

  • void lock():普通的获取锁
  • void lockInterruptibly() throws InterruptedException:可中断的获取锁,锁的获取中可以中断线程
  • boolean tryLock():非阻塞获取锁
  • boolean tryLock(long time, TimeUnit unit):超时获取锁
  • void unlock():释放锁

一般框架:

//不要将lock写进try块,防止无故释放
Lock lock = new ReentrantLock();
lock.lock();
try{
 ...;   
}finally{
    lock.unlock();
}

II. 队列同步器AQS

AbstractQueuedSynchronizer:队列同步器,简称AQS,用来构建锁或者其他同步组件的基础框架

使用一个int的成员变量表示同步状态,通过内置的FIFO队列完成资源的排队工作

AQS实现锁可以看作:获取同步状态,成功则加锁成功;失败则加锁失败

调用AQS内部的获取同步状态的API,保证是线程安全的

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect, int update)

image-20211101174622290

1. 自己实现一个Mutex互斥锁

首先要继承一个Lock接口,然后自己实现里面的方法

public class Mutex implements Lock {...}

Lock里面的方法是没有默认实现的,因此都需要重写

image-20211101172630798

一般会实现一个继承于AQS的内部类来执行获取同步状态的实现:加锁相当于获取同步状态

public class Mutex implements Lock {
    private static class Syn extends AbstractQueuedSynchronizer{...}
}

可以看到,AQS的方法和锁需要实现的方法是对应的

先实现对应的AQS的几个方法

private static class Syn extends AbstractQueuedSynchronizer{
    //判断同步器是否被线程占用
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
    //获取锁
    @Override
    protected boolean tryAcquire(int arg) {
        if(compareAndSetState(0,1)){
            setExclusiveOwnerThread(Thread.currentThread());    //设置占用线程
            return true;
        }
        return false;
    }
    //释放锁
    @Override
    protected boolean tryRelease(int arg) {
        if(getState() == 0) throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);  //清空占用线程
        setState(0);
        return true;
    }
}

锁的获取和AQS获取同步状态其实是一个道理

通过代理模式可以像下面这样实现

public class Mutex implements Lock {
    private static class Syn extends AbstractQueuedSynchronizer{...}
    Syn syn = new Syn();
    @Override
    public void lock() {
        syn.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        syn.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return syn.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return syn.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        syn.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

2. AQS实现分析

锁实现的本质:信号量机制,互斥锁也就是0和1两个信号量

AQS维护了一个FIFO的队列,线程获取同步状态失败则会加入这个队列,然后阻塞,直到同步状态释放,队列首节点的线程被唤醒

同步队列中的节点保存的信息有:获取同步状态失败的线程引用,等待状态,前驱和后继节点

同步器有一个头节点和尾节点

加入新的阻塞线程:

构造节点,加入队列的尾节点

使用compareAndSetTail()加到尾部,这是一个原子操作

image-20211108124223446

2.1 独占式的获取和释放

获取同步状态:

image-20211108125056784

acquire()方法会调用tryAcquire(),如果获取失败,则开始调用addWaiter()来给尾节点添加新节点,再调用acquireQueued()等待请求调度

addWaiter()的作用是给FIFO队列添加尾节点,并返回这个节点的引用

因为可能会多个线程申请失败,因此需要使用原子操作compareAndSetTail()

image-20211108125619724

enq()的作用是快速添加失败后的反复尝试,直到添加尾节点成功

image-20211108125813282

acquireQueued()用来请求调度

image-20211108130254437

可见等待调度期间是支持中断的

这个请求调度有两个条件:

  1. 该节点是首节点
  2. 申请互斥信号量成功

for循环的这个操作被称为自旋

release()释放互斥信号量,根据上文提到的获取信号量,除了tryRelease(),还应该唤醒后继节点

image-20211108130918463

2.2 共享式状态获取和释放

最典型的场景就是读写场景:一个资源允许多个线程进行读取,此时写线程阻塞;而写线程执行时,所有读线程阻塞

共享锁锁也就是资源信号量的应用,主要解决下面问题:只想要有限的线程执行

调用tryAcquireShared()来申请资源信号量

image-20211108132000597

doAcquireShared()是申请失败后,构造节点加入FIFO队列然后自旋的操作

image-20211108132239857

使用releaseShared()来释放

注意:共享式的释放可能有多个线程,需要用CAS操作来实现tryReleaseShared()

image-20211108132450657

3. 自己实现一个TwinsLock共享锁

需要自己实现的:

  • tryAcquiredShared()

  • tryReleaseShared():要保证释放操作的原子性

State()的取值就是资源信号量的取值

public class TwinsLock {
    private int count;
    TwinsLock(int count){
        this.count = count;
    }
    private final Sync sync = new Sync(count);
    private static final class Sync extends AbstractQueuedSynchronizer{
        Sync(int count){
            if(count < 0) throw new IllegalArgumentException();
            setState(count);    //设置资源总数
        }
        @Override
        protected int tryAcquireShared(int arg) {
            for(;;){
                int current = getState();
                int newCount = current - arg;
                if(newCount<0 || compareAndSetState(current, newCount)){
                    return newCount;
                }
            }
        }
        @Override
        protected boolean tryReleaseShared(int arg) {
            for(;;){
                int current = getState();
                int newCount = current + arg;
                if(compareAndSetState(current, newCount)){
                    return true;
                }
            }
        }
    }
    public void lock(){
        sync.acquireShared(1);
    }
    public void unlock(){
        sync.releaseShared(1);
    }
}

III. 可重入锁

可重入锁:支持一个线程对资源反复加锁

synchronized支持可重入

ReentrantLock是可重入锁的一种实现,支持反复加锁

锁的公平性:

  • 公平:先对锁进行获取的请求先被满足
  • 不公平:先对锁进行获取的请求不一定先被满足

1. 实现可重入

只需要判断当前线程是否是获取了锁的线程,如果是,则同步状态加一

每次释放同步状态减一,减到0的时候设置获取锁的线程为null,此时允许其他线程获取

接下来来看看ReetrantLock的实现

image-20211108145243069

image-20211108145354670

2. 公平锁与非公平锁

继续观察nofairTryAcquire()方法,发现只要CAS成功,则线程直接获取到锁

image-20211108145705513

而公平锁需要确定队列中没有前驱节点,即自己就是首节点

image-20211108145927580

公平锁:确保线程的FIFO,先上下文切换开销大

非公平锁:可能造成线程饥饿,但线程切换少,吞吐量更大

IV. 读写锁

读写锁,是一种提供共享式和独占式两种方式的锁

  • 支持公平锁和非公平锁
  • 支持重进入
  • 支持锁降级

一个资源允许多个线程进行读取,此时写线程阻塞;而写线程执行时,所有读线程阻塞

1. 读写锁的实现

读写锁的同步状态是按位切割使用的

维护了一个int型的同步状态,32位

高16为读状态,低16位为写状态

1.1 写锁的获取

image-20211108152531993

w是c与0x0000FFFF做与运算后的值,w=0有两种情况:

  1. 有读锁,低16位全0
  2. 无读锁也无写锁,需要后面的条件判断是否为当前线程

1.2 读锁的获取

和写锁的获取类似,需要判断先有没有写锁

不过读锁是共享式的,可以允许多个线程获取读锁

不过读锁也支持重进入,因此不光要维护获取读锁的总状态,还要维护每个线程获取读锁的状态

2. 锁降级

锁降级指:线程先获取写锁,然后再获取读锁,最后释放写锁,实现从写锁降到读锁

目的:保证读写操作的连贯性

使用场景:写操作执行完马上需要读一次,不加读锁的话可能会被其他写线程修改,再读数据可能就变了

V. LockSupport工具

用于阻塞和唤醒线程

image-20211108154154801

VI. Condition接口

Condition接口依赖于Lock对象,用于实现等待-通知模式

核心API就是两个,这两个API的扩展可以增加超时时间,设置中断不敏感等等:

  • await()
  • signal()

1. 使用Condition实现一个阻塞队列

队列满的时候,填充操作阻塞;队列空的时候,取出操作阻塞

public class BoundedQueue <T>{
    private Object[] items;
    private int addIndex, revIndex, count;
    private ReentrantLock lock = new ReentrantLock();
    private Condition empty = lock.newCondition();
    private Condition full = lock.newCondition();

    public BoundedQueue(int size){
        items = new Object[size];
    }

    /**
     * 添加元素
     * @param t
     */
    public void add(T t) throws InterruptedException {
        lock.lock();
        try{
            while(count == items.length){
                System.out.println("已满,请等待消耗");
                empty.await();
            }
            items[addIndex] = t;
            if(++addIndex == items.length) addIndex = 0;
            count++;
            full.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 取出元素
     * @return
     */
    public T remove() throws InterruptedException {
        lock.lock();
        try{
            while(count == 0){
                System.out.println("已空,请等待生产");
                full.await();
            }
            Object temp = items[revIndex];
            if(++revIndex == items.length) revIndex = 0;
            count--;
            empty.signal();
            return (T) temp;
        }finally {
            lock.unlock();
        }
    }
}

2. Condition的实现分析

每个Condition会维护一个等待队列,一个锁支持支持多个等待队列

image-20211108162202120

获取到锁的线程也就是同步队列的首节点

此时再调用await,则首节点进入等待队列,直到其他线程唤醒

image-20211108165508671

相应的,调用signal则是将等待队列的首节点拆下来放到同步队列,唤醒线程开始自旋

当节点回到同步队列,之前调用的await()中的isOnsyncQueue()会返回true,结束等待,在调用acquireQueued()加入竞争

image-20211108165935327

通过isHeldExclusively判断有没有拿到锁