JUC源码学习笔记2——AQS共享和Semaphore,CountDownLatch

本文主要讲述AQS的共享模式,共享和独占具有类似的套路,所以如果你不清楚AQS的独占的话,可以看我的《JUC源码学习笔记1》
主要参考内容有《Java并发编程的艺术》,《Java并发编程实战》和下面两位博主的博客
//segmentfault.com/a/1190000016447307 这是我见过讲AQS共享最好的博客
//www.cnblogs.com/micrari/p/6937995.html 这个文章是对PROPAGATE的作用比较好的诠释

一丶Semaphore+AQS共享模式

1.Semaphore的作用

公司有五个坑位可以用来上厕所,对于一个厕所来说,五个坑位可以看作是五个共享的资源,同时可以允许五个员工(线程)来上厕所,当前任何一个员工进入其中一个坑位,那么可用坑位(共享资源)减少,当员工出来的时候,共享资源被释放,当全部都被人占用的时候,后续上厕所的人需要等待(表现在线程获取共享资源阻塞)当然这个等待可以被中断(测试给等待的开发提了bug,开发放弃排队回到工位)这个等待也可以超时(等太久心态崩了不等了)

Semaphore信号量来控制多个线程同时访问某个特定共享资源的操作数量

很直观,我们可以意识到,Semaphore式基于AQS的共享模式

2.Semaphore常用方法

方法 描述
public Semaphore(int permits) 指定许可数量的构造方法(厕所有多少个坑位)
public Semaphore(int permits, boolean fair) 创建具有给定许可数量和给定公平设置的信号量(第二个参数指定释放公平,好比说员工的素质,有没有上厕所不排队的人)
public void acquire() throws InterruptedException 可中断的获取一个许可,如果获取许可,许可数量减少1方法返回,否则阻塞当前线程之到出现以下情况
1. 其他线程释放了许可,并且当前线程获得了许可(厕所出来了一个,而且你如愿如厕)
2.其他线程中断了当前线程(测试提bug打电话中断了你的排队)
public void acquireUninterruptibly() 和acquire() 类似,但是这个方法不响应中断即在获取许可的途中不会因为中断了放弃(人有三急,天王老子来了也得先如厕)
public boolean tryAcquire() 尝试获取许可,如果成功获取许可返回true并且减少许可,反之返回false,(你来到厕所随便看了以下,有可以的坑位立马进去,反之直接回工位)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException 和tryAcquire()类似,但是响应中断,支持超时,如果在指定时间馁获取到共享资源返回true,如果超时未获取返回false,如果获取的途中被中断抛出中断异常
public void release() 释放许可,并且许可数量加1(如厕完释放坑位)
public int availablePermits() 返回此信号量中可用的当前许可数。
public int drainPermits() 获取并返回所有立即可用的许可证。
protected void reducePermits(int reduction) 按指示的减少量减少可用许可证的数量。
acquire,acquireUninterruptibly,tryAcquire,release还有支持获取指定数量共享资源的重载方法

3.Semaphore是如何实现的

显而易见,Semaphore是基于AQS的共享模式,Semaphore方法的都是委托给Sync

3.1 acquire()——可中断的获取许可(无参数获取一个,有参数指定获取n个)

Semaphore的acquire方法直接调用的是sync的acquireSharedInterruptibly(1),这个方法在sync的父类AbstractQueuedSynchronizer中进行了实现

和ReentrantLock类似的套路,很多并发都是使用这种内部类的方式,把功能的实现交给内部类

3.1.1 tryAcquireShared 尝试获取共享资源
相对于独占的锁的`tryAcquire(int arg)`返回boolean类型的值,共享锁的`tryAcquireShared(int acquires)`返回的是一个整型值:

- 如果该值小于0,则代表当前线程获取共享锁失败
- 如果该值大于0,则代表当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁的行为很可能成功
- 如果该值等于0,则代表当前线程获取共享锁成功,但是接下来其他线程尝试获取共享锁的行为会失败
3.1.1.1 非公平的尝试获取共享资源

