Java並發-AbstractQueuedSynchronizer(AQS)JDK源代碼分析

  • 2020 年 2 月 19 日
  • 筆記

一、概要分析:

1.1 引子

 學習Java並發編程不得不去了解一下java.util.concurrent這個包,這個包下面有許多我們經常用到的並發工具類,例如:ReentrantLock,CountDownLatch,CyclicBarrier, Semaphore等。而這些類的底層實現都依賴於AbstractQueuedSynchronizer這個類,由此可見這個類的重要性。所以在Java並發系列文章中我首先對AbstractQueuedSynchronizer這個類進行分析。為了敘述簡單,後續有些地方會用AQS代表這個類。

1.2 AbstractQueuedSynchronizer 的用途

 相信要許多讀者使用過ReentrantLock,但是卻不知道AbstractQueuedSynchronizer的存在。其實ReentrantLock實現了一個內部類Sync,該內部類繼承了AbstractQueuedSynchronizer,所有鎖機制的實現都是依賴於Sync內部類,也可以說ReentrantLock的實現就是依賴於AbstractQueuedSynchronizer類。於此類似,CountDownLatch, CyclicBarrier, Semaphore這些類也是採用同樣的方式來實現自己對於鎖的控制。可見,AbstractQueuedSynchronizer是這些類的基石。那麼AQS內部到底實現了什麼以至於所以這些類都要依賴於它呢?

 隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步組件的基礎框架, 它使用了一個int成員變量表示同步狀態, 通過內置的FIFO隊列來完成資源獲取線程的排隊工作, 並發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。同步器的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態, 在抽象方法的實現過程中免不了要對同步狀態進行更改,這時就需要使用同步器提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進行操作,因為它們能夠保證狀態的改變是安全的。子類推薦被定義為自定義同步組件的靜態內部類, 同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用, 同步器既可以支持獨佔式地獲取同步狀態, 也可以支持共享式地獲取同步狀態,這樣就可以方便實現不同類型的同步組件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

 同步器是實現鎖(也可以是任意同步組件)的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。可以這樣理解二者之間的關係:鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個線程並行訪問),隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式, 隱藏了同步狀態管理、 線程的排隊、 等待與喚醒等底層操作。 鎖和同步器很好地隔離了使用者和實現者所需關注的領域。  下圖是關於ReentrantLock類(重入鎖)的內部繼承邏輯,反映了上面這段關於鎖和同步器如何結合使用的設計思想,比如說如果你單純地調用ReentrantLock構造器來構造了一個對象,那麼你可能沒有認識到其內部的很多邏輯是依靠AQS的子類來實現的,而這一切依靠的都是靜態內部類設計思想的魅力,能夠簡化使用者的使用成本,至少不用學內部實現原理就能快速地調用Lock接口規定的鎖方法,但是如果你是一個JDK源碼愛好者,你一定不會放過AQS源代碼的學習。

Lock接口:定義了有關自定義鎖結構至少需要的抽象方法

ReentantLock類:是JDK提供的一個自定義鎖類,使用此類的構造器進行鎖對象的構造。

NonfairSync/FairSync靜態內部類:是Sync的子類,分別提供不公平和公平的鎖搶佔方式

Sync靜態內部抽象類:AQS的子類,實現了與重寫了部分AQS方法

Condition接口:定義了有關鎖下面的通知等待機制;

ConditionObject公共內部類:實現了Condition接口的抽象方法,完成了鎖的通知等待機制。

1.3 AbstractQueuedSynchronizer類的成員變量分析

//同步隊列的頭結點  private transient volatile Node head;    //同步隊列的尾結點  private transient volatile Node tail;    //同步狀態  private volatile int state;    //獲取同步狀態  protected final int getState() {      return state;  }    //設置同步狀態  protected final void setState(int newState) {      state = newState;  }    //以CAS方式設置同步狀態  protected final boolean compareAndSetState(int expect, int update) {      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  }

 上面的代碼列出了AQS的所有成員變量,可以看到AQS的成員變量只有三個:

  1. 同步隊列頭結點引用 private transient volatile Node head;
  2. 同步隊列尾結點引用private transient volatile Node tail;
  3. 同步狀態private volatile int state;

 注意,這三個成員變量都使用了volatile關鍵字進行修飾,這就確保了多個線程對它的修改都是內存可見的。整個類的核心就是這個同步狀態,可以看到同步狀態其實就是一個int型的變量,同步狀態決定了當前鎖對象是否允許繼續被佔用或者,也可以根據鎖狀態來判斷當前同步鎖被多少個線程佔據了。當然鎖的狀態根據不同子類的不同具體需求而有所不同,例如在ReentrantLock中,state等於0表示鎖是開的,state大於0表示鎖是鎖着的,且其大小正比於一個線程重複佔據一個重入鎖的此數,而在Semaphore中,state大於0表示鎖是開的,state等於0表示鎖是鎖着的。

1.4 AbstractQueuedSynchronizer的排隊區實現原理:同步、條件隊列

AbstractQueuedSynchronizer內部其實有兩個排隊區,一個是同步隊列,一個是條件隊列。從上圖可以看出,同步隊列只有一條,而條件隊列可以有多條。其中唯一的同步隊列的管理者是AQS對象本身,而條件隊列由AQS內部的Conditon接口對象管理,每一個Condition對象都對應着一個由其管理的條件等待隊列。同步隊列的結點分別持有前後結點的引用(雙向隊列),而條件隊列的結點只有一個指向後繼結點的引用(向後指向的單向隊列)。

 圖中T表示線程,每個結點包含一個線程,線程在獲取鎖失敗後首先進入同步隊列排隊,而想要進入條件隊列該線程必須持有鎖才行。所以在此類中,節點是線程的存儲之處,所以節點和線程在某些語境下是通用的。

 接下來我們看看隊列中每個結點的結構(相關域以及方法)。

//同步隊列的結點  static final class Node {        static final Node SHARED = new Node(); //表示當前線程以共享模式持有鎖        static final Node EXCLUSIVE = null;    //表示當前線程以獨佔模式持有鎖        static final int CANCELLED =  1;       //表示當前結點已經取消獲取鎖        static final int SIGNAL    = -1;       //表示後繼結點的線程需要運行        static final int CONDITION = -2;       //表示當前結點在條件隊列中排隊        static final int PROPAGATE = -3;       //表示後繼結點可以直接獲取鎖        volatile int waitStatus; //表示當前結點的等待狀態        volatile Node prev;      //表示同步隊列中的前繼結點        volatile Node next;      //表示同步隊列中的後繼結點        volatile Thread thread;  //當前結點持有的線程引用        Node nextWaiter;         //表示條件隊列中的後繼結點,條件隊列中節點沒有前繼節點        //當前結點狀態是否是共享模式      final boolean isShared() {          return nextWaiter == SHARED;      }        //返回當前結點的前繼結點      final Node predecessor() throws NullPointerException {          Node p = prev;          if (p == null) {              throw new NullPointerException();          } else {              return p;          }      }        //構造器1      Node() {}        //構造器2, 默認用這個構造器      Node(Thread thread, Node mode) {          //注意持有模式是賦值給nextWaiter          this.nextWaiter = mode;          this.thread = thread;      }        //構造器3, 只在條件隊列中用到      Node(Thread thread, int waitStatus) {          this.waitStatus = waitStatus;          this.thread = thread;      }  }

Node代表同步隊列和條件隊列中的一個結點,它是AbstractQueuedSynchronizer的內部類。Node有很多屬性,比如持有模式,等待狀態,同步隊列中的前繼和後繼,以及條件隊列中的後繼引用等等。每個節點都存儲着指向一個線程對象的引用變量,所以一定程度上我們完全可以把節點理解為線程對象。

1.5 理解獨佔模式和共享模式

AQS的內部類Node定義了兩個節點Node常量SHARED和EXCLUSIVE,他們分別標識 AQS隊列中等待線程的鎖獲取模式。

java並發包提供的加鎖模式分為獨佔鎖和共享鎖:

  1. 獨佔鎖模式下,每次只能有一個線程能持有鎖,就是以獨佔方式實現的互斥鎖。獨佔鎖是一種悲觀保守的加鎖策略,它避免了讀/讀衝突,如果某個只讀線程獲取鎖,則其他讀線程都只能等待,這種情況下就限制了不必要的並發性,因為讀操作並不會影響數據的一致性。
  2. 共享鎖,則允許多個線程同時獲取鎖,並發訪問 共享資源,如:ReadWriteLock。共享鎖則是一種樂觀鎖,它放寬了加鎖策略,允許多個執行讀操作的線程同時訪問共享資源。 java的並發包中提供了ReadWriteLock,讀-寫鎖。它允許一個資源可以被多個讀操作訪問,或者被一個 寫操作訪問,但讀寫操作不能同時進行。

