­

互联网JAVA面试常问问题(七)- 带你走入AQS同步器源码

  • 2019 年 10 月 6 日
  • 筆記

周末叫醒我的不是忘记关掉的闹钟,而是一颗爱学习的红心~大家周日好,欢迎和小强一起继续研究互联网JAVA面试题~

上一篇文章中,小强给大家介绍了可重入锁的相关知识,通过学习ReentrantLock的源码能够看出,它继承了AbstractQueuedSynchronizer。所以,本文再简单的的介绍一下AQS的源码。

AbstractQueuedSynchronizer

队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。AQS的源码如下:

public abstract class AbstractQueuedSynchronizer         extends AbstractOwnableSynchronizer         implements java.io.Serializable {      protected AbstractQueuedSynchronizer() { }      static final class Node {         ........     }      private transient volatile Node head;      private transient volatile Node tail;      private volatile int state;      protected final int getState() {          return state;     }      protected final void setState(int newState) {         state = newState;     }     protected final boolean compareAndSetState(int expect, int update) {        // See below for intrinsics setup to support this         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);     }

从源码可以看出,AQS内部使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,多线程争用资源被阻塞时会进入此队列,其中volatile修饰了以下变量,来保证了多线程之间的可见:

1.内部状态state

2.等待队列的头节点head

3.等待队列的尾节点head。

state的访问方式有三种:

1.getState()

2.setState()

3.compareAndSetState()

其中FIFO队列的实现如下,每个节点中,除了存储了当前线程,前后节点的引用以外,还有一个waitStatus变量,用于描述节点当前的状态(共四种状态:1.CANCELLED 取消状态;2.SIGNAL 等待触发状态;3.CONDITION 等待条件状态;4.PROPAGATE 状态需要向后传播 )

static final class Node {  static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();  static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;  static final int CANCELLED =  1;  static final int SIGNAL    = -1;  static final int CONDITION = -2;  static final int PROPAGATE = -3;  volatile int waitStatus;  volatile AbstractQueuedSynchronizer.Node prev;  volatile AbstractQueuedSynchronizer.Node next;  volatile Thread thread;AbstractQueuedSynchronizer.Node nextWaiter;  final boolean isShared() {      return nextWaiter == SHARED;  }  final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {     AbstractQueuedSynchronizer.Node p = prev;      if (p == null)          throw new NullPointerException();      else         return p;  }  }

AQS的不同子类争用共享资源的方式也不同。AQS子类中,只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。  tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。  tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。  tryAcquireShared(int):共享方式。尝试获取资源。负数表示失;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。  tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

在AQS中,有两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。接下来主要介绍Exclusive下是如何获取和释放锁的:

锁获取acquire(int)
 /**      * Acquires in exclusive mode, ignoring interrupts.  Implemented      * by invoking at least once {@link #tryAcquire},      * returning on success.  Otherwise the thread is queued, possibly      * repeatedly blocking and unblocking, invoking {@link      * #tryAcquire} until success.  This method can be used      * to implement method {@link Lock#lock}.      *      * @param arg the acquire argument.  This value is conveyed to      *        {@link #tryAcquire} but is otherwise uninterpreted and      *        can represent anything you like.      */     public final void acquire(int arg) {          if (!tryAcquire(arg) &&             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))             selfInterrupt();     }

所以在获取锁的过程中,我们看到了如下步骤

1.tryAcquire()尝试直接去获取资源,如果成功则直接返回;

2.addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

3.acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

锁释放release(int)
    /**      * Releases in exclusive mode.  Implemented by unblocking one or      * more threads if {@link #tryRelease} returns true.      * This method can be used to implement method {@link Lock#unlock}.      *      * @param arg the release argument.  This value is conveyed to      *        {@link #tryRelease} but is otherwise uninterpreted and      *        can represent anything you like.      * @return the value returned from {@link #tryRelease}      */     public final boolean release(int arg) {          if (tryRelease(arg)) {             Node h = head;              if (h != null && h.waitStatus != 0)                 unparkSuccessor(h);              return true;         }          return false;     }

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。它调用tryRelease()来释放资源。

小结

最后再回顾一下AQS的应用,以及ReentrantLock是如何实现的

ReentrantLock中state初始化为0,表示未锁定状态。当一个线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到此线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,这个线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。