直接调用的是nonfairTryAcquireShared方法

final int nonfairTryAcquireShared(int acquires) {
    //一个自选
    for (;;) {
        //可用的许可
        int available = getState();
		//剩余=可用-当前需要的许可
        int remaining = available - acquires;//   1
        
		//如果剩余小于0 或 cas设置许可数量位true 返回剩余剩余许可数量
        //值得品一品
        if (remaining < 0 ||
            compareAndSetState(available, remaining))//  2
            return remaining;
    }
}
  • 自旋结束的情况

    1. 剩余许可小于0 表示当前剩余的许可不足以满足我们的要求

    2. 当前许可可以满足我们的需求,且成功CAS修改许可的数量

    可能线程A 执行到1这一行发现是足够的,但是当前很多线程在竞争资源,导致执行2的时候当前线程CAS失败,那么会进入下一轮循环
    
3.1.1.2 公平的尝试获取共享资源
protected int tryAcquireShared(int acquires) {
    for (;;) {
        
        //如果前面有线程在等待,公平起见,返回-1 获取共享资源失败
        if (hasQueuedPredecessors())
            return -1;
        
        //和非公平一样
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
3.1.2 doAcquireSharedInterruptibly 排队获取共享资源

虽然在独占模式中没有名称叫doAcquireInterruptibly的方法,但是还是那个套路

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {

    //构造节点,加入到同步队列尾部
    final Node node = addWaiter(Node.SHARED);
    //获取失败标志
    boolean failed = true;
    try {
        //自选
        for (;;) {
            //前继节点
            final Node p = node.predecessor();
			//前继节点是头节点
            if (p == head) {
                //尝试获取共享资源
                int r = tryAcquireShared(arg);
				//获取成功
                if (r >= 0) {
                    //设置为头节点并且传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //挂起当前线程 如果被中断那么直接抛出中断异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        //如果当前节点放弃了,这里对应被中断了(超时获取方法在超时的情况也会进入)
        if (failed)
            cancelAcquire(node);
    }
}

和独占不同的点在于:

  • addWaiter(Node.SHARED) 标记当前节点是共享模式

    这个Node.SHARED设置到当前节点的nextWaiter属性上,nextWaiter在此的作用只是标记当前节点的模式(独占or共享)

    在Condition等待队列中才起到串联等待线程的作用的,后续会有专门一篇讲解
    
  • 独占的时候调用的是setHead方法,这里调用的是 setHeadAndPropagate(当前线程节点,tryAcquireShared返回值(在信号量中可以理解为剩余的许可证数量))

3.1.3 setHeadAndPropagate 设置为头节点并且传播
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    //设置为头,头在AQS是获取到锁的线程,也意味着从同步队列中出队了,
    setHead(node);
    
    //唤醒
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared方法的作用的是在当前共享锁是可获取的状态时,唤醒head节点的下一个节点,这个方法的详细作用的后面讲解,现在我们来分析,setHeadAndPropagate在什么情况下会调用这个方法

  • 剩余的共享资源大于0 propagate > 0

    在共享锁模式下,锁可以被多个线程所共同持有,既然当前线程已经拿到共享锁了,其还有剩余的共享资源,那么就可以直接通知后继节点来拿锁,而不必等待锁被释放的时候再通知。(来到厕所发现五个坑都可用,发消息给好兄弟,快来,拉屎自由)

  • 当前节点的下一个节点不为空且是共享模式 if (s == null || s.isShared())

  • 旧头节点等待状态小于0 or 当前头节点等待状态小于0

    共享资源在被获取后,线程都会设置自己为头节点,所有头节点在共享模式中表示的是获取到共享资源的线程或者曾经获取共享资源的线程

3.1.4 doReleaseShared 唤醒后续等待线程
在当前共享锁是可获取的状态时,唤醒head节点的下一个节点

这个方法除了在setHeadAndPropagate 中被调用意外,还在共享资源的释放(releaseShared)中会被调用,想象一个场景,存在一个线程A释放锁的同时,一个线程B拿到锁,前者调用releaseShared,后者调用setHeadAndPropagate ,并发的调用到doReleaseShared 方法进行唤醒头节点下一个节点,所以doReleaseShared 需要考虑线程安全问题

3.1.4.1源码粗略解读
//值得好好品一品
private void doReleaseShared() {
    //循环
    for (;;) {
        //头  可能这一行执行完h就是旧的头,存在另外一个线程获取到共享锁,将自己设置为头
        Node h = head;
        //h头不为null 不等于尾,说明至少需要当前队列中至少有两个节点
        if (h != null && h != tail) {
            //h头的状态
            int ws = h.waitStatus;
            //h头状态为SINGNAL 说明后续节点入队的时候addWaiter把当前节点的状态设置,说明后续节点需要唤醒
            if (ws == Node.SIGNAL) {
                //CAS修改h状态为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;          
                //唤醒后续线程
                unparkSuccessor(h);
            }
            //h状态为0 且CAS设置为PROPAGATE 失败
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        
        //h 等于当前头,说明执行上面一段的时候没有头没有变化,没有其他线程获取共享资源
        if (h == head)                  
            break;
    }
}
3.1.4.2 doReleaseShared 循环退出的条件和这样设计的目的
if (h == head)                  
     break;

这里h是刚进入doReleaseShared 时候的头节点,head是当前队列的头,如果二者相等那么退出循环

  • 什么时候不相等昵——线程A释放共享资源调用doReleaseShared 的时候还没执行到循环退出的时候,线程B获取到共享资源设置自己为新的头节点
  • 不相等发生什么情况——线程A继续for循环执行,假如这个时候线程B也释放资源,那么这个方法存在被多个线程一起执行的情况
  • 这样设计的目的——设计成每个节点只唤醒自己的后继线程也可以实现同样的功能,但是多个线程一起执行唤醒可以提高唤醒等待共享资源线程的效率。甚至在新的线程获取到共享锁的时候还会调用doReleaseShared,唤醒后继节点,
一个厕所有五个坑,在某一个时刻五个坑被ABCDE占用,后面还有EF两个倒霉等待排成队列,ABCDE占用坑的时候都会设置自己为头节点,会有几个人获取到坑位的时候调用doReleaseShared (比如D第四个来,发现还有一个坑,立马说,后面的兄弟还有一个厕所)再比如五个坑都被占用但是E发现自己的状态为SINGAL(是E排队的时候提醒自己拉完提醒自己,他先玩会儿手机(挂起))

某一个时刻多个人拉完的时候,释放坑位走出厕所,A释放到if (h == head)的时候,发现头节点变化了,继续喊兄弟们去看看说不定有坑位,B也是如此,同一个时间可能有多个拉完的人都在唤醒后面的人去上厕所,这样后面排队玩手机的人,被唤醒的效率更高,从而提升了厕所的利用效率
3.1.4.3.doReleaseShared 唤醒的逻辑
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            
            //1.情况一
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;           
                unparkSuccessor(h);
            }
            
            //2.情况二
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;              
        }
        if (h == head)                   
            break;
    }
}
  • 情况一

    ws == Node.SIGNAL 说明是后继节点入队的时候修改了节点的状态,表示需要唤醒

    这里使用CAS保证多个线程执行当前方法只有一个线程可以成功唤醒后续的线程(共享锁存在多个线程并发唤醒的情况)

    同时两个小伙子从厕所出来,只需要一个人通知到等待的人就好
    
  • 情况二

    ws == 0 可能是头节点没有后继入队,所以节点状态为初始状态,但是上面的 if (h != null && h != tail) 保证了队列中至少存在两个节点

    ws == 0 还可能是上面的情况1修改为了0,但是这种情况不会进入当前分支

    最后只可能是尾节点成为了头节点,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)需要返回false才能继续循环,说明后续节点入队修改了节点的状态为SIGANAL,此时会继续循环唤醒后继节点。注意到最上面if (h != null && h != tail) 也就是说队列至少存在两个节点,让代码运行到情况1,要想ws == Node.SIGNAL不成立说明这个头节点刚刚成为头节点,状态还没来得及被后继节点修改为SINGANL,紧接着后继节点恰好修改了头节点状态为SINGAL才能促使!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)为true,也就是说明情况二是在新的头节点产生,且没来得及被后继节点修改为SINGAL,并且在头节点线程执行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)的一瞬间后继节点抢先一步修改了头节点的状态为SINGAL才能走到情况二的continue中

    10:40:01的时候A成功获得如厕的权利,此时厕所是满的,A设置自己为头节点,发现原来的头节点状态是SINGANL,他准备唤醒后面排队的兄弟
    10:40:02 B发现没有厕所,排在队列的第一个,准备修改A的状态为SINGAL(让A记得唤醒自己)此时A已经在执行唤醒的流程了,此时队列存在两个节点,A为头,B为尾巴,A执行到情况1,发现自己不是SINGAL,来到情况2,准备修改自己状态为PROPAGATE但是失败了(此时B刚好修改A状态为SINGAL了)A继续执行for循环(可能存在其他人上完厕所,唤醒了B,B成为新头节点),
    此时A会拿到队列的头节点(最近刚刚获得锁的节点)继续执行for循环,最后队列的头节点没有变化了,A才罢休
    