AQS提供了獨佔鎖和共享鎖必須實現的方法,具有獨佔鎖功能的子類,它必須實現tryAcquiretryReleaseisHeldExclusively等;共享鎖功能的子類,必須實現tryAcquireSharedtryReleaseShared等方法,帶有Shared後綴的方法都是支持共享鎖加鎖的語義。

 獨佔鎖獲取鎖時,設置節點模式為Node.EXCLUSIVE:

public final void acquire(int arg) {          if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();      }

 共享鎖獲取鎖,節點模式則為Node.SHARED

     private void doAcquireShared(int arg) {          final Node node = addWaiter(Node.SHARED);          boolean failed = true;          .....      }

 而上述兩個不同模式下獲取鎖得方式稍後會在各自模式的介紹過程中進行具體講述。這裡只需記住,獨佔/共享模式不是用來描述AQS對象的,因為AQS沒有定義這兩者的狀態的相關域。獨佔和共享是節點的性質,節點又和線程有着自然的對應關係,所以獨佔模式以及共享模式是用來描述線程是如何來佔用鎖資源的。

1.6 理解結點的等待狀態

 我們還看到每個結點都有一個等待狀態(節點在等待序列中,自然狀態名為等待狀態),這個等待狀態分為CANCELLEDSIGNALCONDITIONPROPAGATE四種狀態:

    volatile int waitStatus; //表示當前結點的等待狀態
  1. CANCELLED :當這個線程在排隊過程中已經打算放棄了,它就會將自己座位上的牌子設置為CANCELLED,此狀態的舊節點在新節點遍歷向前找時會被清理出隊列。具體見:shouldParkAfterFailedAcquire方法;
  2. SIGNAL :狀態為SIGNAL的線程在執行完自己的代碼後,退出線程前,回去喚醒下一個在等待隊列中的線程/節點。只有保證前面節點的狀態為SIGNAL,當前節點才能夠保證被喚醒;
  3. CONDITION:表示該線程在條件隊列中排隊;
  4. PROPAGATE:提醒後面來的線程可以直接獲取鎖,這個狀態只在共享模式用到,後面單獨講共享模式的時候會講到。

 這幾種waitStates狀態用表格表示就是:

狀態值

狀態

說明

1

CANCELLED

取消狀態

-1

SIGNAL

等待觸髮狀態

-2

CONDITION

等待條件狀態

-3

PROPAGATE

狀態需要向後傳播(只在共享模式下使用)

 這裡與線程池狀態判斷中是否處於Running狀態具有類似的邏輯判斷方式,只要判斷當前線程的狀態值是否小於零,小於則意味着當前線程處於非取消狀態,如果大於零,那麼意味着線程已經處於取消狀態了。

​ 下面舉一些AQS子類的一個具體例子,來說明AQS類中等待狀態的相關性質:

AQS中有一個state變量,該變量對不同的子類實現具有不同的意義,對ReentrantLock來說,它表示加鎖的狀態:

  • 無鎖時state=0,有鎖時state>0;
  • 第一次加鎖時,將state設置為1;
  • 由於ReentrantLock是可重入鎖,所以持有鎖的線程可以多次加鎖,經過判斷加鎖線程就是當前持有鎖的線程時(即exclusiveOwnerThread==Thread.currentThread()),即可加鎖,每次加鎖都會將state的值+1,state等於幾,就代表當前持有鎖的線程加了幾次鎖;
  • 解鎖時每解一次鎖就會將state減1,state減到0後,鎖就被釋放掉,這時其它線程可以加鎖;
  • 當持有鎖的線程釋放鎖以後,如果是等待隊列獲取到了加鎖權限,則會在等待隊列頭部取出第一個線程去獲取鎖,獲取鎖的線程會被移出隊列;

1.7 結點進入同步隊列時會進行的操作

enq方法會在addWaiter()方法中得到調用,是屬於AbstractQueuedSynchronizer類的方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#enq

//結點入隊操作, 返回前一個結點  private Node enq(final Node node) {      for (;;) {          //獲取同步隊列尾結點引用          Node t = tail;          //如果尾結點為空說明同步隊列還沒有初始化          if (t == null) {              //初始化同步隊列,並在之後會進入另一個自鎖循環              if (compareAndSetHead(new Node())) {                  tail = head;              }          } else {              //1.指向當前尾結點              node.prev = t;              //2.設置當前結點為尾結點              if (compareAndSetTail(t, node)) {                  //3.將舊的尾結點的後繼指向新的尾結點                  t.next = node;                  //for循環唯一的出口                  return t;//注意返回的是之前的尾結點,相當於當前尾結點上一個節點              }          }      }  }

static final int SIGNAL = -1; //表示後繼結點的線程需要運行

 讀者需要注意添加尾結點的順序,分為三步:

  1. 指向尾結點
    1. 使用CAS方法更改尾結點(CAS為一種並發的策略),t作為一個指向尾結點的引用變量,當其他線程將AQS的尾部域tai改變的時候就會導致t不在指向當前AQS的尾部引用,所以按照CAS操作特性,那麼就會使其進入下一個自旋(並且會更改所有涉及t的賦值語句,包括CAS語句之前的,比如:node.prev = t;)。
  2. 將舊尾結點的後繼指向當前結點。 注意事項: 在並發環境中這三步操作不一定能保證完成,所以在清空同步隊列所有已取消的結點這一操作中,為了尋找非取消狀態的結點,不是從前向後遍歷而是從後向前遍歷的。還有就是每個結點進入隊列中時它的等待狀態是為0,只有後繼結點的線程需要掛起時才會將前面結點的等待狀態改為SIGNAL。

1.8 AQS 模型總結

 很多初學者可能對於AQS的用途不是非常了解,甚至認為其是由一個專門進行AQS對象管理的線程所管理的,其內部節點的增刪、節點屬性的修改都統統由這個線程管理,而其餘線程只需要和這個線程進行交互,告訴其進行何種操作。但實際上AQS對象並非由一個線程單獨進行管理,而是:一個AQS對象可以被多個線程同時訪問,但是此線程的引用一般也儲存於AQS對象中的鏈表數據結構中的節點中,否則AQS不負責這個線程的執行、休眠、和鎖資源的分配。當然AQS對象中的所有節點(不管此節點所存線程對象引用是否為當前訪問AQS的線程),所有的線程都能夠進行訪問到,但是有些AQS方法在當前訪問AQS對象的線程與被訪問的節點中的線程引用不相同則會報錯。此模型的示意圖如下:

 以線程1對象為例,線程1對象引用有存放於AQS對象中,再讓線程1的run方法中調用訪問AQS對象域或者方法,可以分為以下兩種情況:

  1. 訪問的是節點1,節點中存儲的線程引用就等於當前線程,那麼此時相關操作的權限就比較大 node.thread == Thread.getCurrentThread()
  2. 訪問的是其他節點,節點中存儲的線程引用不等於當前線程,那麼此時相關操作的權限就比較小 node.thread != Thread.getCurrentThread(),沒有將其引用放置於AQS引用的線程訪問AQS對象,也屬於此類情況。

 下面舉一個相關方法執行權限的大小例子,

java.util.concurrent.locks.ReentrantLock.Sync#tryRelease,此方法用於釋放鎖:

