【Java 並發編程實戰】使用 AQS 實現一個簡單的互斥鎖

  • 2020 年 2 月 24 日
  • 筆記

使用 AQS 實現一個簡單的互斥鎖

AQS 是什麼?

參考[2]。

    /**       * Returns a collection containing threads that may be waiting to       * acquire.  Because the actual set of threads may change       * dynamically while constructing this result, the returned       * collection is only a best-effort estimate.  The elements of the       * returned collection are in no particular order.  This method is       * designed to facilitate construction of subclasses that provide       * more extensive monitoring facilities.       *       * @return the collection of threads       */      public final Collection<Thread> getQueuedThreads() {          ArrayList<Thread> list = new ArrayList<Thread>();          for (Node p = tail; p != null; p = p.prev) {              Thread t = p.thread;              if (t != null)                  list.add(t);          }          return list;      }          /**       * Returns a collection containing threads that may be waiting to       * acquire in exclusive mode. This has the same properties       * as {@link #getQueuedThreads} except that it only returns       * those threads waiting due to an exclusive(獨有的;排外的;專一的) acquire.       *       * @return the collection of threads       */      public final Collection<Thread> getExclusiveQueuedThreads() {          ArrayList<Thread> list = new ArrayList<Thread>();          for (Node p = tail; p != null; p = p.prev) {              if (!p.isShared()) {                  Thread t = p.thread;                  if (t != null)                      list.add(t);              }          }          return list;      }

CAS自旋

CAS 是Compare And Swap的簡稱,具有單一變數的原子操作特性,對比成功後進行交換操作,他是樂觀操作,期間會無限循環操作,直到對比成功,然後進行後續交互操作

CAS 包含了三個操作數據,記憶體位置V、預期值A、新預期值B,如果當前記憶體V存放的數據和A一樣,就認為比較成功,然後把當前V所在的位置設置為B。

if V==A (Compare)  then V=B (Swap)

因為會無限循環操作,所以可能導致CPU效率低下,而且運行中還會導致ABA問題,也就是A->B->A的問題,誤以為此時數據未發生變化,其實中間已經發生變化。該問題在java中提供了類AtomicStampedReference解決該問題,先會查看當前引用值是否符合期望,ABA也會變成1A->2B->3A,這樣很清楚的感知到發生變化了。

java.util.concurrent.locks static final class AbstractQueuedSynchronizer.Node extends Object

Wait queue node class. The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks(自旋鎖). We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait. To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.

Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts. The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/ We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.) Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility. CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention. Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on. Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill Scherer and Michael Scott, along with members of JSR-166 expert group, for helpful ideas, discussions, and critiques on the design of this class. < 1.8 >

   /**       * Creates and enqueues node for current thread and given mode.       *       * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared       * @return the new node       */      private Node addWaiter(Node mode) {          Node node = new Node(Thread.currentThread(), mode);          // Try the fast path of enq; backup to full enq on failure          Node pred = tail;          if (pred != null) {              node.prev = pred;              if (compareAndSetTail(pred, node)) {                  pred.next = node;                  return node;              }          }          enq(node);          return node;      }            // Queuing utilities        /**       * The number of nanoseconds for which it is faster to spin       * rather than to use timed park. A rough estimate suffices       * to improve responsiveness with very short timeouts.       */      static final long spinForTimeoutThreshold = 1000L;        /**       * Inserts node into queue, initializing if necessary. See picture above.       * @param node the node to insert       * @return node's predecessor       */      private Node enq(final Node node) {          for (;;) {              Node t = tail;              if (t == null) { // Must initialize                  if (compareAndSetHead(new Node()))                      tail = head;              } else {                  node.prev = t;                  if (compareAndSetTail(t, node)) {                      t.next = node;                      return t;                  }              }          }      }

簡單互斥鎖實現程式碼

package com.light.sword    import java.util.concurrent.locks.AbstractQueuedSynchronizer    /**   * @author: Jack   * 2020-02-10 15:42   * 使用 AQS 實現一個簡單的互斥鎖   */    class MutexDemo {      private val sync = Sync()      fun lock() {          sync.acquire(0)      }        fun unlock() {          sync.release(0)      }        class Sync : AbstractQueuedSynchronizer() {          private val LOCKED = 1          private val UNLOCKED = 0            init {              state = UNLOCKED          }            override fun tryAcquire(arg: Int): Boolean {              return compareAndSetState(UNLOCKED, LOCKED)          }            override fun tryRelease(arg: Int): Boolean {              state = UNLOCKED              return true          }      }    }

怎樣使用互斥鎖

fun main() {      for (i in 0..10) {          Thread {              task(Thread.currentThread().name)          }.start()      }        val mutex = MutexDemo()      for (i in 0..10) {          Thread {              mutex.lock()              task(Thread.currentThread().name)              mutex.unlock()          }.start()      }  }    fun task(threadName: String) {      println("-------------")      println("${System.currentTimeMillis()} : $threadName")      Thread.sleep(1000)  }      // 輸出:  -------------  -------------  1581489100026 : Thread-1  -------------  -------------  1581489100026 : Thread-2  -------------  1581489100026 : Thread-5  -------------  1581489100026 : Thread-0  -------------  1581489100026 : Thread-6  -------------  1581489100027 : Thread-10  -------------  -------------  1581489100027 : Thread-9  1581489100026 : Thread-4  -------------  1581489100027 : Thread-3  1581489100026 : Thread-8  1581489100027 : Thread-7  -------------  1581489100027 : Thread-11  -------------  1581489101028 : Thread-12  -------------  1581489102029 : Thread-13  -------------  1581489103030 : Thread-14  -------------  1581489104031 : Thread-15  -------------  1581489105032 : Thread-16  -------------  1581489106034 : Thread-17  -------------  1581489107036 : Thread-18  -------------  1581489108039 : Thread-19  -------------  1581489109042 : Thread-20  -------------  1581489110046 : Thread-21

參考資料:

[1] https://www.jianshu.com/p/282bdb57e343

[2] Java並發編程實戰: AQS 源碼 史上最詳盡圖解+逐行注釋: https://blog.csdn.net/universsky2015/article/details/95813887