3.2 acquireUninterruptibly——不可中断的获取许可证

直接调用的是AQS的acquireShared(1)方法

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

大致逻辑和acquireSharedInterruptibly,其不响应中断体现在doAcquireShared中

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    //如果被中断了,那么补上中断,而不是抛出异常
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //parkAndCheckInterrupt() 放回true 表示当前线程从LockSupport中返回是因为被中断了,那么把interrupted置为true,继续循环
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

套路和独占模式中类似,当前线程从LockSupport中返回后检查其中断表示,发现是由中断那么会在当前获取到共享资源后补上中断标识

3.3tryAcquire(int permits, long timeout, TimeUnit unit)超时获取许可

直接调用AQS的tryAcquireSharedNanos方法

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
	
    //如果直接获取共享资源成功那么直接返回true了 短路后续的doAcquireSharedNanos
    return tryAcquireShared(arg) >= 0 ||

        //超时获取共享资源
        doAcquireSharedNanos(arg, nanosTimeout);
}
3.3.1doAcquireSharedNanos 超时获取共享资源
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; 
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

和独占的超时获取一样的套路,都是必须剩余时间大于spinForTimeoutThreshold阈值才挂起否则进行自旋,响应中断也是类似的内容

3.4 release(int permits)——释放许可

直接调用的AQS的releaseShared