protected final boolean tryRelease(int releases) {              int c = getState() - releases;              if (Thread.currentThread() != getExclusiveOwnerThread())                  throw new IllegalMonitorStateException();              boolean free = false;              if (c == 0) {                  free = true;                  setExclusiveOwnerThread(null);              }              setState(c);              return free;          }
            if (Thread.currentThread() != getExclusiveOwnerThread())                  throw new IllegalMonitorStateException();

 上面是tryRelease方法首先判斷的邏輯:是否當前線程不等於鎖的擁有者線程,如果不是,那麼直接就拋出異常,不再向下運行了。這就是權限不夠的一個例子,在ReentrantLock類中想讓一個鎖成功釋放,只能通過佔用鎖的線程來調用tryRelease()方法,此方法對權限的要求十分嚴苛。當然並不是所有方法都有關於訪問權限的設計,具體情況還是要根據相關方法的源代碼來進行判斷。

 在分析相關AQS抽象類內部實現邏輯時,要要把節點和線程看作同一件物品。就像並發中使用object.wait()方法時,由於wait方法必須在synchronized(object){...}代碼塊中,所以導致了在一個時刻中一個對象只能對應一個線程,所以簡單的object.wait()調用表明上沒有線程參與,但是卻使當前佔用對象鎖的線程釋放了鎖資源。AQS中的節點也是一樣,比如final Node node = addWaiter(Node.SHARED);,以此方式將一個新節點加入舊等隊列中,表面上看新節點中連線程引用都沒規定,但是隱含的是調用此方法時的線程就是此新節點的線程引用對象,目的就是將當前線程設計為共享模式下的搶佔資源。object.wait()方法的實現由上鎖機制實現,而後者雖然沒有上鎖,但是從邏輯上保證了這一一對應的特性。

二、獨佔模式源碼分析

2.1 引子

 在上一章中我們介紹了AbstractQueuedSynchronizer基本的一些概念,主要講了AQS的排隊區是怎樣實現的,什麼是獨佔模式和共享模式以及如何理解結點的等待狀態。

 我們要理清楚一個概念,線程的存在不意味着佔用大量的資源,開始運行時,才佔用CPU、內存等資源,否則線程/節點存在着,競爭着資源的過程只是一個輕量級的資源消耗(鎖消耗的資源:自旋等待>休眠)。獨佔模式中,線程搶到鎖以及沒搶到鎖,其本質上的區別在於能否有權限執行自己的代碼了,而線程/節點除非被移除鏈表,否則一直是存在着的。

 在本篇中會介紹在獨佔模式下結點是怎樣進入同步隊列排隊的,以及離開同步隊列之前會進行哪些操作。

AQS為獨佔模式和共享模式下的獲取鎖分別提供三種獲取方式:

  1. 不響應線程中斷獲取
  2. 響應線程中斷獲取
  3. 設置超時時間獲取;

2.1.1 AQS中引入CAS

 後續我們會看到在AQS中廣泛且大量地使用了CAS機制,這樣設計的原因是作為一個等待序列是唯一的,但是在同一時刻可能會有多個節點需要加入至隊尾,或者同時又多個節點被修改為取消狀態,需要被移除鏈表。使用CAS就能以良好地並發特性來解決這些並發問題。

2.2 以不響應線程中斷獲取鎖的實現邏輯,即acquire方法

public final void acquire(int arg) {          if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();  }

 「不響應線程中斷」的含義是:獲取鎖的整個過程是忽略線程中斷的。其具體含義就是說,無論是acquire自己調用selfInterrupt方法引起的中斷,還是其餘線程致使當前節點中的線程中斷,此方法對中斷沒有採取任何響應,只是簡單地將當前中斷標誌位設置為true。比如:java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法中的語句只有try-finally結構,沒有catch,所以其甚至不會在控制台上打印出當前線程被中斷了的提示。

此方法是獨佔模式下線程獲取共享資源的頂層入口。

  • 如果獲取到鎖資源,線程直接返回。
  • 否則進入等待隊列,直到獲取到資源為止,且整個過程 忽略中斷 的影響。
  • 注意鎖資源的得到不是意味着返回一個鎖對象,實際上鎖對象的引用地址是很容易拿到的,只是線程(節點)拿到鎖對象後,該線程獲得了執行的機會,而其他線程在獨佔模式中只能等待其執行完畢。

方法內部執行邏輯圖:

由上圖可以看到方法執行的流程邏輯,下面用文字來說明:

  1. tryAcquire() 嘗試直接獲取資源,成功則 state 值被修改並返回 true,線程繼續執行,在acquireQueued() 方法中有被調用到。
  2. 獲取資源失敗,執行 addWaiter() 將包含當前線程的節點加入等待隊列的尾部,並標記為獨佔模式。
  3. acquireQueued() 使線程在等待隊列中獲取資源,一直獲取到資源後才返回(資源的獲得必須是頭節點中的線程釋放鎖,且當前線程為頭節點之後的節點,當前線程才能在後續中得到鎖)。
  4. 如果線程在等待過程中被中斷過,它是不響應的。 只是獲取資源後才再進行自我中斷 selfInterrupt(),將中斷補上。 其中判定退出隊列的條件是否滿足和休眠當前線程完成了 自旋 的過程。 方法名 作用 返回值 tryAcquire() 試圖獲得唯一的鎖資源 boolean值 addWaiter() 在當前等待隊列有無隊尾的情況下都能將當前節點加入至隊列中成為新隊尾 返回新隊尾節點 acquireQueued() 用一個自旋,始終等待鎖的釋放以及當前線程/節點運行到頭節點後一個節點的隊列次序。 boolean值

方法源代碼詳細學習:

2.2.1.tryAcquirejava.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire

protected boolean tryAcquire(int arg) {       throw new UnsupportedOperationException();  }

方法所實現的功能:

  • 嘗試去獲取獨佔鎖資源。如果獲取成功,則直接返回 true,否則直接返回 false。
  • AQS 只定義了一個接口,具體資源的獲取交由其子類去實現(通過 state 的 get / set / CAS)

注意一個設計模式上的細節:

 該方法需要子類來實現,但是卻沒有使用 abstract 來修飾。是因為 AQS 有獨佔和共享兩種模式,而子類可以只實現其中一種功能,如果使用 abstract來修飾,每個子類都需要同時實現兩種功能的方法,對子類不太友好。並且子類一般就是針對於某一個模式下的AQS實現,所以也不必將兩種模式的方法都進行重寫。

2.2.2 addWaiter(Node),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

private Node addWaiter(Node mode) {          //以給定模式構造結點。mode 有兩種:EXCLUSIVE(獨佔)和 SHARED(共享)          Node node = new Node(Thread.currentThread(), mode);          //嘗試快速方式直接放到隊尾。          Node pred = tail;          if (pred != null) {              node.prev = pred;              if (compareAndSetTail(pred, node)) {//CAS方法                  pred.next = node;                  return node;              }          }          //上一步失敗則通過 enq 入隊。失敗的原因有多種,比如目前隊尾tail節點為空,這是需要enq方法的主要原因,          //因enq方法能夠在隊尾為空的情況下,進行初始化,將新的節點作為頭節點加入,          //而快速方法只能提供有尾結點存在的情況下,加入線程;          enq(node);          return node;  }  ......  private Node enq(final Node node) {          //CAS " 自旋 ",直到成功加入隊尾。          for (;;) {              Node t = tail;              if (t == null) { // Must initialize //隊列為空,創建一個空的標誌結點作為 head 結點,並將 tail 也指向它。                  if (compareAndSetHead(new Node()))                      tail = head;              } else { //正常流程,放入隊尾(由於CAS機制的性質,所以需要額外加入這個邏輯判斷)                  node.prev = t;                  if (compareAndSetTail(t, node)) {                      t.next = node;                      return t;                  }              }          }  }

上述方法中的節點都涉及了兩個方向的指向,所以顯然這是關於同步隊列增加新節點的方法。

自旋特性的體現

  1. 上面的for(;;)是一般配合CAS操作鎖採用的自旋,之所以在addWaiter方法中先採用快速加入隊尾方法後還在自旋鎖中進行放入尾部操作(畢竟enq方法的調用就是為了處理沒有隊尾的情況)的原因是CAS處理中可能有多個線程同時發現沒有隊尾,我們對第一個進行了沒有隊尾時的節點加入操作,調用了enq方法,但是同樣調用了CAS方法的後續線程所面對的可是有隊尾的鏈表結構,這時只需要首先快速入隊操作就可以了,如果此時在發現CAS方法返回false,那麼在調用enq方法進入自旋也不遲。核心思想還是首要確保線程安全性,其次確保執行效率得到提高。
  2. 自旋也是CAS策略的一部分,之所以使用無條件for循環,原因就是在CAS機制中如果同一時刻有多個線程想被插入為當前隊列的隊尾,那麼只能有一個線程設置隊尾成功,其餘線程都得失敗,沒有執行上面代碼中的return t;語句,for語句的設定,就能保證,總有一次當前線程/節點能夠加入到隊列中成為隊尾。

方法實現的功能:

 用於將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點。注意,雖然enq(node)返回的是當前尾結點的前一個節點,但是整個addWaiter方法返回的是當前尾結點,即新加入至尾部的新節點。

  • 生成新 Node 結點 node,如果 tail 結點不為空,則通過 CAS 指令插入到等待隊列的隊尾(同一時刻可能會有多個 Node 結點插入到等待隊列中),並建立前後引用關係。
  • 如果 tail 結點為空,則將 head 結點指向一個空結點

2.2.3 acquireQueued(Node, int),即java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued

此方法的jdk源代碼:

final boolean acquireQueued(final Node node, int arg) {          boolean failed = true; //標記是否成功拿到資源,true意味着失敗了          try {              boolean interrupted = false; //標記等待過程中是否被中斷過              //自旋              for (;;) {                  final Node p = node.predecessor(); //獲得此節點上一個節點,若為空,則直接拋出異常。                  if (p == head && tryAcquire(arg)) {//如果前驅結點 p 是 head,即該結點 node 為第二結點,那麼便有資格去嘗試獲取資源(可能是 p 釋放完資源後喚醒,也可能被 interrupt)。                      setHead(node); //獲取資源後,將 head 指向 node。                      p.next = null; // setHead 中 node.prev 已置為 null,此處再將 p.next 置為 null,就是為了方便 GC 回收以前的 head 結點 p,也就意味着之前拿完資源的結點 p 出隊。                      failed = false;                      return interrupted; //返回等待過程中是否被中斷過                  }                  //如果自己可以休息了,就進入 waiting 狀態,直到被 unpark()。                  if (shouldParkAfterFailedAcquire(p, node) &&                      parkAndCheckInterrupt()) //如果等待過程中被中斷過,哪怕只有那麼一次,就將 interrupted 標記為 true。                      interrupted = true;              }          } finally {              if (failed)                  cancelAcquire(node);          }  }

源代碼邏輯分析:

通過 tryAcquire()addWaiter(),該線程獲取資源失敗,被放入等待隊列尾部。

  • node 插入到隊尾後,該線程不會立馬掛起,會進行自旋 操作(一直執行如下邏輯過程):
    • 判斷該 node 的前一個結點 pred 是否為 head 結點。
    • 如果是,則表明當前結點是隊列中第一個有效結點,有最優先的執行權力,只等得到鎖的資源了,即判斷下面語句是否為真:
    • 根據tryAcquire() 返回的布爾值判斷當前線程是否獲取到了鎖(具體方法有子類實現提供)。
  • 如果成功獲取到鎖,線程 node 無需掛起。
    • 如果獲取鎖失敗,表示前驅線程還未完成,至少還未修改 state 的值,那麼就需要繼續等待。
  • 如果判斷node前一個節點不是head節點,那麼就直接退出當前的邏輯
  • 調用 shouldParkAfterFailedAcquire(),結點進入隊尾後,檢查狀態,找到安全休息點。
  • 調用 parkAndCheckInterrupt() 使當前i線程(節點)進入 waiting 狀態,等待 unpark() 或 interrupt() 喚醒。
    1. 被喚醒後,看是否有資格獲取資源。如果獲得資源,head 指向當前結點,並返回從入隊到獲得資源的整個過程中是否被中斷過。
    2. 如果未獲取資源,則重新調用 shouldParkAfterFailedAcquire()

2.2.4 shouldParkAfterFailedAcquire(Node, Node),即:java.util.concurrent.shouldParkAfterFailedAcquire

jdk源碼閱讀:

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {            int ws = pred.waitStatus;  //獲得前驅結點狀態。            if (ws == Node.SIGNAL)                //如果前驅結點狀態為等待觸發,則進入安全休息點。這是因為前面的節點執行完畢,就會主動地觸發下一個節點開始線程執行,                //所以可以告知調用者這次鎖雖然獲取失敗了,但是可以休息了,但是具體如何休息的方法不歸這個方法管。                //所以這個方法通常和具體執行線程等待代碼的語句通過&&邏輯相結合使用                return true;            if (ws > 0) {//ws大於0,意味着前面一個節點的狀態為取消狀態。                //如果前驅為取消狀態,就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後面。                //那些取消狀態的結點,由於被當前結點 " 加塞 " 到它們前邊,它們相當於形成一個無引用鏈,稍後會被 GC 回收。                do {                    node.prev = pred = pred.prev;                } while (pred.waitStatus > 0);                pred.next = node;//這裡只改變了節點的指向,但是沒有改變節點的順序。            } else {                //如果前驅結點正常,將前驅狀態設置成 SIGNAL 等待觸發(因為前驅正常狀態有三種,不一定為SINGAL),                //此方法使用CAS方式來進行。             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);            }            return false;           //代表這一次沒符合要求,即前驅狀態不為SINNGAL,但因為方法在acquireQueued方法內的自旋中,           //所以下一次調用這個方法時,就直接返回true了。    }

方法功能:用於檢查狀態,看是否真的可以進入 waiting 狀態。

  1. 如果 pred 的 waitStatus 為 SIGNAL,則通過 parkAndCheckInterrupt() 方法把當前線程掛起,並等待被喚醒。
  2. 如果 pred 的 waitStatus > 0,表明 pred 的線程狀態 CANCELLED,需從隊列中刪除(查看1.6中對於節點等待狀態的取值說明就能理解其中含義)。並將當前節點前最近的非取消狀態的節點設置為當前節點的前節點。
  3. 如果 pred 的 waitStatus == 0,則通過 CAS 指令修改 waitStatus 為 SIGNAL。(每個結點的 SIGNAL 狀態都是被後一個結點設置的,因為本身其目的就是提示前面節點執行完畢後喚醒後面節點,為後面節點服務的)。

2.2.5 parkAndCheckInterrupt(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {          LockSupport.park(this); //調用 park() 使線程進入 waiting 狀態。          return Thread.interrupted(); //如果被喚醒,查看自己是不是被中斷的。true,意味着線程被中斷了。  }

目的:讓線程真正進入等待狀態。

park() 會讓當前線程進入 waiting狀態。在此狀態下,有兩種途徑可以喚醒該線程:unpark()interrupt()。而前一個線程喚醒後一個線程此功能需要到線程的執行代碼處實現。

需要注意 Thread.interrupted()會 清除當前線程的中斷標記位。

 線程每次被喚醒時,都要進行中斷檢測,如果發現當前線程被中斷,則返回true;

 從自旋的整個過程可以看出,並不是被喚醒的線程一定能獲得鎖,必須調用 tryAccquire()重新競爭,因為鎖可能是非公平的(子類實現),有可能被新加入的線程獲得,從而導致剛被喚醒的線程再次被阻塞。 如果已經在隊列中的線程,必須按照順序執行(等待前驅結點的相關操作,這是 公平的),非公平是針對那種還沒進隊列的線程可以和隊列中的第一個結點 head 搶佔資源。

2.2.6 selfInterrupt(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#selfInterrupt

jdk源代碼:

    static void selfInterrupt() {          Thread.currentThread().interrupt();      }

根據 acquireQueued() 的結果決定是否執行中斷。

acquireQueued() 中的 方法已經執行了中斷,這裡再執行一次中斷的目的在於:

由於上面整個線程一直是掛在for循環的parkAndCheckInterrupt()方法裡頭,沒有成功獲取到鎖之前不響應任何形式的線程中斷,只有當線程成功獲取到鎖並從for循環出來後,他才會查看在這期間是否有人要求中斷線程,如果是的話再去調用selfInterrupt()方法將自己掛起。

2.2.7 cancelAcquire(Node)即,java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire方法

jdk源代碼:

private void cancelAcquire(Node node) {          // Ignore if node doesn't exist          if (node == null)              return;          //設置該結點不再關聯任何線程。          node.thread = null;            // 通過前繼結點跳過取消狀態的 node。          Node pred = node.prev;          while (pred.waitStatus > 0)//這裡是因為前繼節點但也可能是處於取消狀態,              node.prev = pred = pred.prev;            // 獲取過濾後的前繼結點的後繼結點。          Node predNext = pred.next;          // 設置狀態為取消狀態。          node.waitStatus = Node.CANCELLED;            // 1.如果當前結點是 tail,嘗試更新 tail 結點,設置 tail 為 pred,更新失敗則返回,成功則設置 tail 的後繼結點為 null。          if (node == tail && compareAndSetTail(node, pred)) {              compareAndSetNext(pred, predNext, null);          } else {              // 2.如果當前結點不是 head 的後繼結點,判斷當前結點的前繼結點的狀態是否為 SIGNAL,如果不是則嘗試設置前繼結點的狀態為 SIGNAL。              int ws;              if (pred != head &&                  ((ws = pred.waitStatus) == Node.SIGNAL ||                   (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&                  pred.thread != null) {                  Node next = node.next;                  if (next != null && next.waitStatus <= 0)                      compareAndSetNext(pred, predNext, next);              } else {                  // 3.如果是 head 的後繼結點或者狀態判斷設置失敗,則喚醒當前結點的後繼結點。                  unparkSuccessor(node);//unparkSuccessor()中並沒有對隊列做任何調整              }              node.next = node; // help GC          }  }

上述代碼的邏輯提體現了三種在取消節點過中遇到的數據結構(特別是邊界值的處理):

結點取消分三種情況:(分類很奇怪,因為不是一頭、一尾、一中間),且對應上述代碼行1,2,3

  1. 當前結點是 tail

因為 tail 是隊列的最後一個結點,如果該結點需要取消,則直接把該結點的前繼結點的 next 指向 null,也就是把當前結點移除隊列。

下圖是第一種情況的示意圖:

2.當前結點不是 head 的後繼結點,也不是 tail。

將 node 的前繼結點的 next 指向了 node 的後繼結點。

3.當前節點是 head 的後繼結點。

unpark 後繼結點的線程,然後將 next 指向了自己。

對head的理解  從setHead()的實現以及所有調用的地方可以看出,在獨佔模式中,head指向的節點必定是拿到鎖(或是競爭資源)的節點,而head的後繼節點則是有資格爭奪鎖的節點,再後續的節點,就是阻塞着的了。head指向的節點,其關聯的線程必定已經獲取到資源,在執行了,所以head無需再關聯到該線程了。head所指向的節點,也無需再參與任何的競爭操作了。  現在再來看node出隊時的分類,就好理解了。head既然不會參與任何資源競爭了,自然也就和cancelAquire()無關了。因為head節點必然不是處於取消狀態的節點。

2.2.8 方法實現的幾個細節

一個重要的疑問:

 既然要刪除結點,為什麼沒有對 prev 進行操作,僅僅是修改了 next。從數據結構上看,雙向鏈表的結構完全沒有得到保證?那為何是這般設計?

 因為修改指針的操作都是 CAS 操作,在 AQS 中所有以 compareAndSet 開頭的方法都是嘗試更新,並不保證成功。  不能用 CAS 操作更新 prev,因為 prev 是不確定的,更新失敗有可能會導致整個隊列的不完整,例如把 prev 指向一個已經移除隊列的 node。另外一個重要原因是(這是我的思路,並且我感覺更為正確),因為cancelAcquire方法中的CAS操作只是用於對於一個節點的設置為取消狀態,利用CAS保證其線程安全性是可以做到的,但是shouldParkAfterFailedAcquire方法要前向遍歷多個節點,所以這裡採用CAS操作就不合適了。

do {      node.prev = pred = pred.prev;  } while (pred.waitStatus > 0);  pred.next = node;

shouldParkAfterFailedAcquire方法不使用CAS機制的原因之二:

 因為當前節點在自旋過程中一定被設置為非取消狀態,所以這樣一來,其餘線程在自旋中調用 shouldParkAfterFailedAcquire()方法時,至少會在當前節點時停止遍歷。所以當前節點之前的節點不會因為 其他線程的shouldParkAfterFailedAcquire()方法調用而改變相關結構,那麼也就意味着當前線程所對應的節點對象node.prev屬性只會因為自己的do-while遍歷而改變,但是其餘線程不會使其改變。這就帶來了線程安全性。所以 shouldParkAfterFailedAcquire()方法不使用CAS機制也是線程安全的方法。線程是否安全歸根揭底是當前線程所訪問/寫的域有無機會被其他線程修改,如果沒有那麼就是線程安全的。

2.3 release(int)(線程釋放鎖的過程),即java.util.concurrent.locks.AbstractQueuedSynchronizer#release

jdk源碼:

public final boolean release(int arg) {          if (tryRelease(arg)) {              Node h = head; //獲得頭結點。              if (h != null && h.waitStatus != 0)                  unparkSuccessor(h); //喚醒等待隊列里的下一個線程。              return true;          }          return false;  }

 此方法是獨佔模式(exclusive mode)下線程釋放共享資源的頂層入口。  釋放指定量資源,如果徹底釋放(即 state = 0,tryRelease(arg)方法返回true),然後喚醒等待隊列里的其他線程來獲取資源。  調用 tryRelease()釋放資源。根據返回值判斷該線程是否已經完成釋放資源,自定義同步器在設計時需明確這一點。

2.3.1 tryRelease(int),即java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease

jdk源碼:

protected boolean tryRelease(int arg) {       throw new UnsupportedOperationException();  }

功能:嘗試去釋放指定量的資源。

 跟tryAcquire()一樣,該方法需要獨佔模式的AQS子類實現,因為AQS類此方法只能拋出異常,沒有返回值。

 正常情況下,tryRelease()都會成功的,因為是獨佔模式,該線程釋放資源,那麼它肯定已經獲得獨佔資源,直接減掉相應量的資源即可(state – = arg),也不需要考慮線程安全問題。  返回值:如果已經徹底釋放資源(state = 0),返回 true,否則返回 false。

2.3.2 unparkSuccessor(Node),即java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

jdk源碼:

private void unparkSuccessor(Node node) {          //node 為當前線程所在結點。          int ws = node.waitStatus;          if (ws < 0) //如果當前的狀態為非取消狀態,那麼將其狀態置零              compareAndSetWaitStatus(node, ws, 0);            Node s = node.next; //找到下一個需要喚醒的結點 s。          if (s == null || s.waitStatus > 0) { //如果為null或取消狀態,那麼執行下面語句              s = null;              for (Node t = tail; t != null && t != node; t = t.prev)                  if (t.waitStatus <= 0) // <=0 的結點,都是有效結點。                      s = t;          }          if (s != null)              LockSupport.unpark(s.thread); //喚醒  }

 功能:用於喚醒等待隊列中下一個線程,此線程為輸入節點參數的下一個節點。  執行流程:

  1. 如果輸入節點的 waitStatus 值為非取消狀態(>0),則用 CAS 指令重置為 0,而一般喚醒操作是head節點中的線程將要執行完畢時進行的,所以一般輸入節點為當前等待隊列的head節點。所以下面就將輸入節點等價稱呼為頭節點。
  2. 尾部向前遍歷找到離尾部最近且其waitStatus域小於0的節點,至於為什麼從尾部開始向前遍歷,通過LockSupport.unpark(s.thread)喚醒線程。

 注意事項:

 可以看到頭節點的更改在釋放鎖,並喚醒下一個線程的過程中沒有進行更改,但是為了當前搶佔到鎖的節點為頭節點以保證下一個tryRelease方法的正確執行,頭節點必須更新,但是這裡為何沒有此操作?答案是:不需要。因為我們只是在這裡釋放鎖,且喚醒下一個節點。但是被喚醒的節點並不一定有機會能搶佔到鎖,就像我在2.2.5小節中所說的那樣,被喚醒的節點可能沒能搶過新加入AQS對象的節點。就是因為被喚醒節點的鎖搶佔的不確定性,所以head的更新應當放置於tryAcqure()方法中進行實現。所以總結一點:節點被喚醒不等於節點搶佔到資源。

2.4 獨佔模式AQS子類的具體代碼案例

 在上述對AQS類分析的過程中,提到了tryAcquire,即嘗試獲得線程鎖的方法必須得由繼承於AQS的子類實現,其為獲得鎖最關鍵的實現,所以我們有必要對其做一個舉例,看看具體的子類是如何實現此方法的。

 以下方法選取自java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire.而ReentrantLock又是AQS類在獨佔模式下的一個實現。

protected final boolean tryAcquire(int acquires) {      final Thread current = Thread.currentThread();      int c = getState();      if (c == 0) {// 初次獲取鎖          if (!hasQueuedPredecessors() &&              compareAndSetState(0, acquires)) {              setExclusiveOwnerThread(current);              return true;          }      }// 重入獲取鎖      else if (current == getExclusiveOwnerThread()) {      	int nextc = c + acquires;// 重入時直接將狀態值+1          if (nextc < 0)              throw new Error("Maximum lock count exceeded");          setState(nextc);          return true;          }      return false;  }

 其中最為關鍵的代碼就是setExclusiveOwnerThread(current);方法,其將當前線程設置為獨佔鎖的線程。

三、共享模式

3.1 acquireShared(int)(線程獲取鎖的過程),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquieShared

 其獲取鎖的過程比獨佔模式的獲取鎖方法acquire()方法多了一個Shared後綴。

 jdk源碼:

public final void acquireShared(int arg) {          if (tryAcquireShared(arg) < 0)              doAcquireShared(arg);//如果獲取資源失敗,通過此方法進如等待序列,繼續嘗試資源獲取  }

 獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。

acquireShared() 的流程:(其還是一個嘗試獲取資源的方法)

  1. 成功得到資源,則方法調用直接結束。
  2. 獲取資源失敗則通過 doAcquireShared() 進入等待隊列,直到獲取到資源為止才返回。

 這裡對比一下獨佔鎖的acquire方法的源代碼:

public final void acquire(int arg) {          if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();  }

 可見共享或者獨佔的acquire() 的實現邏輯十分相似。

3.1.1 tryAcquireShared(int)方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireShared

jkd源代碼:

    protected int tryAcquireShared(int arg) {          throw new UnsupportedOperationException();      }

由此可見,共享模式下獲取鎖的方法也是必須由子類進行具體實現,不過區別是獨佔模式中acquire()方法返回布爾值,代表獲得成功或失敗;而在共享模式中,acquireShared()方法返回的是int類型值,並且有一下三種取值方式:

  1. 負數 代表獲取失敗。
  2. 0 代表獲取成功,但沒有剩餘資源,其它線程不能夠繼續獲取
  3. 正數 代表獲取成功,還有剩餘資源,其他線程還可以去獲取

不過如何去判斷資源是否有剩餘,需要由子類繼承AQS類後對本方法進行重寫,決定其邏輯。

3.1.2 doAcquireShared(int)方法,即:java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared

jdk源碼:

private void doAcquireShared(int arg) {          final Node node = addWaiter(Node.SHARED); //加入隊列尾部,SHARED 模式。          boolean failed = true; //是否成功標誌          try {              boolean interrupted = false; //等待過程中是否被中斷過的標誌              for (;;) {                  final Node p = node.predecessor(); //獲得前驅結點                  if (p == head) { //如果結點為 head 結點的下一個,因為 head 是拿到資源的線程,此時 node 被喚醒,很可能是 head 用完資源來喚醒自己的。                      int r = tryAcquireShared(arg); //嘗試獲取資源                      if (r >= 0) { //獲取資源成功                          setHeadAndPropagate(node, r); //將 head 指向自己,如果還有剩餘資源可以再喚醒之後的線程。                          p.next = null; // help GC                          if (interrupted) //如果等待過程中被打斷過,此時將中斷補上。                              selfInterrupt();                          failed = false;                          return;                      }                  }                  //判斷狀態,尋找安全點,進入 waiting 狀態,等着被 unpark() 或 interrupt()。                  if (shouldParkAfterFailedAcquire(p, node) &&                      parkAndCheckInterrupt())                      interrupted = true;              }          } finally {              if (failed)                  cancelAcquire(node);          }  }

 此方法用於將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。

 此方法跟獨佔模式的acquireQueued比,很多處理是十分相似的:

 比如addWaiter()shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt()方法等調用幾乎是沒區別的,以及都是只有當前結點 node 的前驅結點是 head 時,才會去嘗試獲取資源。資源有剩餘的情況再去喚醒之後的結點。

​ 又有一些不同之處:

  1. 共享模式下方法的調用呈現更加的集成化,比如addWaiter()方法直接放置於doAcquireShared方法內部,而獨佔模式是作為參數放置於方法體參數中;
  2. 共享模式多了一個資源數的判斷,允許了多個線程可以同時運行,即:
    1. 獨佔模式下的 tryRelease() 需要在完全釋放掉資源(state = 0)後,才會返回 true 去喚醒其他線程,主要是基於獨佔下可重入的考量。
    2. 共享模式下的 releaseShared() 沒有這種要求,共享模式實質就是控制一定量的線程並發執行,只要擁有資源的線程在釋放掉部分資源後就可以喚醒後繼等待結點。

3.1.3 setHeadAndPropagate(Node, int),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate

jdk源代碼:

private void setHeadAndPropagate(Node node, int propagate) {          Node h = head; //獲得 head 結點          setHead(node); //head 指向當前結點          // 如果還有剩餘量,繼續喚醒下一個結點          if (propagate > 0 || h == null || h.waitStatus < 0 ||              (h = head) == null || h.waitStatus < 0) {              Node s = node.next;              if (s == null || s.isShared())                  doReleaseShared();//這裡用於後續節點的喚醒操作          }  }

此方法比在獨佔模式中調用的setHead()方法的多了一步,即:當前節點蘇醒的同時,如果條件符合(還有剩餘資源),還會去喚醒後繼結點,而喚醒操作是在下面3.1.6介紹的方法:doReleaseShared();

3.2 releaseShared()(線程釋放鎖的過程),即:

java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared

jdk源代碼:

public final boolean releaseShared(int arg) {          if (tryReleaseShared(arg)) {              doReleaseShared();//這裡用於後續節點的喚醒操作              return true;          }          return false;  }

 此方法是共享模式下線程釋放共享資源的頂層入口。

 會釋放指定量(int arg)的資源,如果成功釋放且允許喚醒等待線程,會喚醒等待隊列里的其他線程來獲取資源。

 例如,假設一種情況:有三個具有不同線程數的進程需要執行,分別消耗的資源總量是:A(5)和 B(7)、C(4);

一共的資源總數是13,先使A、B先進行進程的運行,然後C進程也爭奪資源了,但是C發現資源只有1,不夠自己所需的數目:4,所以C進程過來後進行了等待。接着,A 在運行過程中釋放掉 2 個資源量,然後 tryReleaseShared(2) 返回 true 喚醒 C,C只有 3 個資源量仍不夠,繼續等待;隨後 B 又釋放 2 個資源量,tryReleaseShared(2) 返回 true 喚醒 C,C 發現資源量 5 個足夠自己使用,然後 C 就可以跟 A 和 B 一起運行。  ReentrantReadWriteLock 讀鎖的 tryReleaseShared() 只有在完全釋放掉資源(state = 0)才返回 true,所以自定義同步器可以根據需要決定 tryReleaseShared() 的返回值,就如下面所示:

3.2.1 tryReleaseShared(int),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared

jdk源碼:

 protected boolean tryReleaseShared(int arg) {          throw new UnsupportedOperationException();      }

此方法類似於獨佔模式下鎖的釋放方法,即:tryRelease(int);

3.2.2 doReleaseShared(),即:java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared

jdk源代碼:

     // 會喚醒等待共享鎖的線程      private void doReleaseShared() {          for (;;) {              // 將同步隊列頭賦值給節點h              Node h = head;              // 如果節點h不為null,且不等於同步隊列尾              if (h != null && h != tail) {                  // 得到節點h的狀態                  int ws = h.waitStatus;                  // 如果狀態是Node.SIGNAL,就要喚醒節點h後繼節點的線程                  if (ws == Node.SIGNAL) {                      // 將節點h的狀態設置成0,如果設置失敗,就繼續循環,再試一次。                      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                          continue;            // loop to recheck cases                      // 喚醒節點h後繼節點的線程                      unparkSuccessor(h);                  }                  // 如果節點h的狀態是0,就設置ws的狀態是PROPAGATE。                  else if (ws == 0 &&                           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                      continue;                // loop on failed CAS              }              // 如果同步隊列頭head節點發生改變,繼續循環,              // 如果沒有改變,就跳出循環              if (h == head)                  break;          }      }

 根據當前隊列頭節點的狀態,在喚醒下一個節點之後(調用了unparkSuccessor(h);方法)對頭節點狀態做出相關改變。

四、獨佔模式與共享模式中AQS方法調用邏輯總結

4.1 AQS獨佔模式方法分析

 獨佔模式下獲得鎖的方法

方法名

作用/地位

acquire(int arg)

獨佔模式中的使線程獲得鎖的最頂層方法

tryAcquire(int arg)

嘗試獲得鎖,這是線程獲得鎖的關鍵步驟與核心方法,且必須得被相關子類方法重寫實現。

acquireQueued(Node, int)

此方法用於在節點/線程沒有搶佔到鎖資源時,拉入隊列,並確保它能夠在將來某個時刻能夠搶佔到資源;方法的返回值是當前線程有無被中斷過的布爾值。

addWaiter(Node)

其功能是將當前線程節點存入隊列且置於隊尾,方法的返回值是新隊尾。返回值作為參數傳入acquireQueued方法中,是acquireQueued方法將節點放入隊列的實際實現

shouldParkAfterFailedAcquire(Node pred, Node node)

返回是否可以將當前節點掛起的布爾值,其判決條件是當前節點的前面所有節點是否有不處於取消狀態的節點,有則會將前面節點狀態設置為SIGNAL,返回true

parkAndCheckInterrupt()

其在判決當前節點可以掛起後進行線程掛起操作(即只有在上一個方法返回true時,此方法才會被調用),如果線程可以通過被喚醒、中斷而停止掛起,其返回值是是否被中斷的布爾值。

cancelAcquire(Node)

是當前線程節點狀態設置為取消狀態,並且把當前取消狀態的線程節點從等待序中移出

unparkSuccessor(Node)

用於喚醒當前線程節點的下一個節點,獨佔模式下,頭節點將鎖資源主動或被動地釋放後,就會將其下一個節點喚醒,讓其進入自旋,進行資源的搶佔(搶佔不一定成功)。

selfInterrupt( )

當前線程獲得資源後的補充中斷

 下圖則是獨佔模式下一個節點能走的最長流程,並非所有節點都能夠走完這個流程(比如說不是所有加入的節點會被設置為取消狀態,即有調用cancelAcquire(Node)方法),注意流程圖的箭頭指向主要是指時間順序,一個方法位於另一個方法中則說明其被另一個方法調用了。

 獨佔模式下釋放鎖的相關方法:

方法名

作用/地位

release(int)

此方法是獨佔模式(exclusive mode)下線程釋放共享資源的頂層入口

tryRelease(int)

嘗試去釋放指定量的資源

unparkSuccessor(Node node)

用於喚醒當前線程節點的下一個節點。由於獨佔模式下佔得資源的只能是head節點,所以將頭節點所佔資源釋放後才進行緊挨着頭節點的節點喚醒工作。

[外鏈圖片轉存失敗(img-ZBIvf1iW-1569126340372)(%E7%BB%98%E5%9B%BE2.png)]

4.2 AQS共享模式方法分析

 共享模式下獲得鎖的相關方法

方法名

作用/地位

acquireShared(int)

此方法是共享模式下獲取資源的最頂層方法

tryAcquireShared(int arg)

此方法是共享模式下獲得鎖資源的核心方法,其必須得由AQS子類方法重寫。

doAcquireShared(int arg)

此方法用於將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,使當前線程在將來某個時刻得到資源。

setHeadAndPropagate(Node node, int propagate)

首先將節點蘇醒,其次,如果條件符合(還有剩餘資源),還會去喚醒後繼結點

addWaiter(Node)

其功能實現和獨佔模式下同名方法是一樣的

selfInterrupt()

其功能實現和獨佔模式下同名方法是一樣的

shouldParkAfterFailedAcquire(Node pred, Node node)

其功能實現和獨佔模式下同名方法是一樣的

parkAndCheckInterrupt()

其功能實現和獨佔模式下同名方法是一樣的

cancelAcquire(Node)

其功能實現和獨佔模式下同名方法是一樣的

 下圖則是共享模式下一個節點能走的最長流程,並非所有節點都能夠走完這個流程(比如說不是所有加入的節點會被設置為取消狀態,調用cancelAcquire(Node)),注意流程圖的箭頭指向主要是指時間順序,一個方法位於另一個方法中則說明其被另一個方法調用了。

 共享模式下釋放鎖的方法

方法名

作用/地位

releaseShared(int arg)

此方法是共享模式下線程釋放共享資源的頂層入口。

doReleaseShared()

根據當前隊列頭節點的狀態,在喚醒下一個節點之後(調用了unparkSuccessor(h);方法)對頭節點狀態做出相關改變。

unparkSuccessor(h);

此方法和獨佔模式下同名方法相同,目的都是喚醒下一個節點。

4.3 AQS設計模式分析

4.3.1 從AQS繼承角度看設計模式

 你或許會這麼想當然地認為:AQS類中即寫了acquire方法,有些了acquireShared方法,它們用於線程獲取資源,同時也規定了當前線程/節點獲取資源的方法是獨佔模式還是共享模式。那麼我現在提出一個問題,如果你能回答正確說明你已將AQS類學得不錯了:如果在同一個AQS對象中有一個節點元素被設置為獨佔模式,其餘節點的模式又都被設計共享模式,那麼獨佔模式下的那單個節點究竟何時會搶佔到資源,或者換個問題,獨佔模式下節點得到了資源,那麼共享模式下的節點還能搶佔到資源嗎?

 如果你有上述問題,那麼你一定是對AQS設計模式產生一定思考了。事實上,Java的開發者早就知道上述問題不好回答,也不必回答,因為上面的問題在AQS設計模式下根本就不存在,即一個AQS對象中不會出現兩個模式的系節點。那AQS類是通過何種設計方式實現這類性質的呢?接下來下面我們來看看關於AQS類的設計模式:

  1. AQS是抽象類不能夠用來創建對象,所以其能夠提供獨佔以及共享兩種模式的獲得資源的方法:acquire以及acquireShared。但是不能通過抽象化類來創建對象,所以不能調這兩個方法,不會出現兩個模式下的節點;
  2. AQS類中將tryAcquire以及TryAcquireShared設計為不是抽象的方法,這樣一來子類只需重寫其中一種方法即可。若方法都為抽象,那麼就會導致子類必須重寫AQS的這兩個方法,缺一不可。
  3. 獨佔模式/共享模式下的AQS子類的方法只會調用AQS對應模式下的方法其提供配套的try-語句重寫實現,比如:ReentrantLock類為獨佔模式,其只調用AbstractQueuedSynchronizer了的acquire方法,重寫了與之配套使用的tryAcquire()方法。
  4. AQS類的子類很多作為其他類的靜態內部類比如:java.util.concurrent.locks.ReentrantLock.Sync,但是java.util.concurrent.locks.ReentrantLock本身並沒有繼承AQS,這樣一來就保持了外部線程不能直接訪問acquire方法(因為內部類並非public修飾),這樣就保證了封裝上的安全性。比如我們就不能合法地調用reentantLock.Sync

4.3.2 從AQS類得中斷設計調度看設計模式

 我們之前常說AQS類的中斷是」不響應線程中斷」,那麼其究竟是如何實現的呢?現在讓我們以獨佔模式為例來探究一番。

 之前我們有說到java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法是acquire中的關鍵實現,其功能是:用於在節點/線程沒有搶佔到鎖資源時,拉入隊列,並確保它能夠在將來某個時刻能夠搶佔到資源。而AQS中斷模式就是通過:acquireQueued方法中所調用的parkAndCheckInterrupt方法以及在acquire方法中調用的selfInterrupt來實現的,下面開始詳細介紹兩者之間的聯繫。

 首先,再次貼出parkAndCheckInterrupt方法的jdk源代碼:

private final boolean parkAndCheckInterrupt() {      LockSupport.park(this);//阻塞當前線程      return Thread.interrupted();//返回線程中斷狀態並複位中斷狀態  }

 其作用為:阻塞當前線程,然後在阻塞結束後,返回線程的中斷狀態並複位中斷狀態。

 其次,再次貼出acquire方法的源代碼:

    public final void acquire(int arg) {          if (!tryAcquire(arg) &&              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))              selfInterrupt();      }

 我們可以看到如果線程之前出現過中斷,acquireQueued方法返回true,那麼接下來才有可能進一步執行selfInterrupt方法。所以我們下面再貼出selfInterrupt方法的源代碼:

  static void selfInterrupt() {          Thread.currentThread().interrupt();      }

 所以我們可以最終得到了以下代碼的完整執行流程:

線程被中斷-> 線程掛起阻塞 -> 線程被喚醒,返回中斷值標誌true,並恢複線程中斷標誌位 -> 主動使線程再次中斷

 第一眼看這樣的中斷設計模式,顯然會感覺到很奇怪,因為為何先中斷處理再恢復之後再進行中斷。這使要從park方法以及wait方法的區別說起。如果你學習過Java-並發編程的話一定會對於wait--notify/notifyAll方法會有所學習,我們可以看到使用park--unpark休眠/喚醒和前者所實現的功能是差不多的:都是阻塞當前線程繼續向前運行,然後等待其它線程來對休眠線程進行喚醒,但是它們是有區別的,具體區別如下:

  1. wait--notify/notifyAll方法是繼承於Object類的方法對,所有對象都能調用此方法,而park--unpark這一方法對是Unsafe類中的本地方法;
  2. 如果線程在中斷狀態下調用wait方法,那麼線程會拋出相關異常,但是調用park方法不會拋出異常。但是要注意,它們在線程中斷狀態下使用都會導致阻塞作用失效,就wait方法來說,其在方法拋出異常之後還是會繼續執行接下來的代碼;而park方法則是不拋出異常的情況下繼續執行接下來的代碼;
  3. 如果在阻塞狀態下調用中斷方法,wait致使的阻塞狀態會導致線程的拋出異常,park方法不會使當前線程阻塞也不會導致線程的拋出異常,而是繼續執行接下來的代碼;
  4. park-unpark方法對的靈活性更高。如果先調用unpark方法,那麼下一次park方法將會不起到作用,而wait方法則沒有此性質。(此特性保證了AQS在線程調度方面的效率,在獨佔模式中,如果前一個節點剛好執行完畢,調用了喚醒下一個節點線程的方法unpark,那麼此時當前線程節點就不會進入阻塞狀態,而是會快速地去選擇搶佔資源)

線程狀態

中斷

非中斷

調用park方法

當前線程沒有被阻塞,無須喚醒就直接執行return語句,使當前線程中斷標誌位恢復為false的同時使parkAndCheckInterrupt方法返回true;最終導致acquireQueued中的中斷標誌位interrupted = true;且使acquireQueued方法也返回true;在調用park方法方法後,線程仍會進入自旋,如果沒有再次中斷,下次park方法會成功阻塞線程。

當前線程被阻塞,

在調用park後調用unpark方法

由於上一個park方法並未成功將線程阻塞,所以這裡調用相當於額外給了一個"permit",這樣一來相當於下次park方法調用時也不會阻塞線程

調用unpark方法喚醒線程後,parkAndCheckInterrupt方法返回false,標誌着線程沒有中斷;

 所以,知道了以上兩點,那麼就可以知道為什麼要進行中斷狀態的複位了,實際上線程在中斷狀態下的方法執行流程是這樣的:

線程中斷 -> 調用park方法 -> 線程未能夠成功阻塞 ->當前線程的中斷標誌位置為false ->acquireQueued方法內部中斷標誌位置為true -> 再次進入自旋for(;;) -> 若仍未搶佔到資源 … -> 再次調用park方法 -> 線程成功阻塞

注意事項:如果線程中斷過,哪怕只有一次,那麼方法acquireQueued方法總是會返回true

使用park方法相較於wait方法的優勢:

  1. 提升AQS模式下線程調度的效率(先unpark,則park不起作用)
  2. 可以在不拋出異常的情況下合理延遲處理中斷
  3. 使線程在不佔據鎖的情況下進入阻塞模式。

 現在我們可以說延遲處理中斷模式的核心就是park方法對於中斷狀態的響應方式是「跳過」不做任何額外處理。所謂「延遲」就是,通過將線程自帶的中斷標誌位置為false,而將acquireQueued方法內的中斷標誌位置為true,最終通過selfInterrupt()在線程搶佔到資源以後進行中斷處理。

 而讓如果想要利用到輕量的不佔據鎖資源的線程阻塞方法:park,成功實現在線程中斷或非中斷狀態下的阻塞工作,我們自然就需要使用Thread.interrupted();,因為中斷最重要的標誌位我們可以通過acquireQueued#interrupted標誌位來表示,但是如果不恢復當前線程的中斷標誌位,那麼當前線程將一直處於自旋狀態中,將一直搶佔CPU資源,導致一種情況的出現:獨佔模式下,一個線程在執行,另外的線程在「陪跑」。

五、等待隊列

 AQS 維護的隊列是當前等待資源的隊列,其又可以分為同步隊列以及條件隊列。兩種不同隊列中的節點結構實際上是相同的,都使用了AQS中的Node內部靜態final對象。當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造成為一個節點並將其加入同步隊列。下面則給出同步隊列和條件隊列兩者間不同的內部數據結構實現:

而往往同步隊列和等待隊列實際上是一起協同工作的,它們協同工作的圖如下:

5.1 Condition 接口

 同步隊列和等待隊列的作用是不同的。每個線程只能存在於同步隊列或等待隊列中的一個。

 任意一個 Java 對象,都擁有一組監視器方法定義在( java.lang.Object 上),主要包括 wait()、notify()、notifyAll() 方法,這些方法與 synchronized 同步關鍵字配合,可以實現等待/通知模式。

Condition接口提供了類似Object的監視器方法,與lock配合可以實現等待/通知模式。

以下是Condition接口的所有抽象方法:

public interface Condition {      void await() throws InterruptedException;      void awaitUninterruptibly();      long awaitNanos(long nanosTimeout) throws InterruptedException;      boolean await(long time, TimeUnit unit) throws InterruptedException;      boolean awaitUntil(Date deadline) throws InterruptedException;      void signal();      void signalAll();  }

方法

說明

await()

調用此方法後,會使當前線程在接收到喚醒信號(signal)之前或被中斷之前一直處於等待休眠狀態。調用此方法後,當前線程會釋放持有的鎖。如果當前等待線程從該方法返回(被喚醒),那麼在返回之前會重新獲取鎖(獲取到鎖才能繼續執行)。

await(long time,TimeUnit unit)

調用此方法後,會使當前線程在接收到喚醒信號之前、被中斷之前或到達指定等待時間之前一直處於等待狀態。如果在從此方法返回前檢測到等待時間超時,則返回 false,否則返回 true。

awaitNanos(long nanosTimeout)

該方法等效於 await(long time,TimeUnit unit) 方法,只是等待的時間是 nanosTimeout 指定的以毫微秒數為單位的等待時間。該方法返回值是所剩毫微秒數的一個估計值,如果超時,則返回一個小於等於 0 的值。可以根據該返回值來確定是否要再次等待,以及再次等待的時間。

awaitUninterruptibly()

當前線程進入等待狀態直到被通知,該方法對中斷忽略。

awaitUntil(Date deadline)

當前線程進入等待狀態直到被通知,中斷或者到某個時間,如果沒有到指定時間就被通知,返回 true,否則表示到了指定時間,返回 false。

signal()

喚醒一個等待線程,如果所有的線程都在等待此條件,則選擇其中的一個喚醒。在從 await 返回之前,該線程必須重新獲取鎖。

signalAll()

喚醒所有等待線程,如果所有的線程都在等待此條件,則喚醒所有線程。 在從 await 返回之前,每個線程必須重新獲取鎖。

說明:除了awaitUninterruptibly方法,上述wait方法調用時,如果當前線程處於中斷狀態,那麼調用此方法就會拋出異常。

5.2 Condition 的實現

 在AQS類中Condition的實現類是內部公共類ConditionObject

ConditionObject是 AQS 的內部類,Condition的操作需要獲取相關聯的鎖,需要和同步器掛鈎。每個 Condition對象都包含着一個隊列(等待隊列),Condition中也有結點的概念,在將線程放到等待隊列中時會構造結點。

 等待隊列也是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在 Condition對象上等待的線程,如果一個線程調用了 await方法,那麼該線程將會釋放鎖,構造成結點加入等待隊列並進入等待狀態。  一個 Condition 包含一個等待隊列,Condition 擁有首結點(firstWaiter)和尾結點(lastWaiter)。 以下使用生產者和消費者模式用例進行說明同步隊列和等待隊列之間的區別與協同工作。

 案例需求:在一個有大小的隊列queue中,生產者往隊列中放數據,消費者從隊列中取數據,當隊列不滿時,生產者可以繼續生產數據,當隊列不空時,消費者可以繼續取數據,如果不符合條件,則等待,直到符合條件為止。

具體內容不妨參見個人筆記:鎖-Lock 章節的第七章