public final boolean releaseShared(int arg) {
    //信号量中静态内部类重写
    if (tryReleaseShared(arg)) {
		//唤醒后续等待线程 前面说过
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    //自旋
    for (;;) {
       
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
       
        //CAS设置许可的数量,
        if (compareAndSetState(current, next))
            return true;
    }
}

和独占不同的是,共享锁存在多个多个线程一起释放的情况,所以使用自选+CAS保证许可证的数量不会在并发的情况下出现错误

减少许可的逻辑类似,也是循环+CAS的方式

二丶CountDownLatch闭锁

1.CountDownLatch的作用

闭锁的工具相当于一扇门,在闭锁达到结束状态前,门一直是关闭,没有任何线程可以通过,当达到结束状态后,闭锁才会让允许所有线程通过,当闭锁到达结束状态后不会再改变状态保持开状态

2.CountDownLatch的使用场景和其常用方法

2.1使用场景

  • 确保某个计算再所有资源准备后在继续执行
  • 确保某个服务在其依赖的所有其他其他服务都启动后才启动
  • 等待某个操作所有参与者就绪后再继续执行(moba所有玩家确认接受游戏后再进入选英雄)

2.2常用方法

方法 作用
void await() throws InterruptedException 使当前线程等待直到锁存器倒计时到零,从这个方法返回的两个方式
1.计数到0
2.等待的线程被中断,抛出中断异常返回
void await(long timeout, TimeUnit unit)throws InterruptedException 和await()类似,但是如果超时也会直接返回
void countDown() 计数减1,如果计数到达0那么所有等待的线程将可以通行
long getCount() 返回当前计数。此方法通常用于调试和测试目的

3.CountDownLatch 是如何实现的

AB线程需要等待CDE执行完后继续执行,其实CDE霸占锁阻塞AB,后CDE都释放锁后AB才能继续运行

3.1 await 响应中断的等待计数为0 & await(long timeout, TimeUnit unit) 超时响应中断等待计数为0

3.1.1 await 响应中断的等待计数

直接调用静态内部类的acquireSharedInterruptibly(1)方法,这个方法会直接调用静态内部类实例的tryAcquireShared(1)方法

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

获取AQS中state(CountDownLatch构造方法设置此值,表示需要等待多少个线程执行countDown),上面我们说过tryAcquireShared返大于等于0表示获取共享资源成功,负数表示失败后续会进入等待队列中,这里并没有返回0这种情况,如果共享资源为0表示“门开了”执行此方法的线程可以自由的运行了,反之将排队等待,之所以没有返回0,是因为CountDownLatch支持多个线程比如ABC一起等待,返回0表示当前线程获取资源成功但是后续线程获取会失败,返回1可以保证当前线程看见门开了后会去唤醒其他线程

3.1.2 await(long timeout, TimeUnit unit) 超时响应中断等待计数为0

直接调用的AQS的tryAcquireSharedNanos,同样调用重写的tryAcquireShared 方法,后续调用doAcquireSharedNanos 逻辑和上面信号量的内容一样

3.2 countDown 计数减少1

直接调用AQS的releaseShared方法,调用到重写的tryReleaseShared方法

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        
        //减少之前就是0 直接返回false
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
 			//减少到0 才会返回true
            return nextc == 0;
    }
}

还是同样的套路CAS+自选保证state,但是如果减少之前就已经为0的话的返回false,且只有减少后为0 才会返回true,保证了无效的多次唤醒

假如计数为4,但是五个线程执行countDown,第五个线程执行的时候发现被四个老六抢先一步了,直接返回了false,前面四个老六执行的时候也只有最后一个才会返回true(减少到0)这时候才会执行tryReleaseShared 进行唤醒

三丶关于共享资源的一点图

1.doReleaseShared多线程同时调用的情况

可以看到当多个线程释放共享资源的时候,如果当前队列中存在排队的节点,那么可能存在多线程一起并发调用doReleaseShared的可能,如果头节点为signal 说明后续节点需要唤醒,使用CAS保证了只有一个线程可以成功执行unparkSuccessor唤醒后续线程,后续线程也许在此之前执行了tryAcquireShard返回负数,准备挂起自己,也许在挂起自己之前被执行了unpark,或者挂起之后立马被执行了unpark,继续拿共享资源,而那些CAS失败的线程会继续唤醒,这点体现了三个资源释放,不会只唤醒一个。并且这个方法退出的方法只有在唤醒途中头节点没有变化的情况,没有变法说明共享资源的争抢没有那么激烈了(头节点是最近拿到共享资源的节点)

2.doReleaseShared一个极其短暂的状态

这个时候线程B 和线程A 必定存在一个线程CAS失败,如果线程B失败,那么意味线程A成功CAS为SIGNAL,但是shouldParkAfterFailedAcquire 返回false 还要继续自旋,这时候也许tryAcquireShared成功了就没有必要挂起了,如果线程A自选到tryAcquireShared,被一个老六线程抢先一步获取共享共享资源了,这时候线程A会执行shouldParkAfterFailedAcquire 返回true 准备挂起自己了,这是线程B也许就成功唤醒了线程A。如果线程ACAS失败了,还会进行一次自旋,线程B如果CAS成功也会进行一次自旋,也许线程A就成功拿到共享资源改变自己为头节点,线程B还要执行一次自旋。这一切都为了提高系统的吞吐量让共享资源尽量不要浪费,不要因为唤醒的不及时而让需要应该工作的线程被挂起。

Tags: