我工作三年了,該懂並發了!
本文的組織形式如下,主要會介紹到同步容器類,作業系統的並發工具,Java 開發工具包(只是簡單介紹一下,後面會有源碼分析)。同步工具類有哪些。
下面我們就來介紹一下 Java 並發中都涉及哪些模組,這些並發模組都是 Java 並發類庫所提供的。
同步容器類
同步容器主要包括兩類,一種是本來就是執行緒安全實現的容器,這類容器有 Vector、Hashtable、Stack,這類容器的方法上都加了 synchronized
鎖,是執行緒安全的實現。
Vector、Hashtable、Stack 這些容器我們現在幾乎都不在使用,因為這些容器在多執行緒環境下的效率不高。
還有一類是由 Collections.synchronizedxxx
實現的非執行緒安全的容器,使用 Collections.synchronized 會把它們封裝起來編程執行緒安全的容器,舉出兩個例子
- Collections.synchronizedList
- Collections.synchronizedMap
我們可以通過 Collections 源碼可以看出這些執行緒安全的實現
要不為啥要稱 Collections 為集合工具類呢?Collections 會把這些容器類的狀態封裝起來,並對每個同步方法進行同步,使得每次只有一個執行緒能夠訪問容器的狀態。
其中每個 synchronized xxx
都是相當於創建了一個靜態內部類。
雖然同步容器類都是執行緒安全的,但是在某些情況下需要額外的客戶端加鎖來保證一些複合操作的安全性,複合操作就是有兩個及以上的方法組成的操作,比如最典型的就是 若沒有則添加
,用偽程式碼表示則是
if(a == null){
a = get();
}
比如可以用來判斷 Map 中是否有某個 key,如果沒有則添加進 Map 中。這些複合操作在沒有客戶端加鎖的情況下是執行緒安全的,但是當多個執行緒並發修改容器時,可能會表現出意料之外的行為。例如下面這段程式碼
public class TestVector implements Runnable{
static Vector vector = new Vector();
static void addVector(){
for(int i = 0;i < 10000;i++){
vector.add(i);
}
}
static Object getVector(){
int index = vector.size() - 1;
return vector.get(index);
}
static void removeVector(){
int index = vector.size() - 1;
vector.remove(index);
}
@Override
public void run() {
getVector();
}
public static void main(String[] args) {
TestVector testVector = new TestVector();
testVector.addVector();
Thread t1 = new Thread(() -> {
for(int i = 0;i < vector.size();i++){
getVector();
}
});
Thread t2 = new Thread(() -> {
for(int i = 0;i < vector.size();i++){
removeVector();
}
});
t1.start();
t2.start();
}
}
這些方法看似沒有問題,因為 Vector 能夠保證執行緒安全性,無論多少個執行緒訪問 Vector 也不會造成 Vector 的內部產生破壞,但是從整個系統來說,是存在執行緒安全性的,事實上你運行一下,也會發現報錯。
會出現
如果執行緒 A 在包含這麼多元素的基礎上調用 getVector
方法,會得到一個數值,getVector 只是取得該元素,而並不是從 vector 中移除,removeVector
方法是得到一個元素進行移除,這段程式碼的不安全因素就是,因為執行緒的時間片是亂序的,而且 getVector 和 removeVector 並不會保證互斥,所以在 removeVector 方法把某個值比如 6666 移除後,vector 中就不存在這個 6666 的元素,此時 getVector 方法取得 6666 ,就會拋出數組越界異常。為什麼是數組越界異常呢?可以看一下 vector 的源碼
如果用圖表示的話,則會是下面這樣。
所以,從系統的層面來看,上面這段程式碼也要保證執行緒安全性才可以,也就是在客戶端加鎖
實現,只要我們讓複合操作使用一把鎖,那麼這些操作就和其他單獨的操作一樣都是原子性的。如下面例子所示
static Object getVector(){
synchronized (vector){
int index = vector.size() - 1;
return vector.get(index);
}
}
static void removeVector(){
synchronized (vector) {
int index = vector.size() - 1;
vector.remove(index);
}
}
也可以通過鎖住 .class
來保證原子性操作,也能達到同樣的效果。
static Object getVector(){
synchronized (TestVector.class){
int index = vector.size() - 1;
return vector.get(index);
}
}
static void removeVector(){
synchronized (TestVector.class) {
int index = vector.size() - 1;
vector.remove(index);
}
}
在調用 size 和 get 之間,Vector 的長度可能會發生變化,這種變化在對 Vector 進行排序時出現,如下所示
for(int i = 0;i< vector.size();i++){
doSomething(vector.get(i));
}
這種迭代的操作正確性取決於運氣,即在調用 size 和 get 之間會修改 Vector,在單執行緒環境中,這種假設完全成立,但是再有其他執行緒並發修改 Vector 時,則可能會導致麻煩。
我們仍舊可以通過客戶端加鎖的方式來避免這種情況
synchronized(vector){
for(int i = 0;i< vector.size();i++){
doSomething(vector.get(i));
}
}
這種方式為客戶端的可靠性提供了保證,但是犧牲了伸縮性,而且這種在遍歷過程中進行加鎖,也不是我們所希望看到的。
fail-fast
針對上面這種情況,很多集合類都提供了一種 fail-fast
機制,因為大部分集合內部都是使用 Iterator 進行遍歷,在循環中使用同步鎖的開銷會很大,而 Iterator 的創建是輕量級的,所以在集合內部如果有並發修改的操作,集合會進行快速失敗
,也就是 fail-fast
。當他們發現容器在迭代過程中被修改時,會拋出 ConcurrentModificationException
異常,這種快速失敗不是一種完備的處理機制,而只是 善意
的捕獲並發錯誤。
如果查看過 ConcurrentModificationException 的註解,你會發現,ConcurrentModificationException 拋出的原則由兩種,如下
造成這種異常的原因是由於多個執行緒在遍歷集合的同時對集合類內部進行了修改,這也就是 fail-fast 機制。
該註解還聲明了另外一種方式
這個問題也是很經典的一個問題,我們使用 ArrayList 來舉例子。如下程式碼所示
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0 ; i < 10 ; i++ ) {
list.add(i + "");
}
Iterator<String> iterator = list.iterator();
int i = 0 ;
while(iterator.hasNext()) {
if (i == 3) {
list.remove(3);
}
System.out.println(iterator.next());
i ++;
}
}
該段程式碼會發生異常,因為在 ArrayList 內部,有兩個屬性,一個是 modCount
,一個是 expectedModCount
,ArrayList 在 remove 等對集合結構的元素造成數量上的操作會有 checkForComodification
的判斷,如下所示,這也是這段程式碼的錯誤原因。
fail-safe
fail-safe
是 Java 中的一種 安全失敗
機制,它表示的是在遍歷時不是直接在原集合上進行訪問,而是先複製原有集合內容,在拷貝的集合上進行遍歷。 由於迭代時是對原集合的拷貝進行遍歷,所以在遍歷過程中對原集合所作的修改並不能被迭代器檢測到,所以不會觸發 ConcurrentModificationException。java.util.concurrent
包下的容器都是安全失敗的,可以在多執行緒條件下使用,並發修改。
比如 CopyOnWriteArrayList
, 它就是一種 fail-safe 機制的集合,它就不會出現異常,例如如下操作
List<Integer> integers = new CopyOnWriteArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
Iterator<Integer> itr = integers.iterator();
while (itr.hasNext()) {
Integer a = itr.next();
integers.remove(a);
}
CopyOnWriteArrayList 就是 ArrayList 的一種執行緒安全的變體,CopyOnWriteArrayList 中的所有可變操作比如 add 和 set 等等都是通過對數組進行全新複製來實現的。
作業系統中的並發工具
講到並發容器,就不得不提作業系統級別實現了哪些進程/執行緒間的並發容器,說白了其實就是數據結構的設計。下面我們就來一起看一下作業系統級別的並發工具
訊號量
訊號量是 E.W.Dijkstra 在 1965 年提出的一種方法,它使用一個整形變數來累計喚醒次數,以供之後使用。在他的觀點中,有一個新的變數類型稱作 訊號量(semaphore)
。一個訊號量的取值可以是 0 ,或任意正數。0 表示的是不需要任何喚醒,任意的正數表示的就是喚醒次數。
Dijkstra 提出了訊號量有兩個操作,現在通常使用 down
和 up
(分別可以用 sleep 和 wakeup 來表示)。down 這個指令的操作會檢查值是否大於 0 。如果大於 0 ,則將其值減 1 ;若該值為 0 ,則進程將睡眠,而且此時 down 操作將會繼續執行。檢查數值、修改變數值以及可能發生的睡眠操作均為一個單一的、不可分割的 原子操作(atomic action)
完成。
互斥量
如果不需要訊號量的計數能力時,可以使用訊號量的一個簡單版本,稱為 mutex(互斥量)
。互斥量的優勢就在於在一些共享資源和一段程式碼中保持互斥。由於互斥的實現既簡單又有效,這使得互斥量在實現用戶空間執行緒包時非常有用。
互斥量是一個處於兩種狀態之一的共享變數:解鎖(unlocked)
和 加鎖(locked)
。這樣,只需要一個二進位位來表示它,不過一般情況下,通常會用一個 整型(integer)
來表示。0 表示解鎖,其他所有的值表示加鎖,比 1 大的值表示加鎖的次數。
mutex 使用兩個過程,當一個執行緒(或者進程)需要訪問關鍵區域時,會調用 mutex_lock
進行加鎖。如果互斥鎖當前處於解鎖狀態(表示關鍵區域可用),則調用成功,並且調用執行緒可以自由進入關鍵區域。
另一方面,如果 mutex 互斥量已經鎖定的話,調用執行緒會阻塞直到關鍵區域內的執行緒執行完畢並且調用了 mutex_unlock
。如果多個執行緒在 mutex 互斥量上阻塞,將隨機選擇一個執行緒並允許它獲得鎖。
Futexes
隨著並行的增加,有效的同步(synchronization)
和鎖定(locking)
對於性能來說是非常重要的。如果進程等待時間很短,那麼自旋鎖(Spin lock)
是非常有效;但是如果等待時間比較長,那麼這會浪費 CPU 周期。如果進程很多,那麼阻塞此進程,並僅當鎖被釋放的時候讓內核解除阻塞是更有效的方式。不幸的是,這種方式也會導致另外的問題:它可以在進程競爭頻繁的時候運行良好,但是在競爭不是很激烈的情況下內核切換的消耗會非常大,而且更困難的是,預測鎖的競爭數量更不容易。
有一種有趣的解決方案是把兩者的優點結合起來,提出一種新的思想,稱為 futex
,或者是 快速用戶空間互斥(fast user space mutex)
,是不是聽起來很有意思?
futex 是 Linux
中的特性實現了基本的鎖定(很像是互斥鎖)而且避免了陷入內核中,因為內核的切換的開銷非常大,這樣做可以大大提高性能。futex 由兩部分組成:內核服務和用戶庫。內核服務提供了了一個 等待隊列(wait queue)
允許多個進程在鎖上排隊等待。除非內核明確的對他們解除阻塞,否則它們不會運行。
Pthreads 中的互斥量
Pthreads 提供了一些功能用來同步執行緒。最基本的機制是使用互斥量變數,可以鎖定和解鎖,用來保護每個關鍵區域。希望進入關鍵區域的執行緒首先要嘗試獲取 mutex。如果 mutex 沒有加鎖,執行緒能夠馬上進入並且互斥量能夠自動鎖定,從而阻止其他執行緒進入。如果 mutex 已經加鎖,調用執行緒會阻塞,直到 mutex 解鎖。如果多個執行緒在相同的互斥量上等待,當互斥量解鎖時,只有一個執行緒能夠進入並且重新加鎖。這些鎖並不是必須的,程式設計師需要正確使用它們。
下面是與互斥量有關的函數調用
和我們想像中的一樣,mutex 能夠被創建和銷毀,扮演這兩個角色的分別是 Phread_mutex_init
和 Pthread_mutex_destroy
。mutex 也可以通過 Pthread_mutex_lock
來進行加鎖,如果互斥量已經加鎖,則會阻塞調用者。還有一個調用Pthread_mutex_trylock
用來嘗試對執行緒加鎖,當 mutex 已經被加鎖時,會返回一個錯誤程式碼而不是阻塞調用者。這個調用允許執行緒有效的進行忙等。最後,Pthread_mutex_unlock
會對 mutex 解鎖並且釋放一個正在等待的執行緒。
除了互斥量以外,Pthreads
還提供了第二種同步機制: 條件變數(condition variables)
。mutex 可以很好的允許或阻止對關鍵區域的訪問。條件變數允許執行緒由於未滿足某些條件而阻塞。絕大多數情況下這兩種方法是一起使用的。下面我們進一步來研究執行緒、互斥量、條件變數之間的關聯。
下面再來重新認識一下生產者和消費者問題:一個執行緒將東西放在一個緩衝區內,由另一個執行緒將它們取出。如果生產者發現緩衝區沒有空槽可以使用了,生產者執行緒會阻塞起來直到有一個執行緒可以使用。生產者使用 mutex 來進行原子性檢查從而不受其他執行緒干擾。但是當發現緩衝區已經滿了以後,生產者需要一種方法來阻塞自己並在以後被喚醒。這便是條件變數做的工作。
下面是一些與條件變數有關的最重要的 pthread 調用
上表中給出了一些調用用來創建和銷毀條件變數。條件變數上的主要屬性是 Pthread_cond_wait
和 Pthread_cond_signal
。前者阻塞調用執行緒,直到其他執行緒發出訊號為止(使用後者調用)。阻塞的執行緒通常需要等待喚醒的訊號以此來釋放資源或者執行某些其他活動。只有這樣阻塞的執行緒才能繼續工作。條件變數允許等待與阻塞原子性的進程。Pthread_cond_broadcast
用來喚醒多個阻塞的、需要等待訊號喚醒的執行緒。
需要注意的是,條件變數(不像是訊號量)不會存在於記憶體中。如果將一個訊號量傳遞給一個沒有執行緒等待的條件變數,那麼這個訊號就會丟失,這個需要注意
管程
為了能夠編寫更加準確無誤的程式,Brinch Hansen 和 Hoare 提出了一個更高級的同步原語叫做 管程(monitor)
。管程有一個很重要的特性,即在任何時候管程中只能有一個活躍的進程,這一特性使管程能夠很方便的實現互斥操作。管程是程式語言的特性,所以編譯器知道它們的特殊性,因此可以採用與其他過程調用不同的方法來處理對管程的調用。通常情況下,當進程調用管程中的程式時,該程式的前幾條指令會檢查管程中是否有其他活躍的進程。如果有的話,調用進程將被掛起,直到另一個進程離開管程才將其喚醒。如果沒有活躍進程在使用管程,那麼該調用進程才可以進入。
進入管程中的互斥由編譯器負責,但是一種通用做法是使用 互斥量(mutex)
和 二進位訊號量(binary semaphore)
。由於編譯器而不是程式設計師在操作,因此出錯的幾率會大大降低。在任何時候,編寫管程的程式設計師都無需關心編譯器是如何處理的。他只需要知道將所有的臨界區轉換成為管程過程即可。絕不會有兩個進程同時執行臨界區中的程式碼。
即使管程提供了一種簡單的方式來實現互斥,但在我們看來,這還不夠。因為我們還需要一種在進程無法執行被阻塞。在生產者-消費者問題中,很容易將針對緩衝區滿和緩衝區空的測試放在管程程式中,但是生產者在發現緩衝區滿的時候該如何阻塞呢?
解決的辦法是引入條件變數(condition variables)
以及相關的兩個操作 wait
和 signal
。當一個管程程式發現它不能運行時(例如,生產者發現緩衝區已滿),它會在某個條件變數(如 full)上執行 wait
操作。這個操作造成調用進程阻塞,並且還將另一個以前等在管程之外的進程調入管程。在前面的 pthread 中我們已經探討過條件變數的實現細節了。另一個進程,比如消費者可以通過執行 signal
來喚醒阻塞的調用進程。
通過臨界區自動的互斥,管程比訊號量更容易保證並行編程的正確性。但是管程也有缺點,我們前面說到過管程是一個程式語言的概念,編譯器必須要識別管程並用某種方式對其互斥作出保證。C、Pascal 以及大多數其他程式語言都沒有管程,所以不能依靠編譯器來遵守互斥規則。
與管程和訊號量有關的另一個問題是,這些機制都是設計用來解決訪問共享記憶體的一個或多個 CPU 上的互斥問題的。通過將訊號量放在共享記憶體中並用 TSL
或 XCHG
指令來保護它們,可以避免競爭。但是如果是在分散式系統中,可能同時具有多個 CPU 的情況,並且每個 CPU 都有自己的私有記憶體呢,它們通過網路相連,那麼這些原語將會失效。因為訊號量太低級了,而管程在少數幾種程式語言之外無法使用,所以還需要其他方法。
消息傳遞
上面提到的其他方法就是 消息傳遞(messaage passing)
。這種進程間通訊的方法使用兩個原語 send
和 receive
,它們像訊號量而不像管程,是系統調用而不是語言級別。示例如下
send(destination, &message);
receive(source, &message);
send 方法用於向一個給定的目標發送一條消息,receive 從一個給定的源接收一條消息。如果沒有消息,接受者可能被阻塞,直到接收一條消息或者帶著錯誤碼返回。
消息傳遞系統現在面臨著許多訊號量和管程所未涉及的問題和設計難點,尤其對那些在網路中不同機器上的通訊狀況。例如,消息有可能被網路丟失。為了防止消息丟失,發送方和接收方可以達成一致:一旦接受到消息後,接收方馬上回送一條特殊的 確認(acknowledgement)
消息。如果發送方在一段時間間隔內未收到確認,則重發消息。
現在考慮消息本身被正確接收,而返回給發送著的確認消息丟失的情況。發送者將重發消息,這樣接受者將收到兩次相同的消息。
對於接收者來說,如何區分新的消息和一條重發的老消息是非常重要的。通常採用在每條原始消息中嵌入一個連續的序號來解決此問題。如果接受者收到一條消息,它具有與前面某一條消息一樣的序號,就知道這條消息是重複的,可以忽略。
消息系統還必須處理如何命名進程的問題,以便在發送或接收調用中清晰的指明進程。身份驗證(authentication)
也是一個問題,比如客戶端怎麼知道它是在與一個真正的文件伺服器通訊,從發送方到接收方的資訊有可能被中間人所篡改。
屏障
最後一個同步機制是準備用於進程組而不是進程間的生產者-消費者情況的。在某些應用中劃分了若干階段,並且規定,除非所有的進程都就緒準備著手下一個階段,否則任何進程都不能進入下一個階段,可以通過在每個階段的結尾安裝一個 屏障(barrier)
來實現這種行為。當一個進程到達屏障時,它會被屏障所攔截,直到所有的屏障都到達為止。屏障可用於一組進程同步,如下圖所示
在上圖中我們可以看到,有四個進程接近屏障,這意味著每個進程都在進行運算,但是還沒有到達每個階段的結尾。過了一段時間後,A、B、D 三個進程都到達了屏障,各自的進程被掛起,但此時還不能進入下一個階段呢,因為進程 B 還沒有執行完畢。結果,當最後一個 C 到達屏障後,這個進程組才能夠進入下一個階段。
避免鎖:讀-複製-更新
最快的鎖是根本沒有鎖。問題在於沒有鎖的情況下,我們是否允許對共享數據結構的並發讀寫進行訪問。答案當然是不可以。假設進程 A 正在對一個數字數組進行排序,而進程 B 正在計算其平均值,而此時你進行 A 的移動,會導致 B 會多次讀到重複值,而某些值根本沒有遇到過。
然而,在某些情況下,我們可以允許寫操作來更新數據結構,即便還有其他的進程正在使用。竅門在於確保每個讀操作要麼讀取舊的版本,要麼讀取新的版本,例如下面的樹
上面的樹中,讀操作從根部到葉子遍歷整個樹。加入一個新節點 X 後,為了實現這一操作,我們要讓這個節點在樹中可見之前使它”恰好正確”:我們對節點 X 中的所有值進行初始化,包括它的子節點指針。然後通過原子寫操作,使 X 稱為 A 的子節點。所有的讀操作都不會讀到前後不一致的版本
在上面的圖中,我們接著移除 B 和 D。首先,將 A 的左子節點指針指向 C 。所有原本在 A 中的讀操作將會後續讀到節點 C ,而永遠不會讀到 B 和 D。也就是說,它們將只會讀取到新版數據。同樣,所有當前在 B 和 D 中的讀操作將繼續按照原始的數據結構指針並且讀取舊版數據。所有操作均能正確運行,我們不需要鎖住任何東西。而不需要鎖住數據就能夠移除 B 和 D 的主要原因就是 讀-複製-更新(Ready-Copy-Update,RCU)
,將更新過程中的移除和再分配過程分離開。
Java 並發工具包
JDK 1.5 提供了許多種並發容器來改進同步容器的性能,同步容器將所有對容器狀態的訪問都串列化,以實現他們之間的執行緒安全性。這種方法的代價是嚴重降低了並發性能,當多個執行緒爭搶容器鎖的同時,嚴重降低吞吐量。
下面我們就來一起認識一下 Java 中都用了哪些並發工具
Java 並發工具綜述
在 Java 5.0 中新增加了 ConcurrentHashMap
用來替代基於散列的 Map 容器;新增加了 CopyOnWriteArrayList
和 CopyOnWriteArraySet
來分別替代 ArrayList 和 Set 介面實現類;還新增加了兩種容器類型,分別是 Queue
和 BlockingQueue
, Queue 是隊列的意思,它有一些實現分別是傳統的先進先出隊列 ConcurrentLinkedQueue
以及並發優先順序隊列 PriorityQueue
。Queue 是一個先入先出的隊列,它的操作不會阻塞,如果隊列為空那麼獲取元素的操作會返回空值。PriorityQueue 擴展了 Queue,增加了可阻塞的插入和獲取等操作。如果隊列為空,那麼獲取元素的操作將一直阻塞,直到隊列中出現一個可用的元素為止。如果隊列已滿,那麼插入操作則一直阻塞,直到隊列中有可用的空間為止。
Java 6.0 還引入了 ConcurrentSkipListMap
和 ConcurrentSkipListSet
分別作為同步的 SortedMap 和 SortedSet 的並發替代品。下面我們就展開探討了,設計不到底層源碼,因為本篇文章主要目的就是為了描述一下有哪些東西以及用了哪些東西。
ConcurrentHashMap
我們先來看一下 ConcurrentHashMap 在並發集合中的位置
可以看到,ConcurrentHashMap 繼承了 AbstractMap
介面並實現了 ConcurrentMap 和 Serializable 介面,AbstractMap 和 ConcurrentMap 都是 Map 的實現類,只不過 AbstractMap 是抽象實現。
ConcurrentHashMap 和 Hashtable 的構造非常相似,只不過 Hashtable 容器在激烈競爭的場景中會表現出效率低下的現象,這是因為所有訪問 Hashtable 的執行緒都想獲取同一把鎖,如果容器裡面有多把鎖,並且每一把鎖都只用來鎖定一段數據,那麼當多個執行緒訪問不同的數據段時,就不存在競爭關係。這就是 ConcurreentHashMap 採用的 分段鎖
實現。在這種鎖實現中,任意數量的讀取執行緒可以並發的訪問 Map,執行讀取操作的執行緒和執行寫入的執行緒可以並發的訪問 Map,並且在讀取的同時也可以並發修改 Map。
ConcurrentHashMap 分段鎖實現帶來的結果是,在並發環境下可以實現更高的吞吐量,在單執行緒環境下只損失非常小的性能。
你知道 HashMap 是具有 fail-fast 機制的,也就是說它是一種強一致性的集合,在數據不一致的情況下會拋出 ConcurrentModificationException
異常,而 ConcurrentHashMap 是一種 弱一致性
的集合,在並發修改其內部結構時,它不會拋出 ConcurrentModificationException 異常,弱一致性能夠容忍並發修改。
在 HashMap 中,我們一般使用的 size、empty、containsKey 等方法都是標準方法,其返回的結果是一定的,包含就是包含,不包含就是不包含,可以作為判斷條件;而 ConcurrentHashMap 中的這些方法只是參考方法,它不是一個 精確值
,像是 size、empty 這些方法在並發場景下用處很小,因為他們的返回值總是在不斷變化,所以這些操作的需求就被弱化了。
在 ConcurrentHashMap 中沒有實現對 Map 加鎖從而實現獨佔訪問。在執行緒安全的 Map 實現 Hashtable
和 Collections.synchronizedMap
中都實現了獨佔訪問,因此只能單個執行緒修改 Map 。ConcurrentHashMap 與這些 Map 容器相比,具有更多的優勢和更少的劣勢,只有當需要獨佔訪問的需求時才會使用 Hashtable 或者是 Collections.synchronizedMap ,否則其他並發場景下,應該使用 ConcurrentHashMap。
ConcurrentMap
ConcurrentMap 是一個介面,它繼承了 Map 介面並提供了 Map 介面中四個新的方法,這四個方法都是 原子性
方法,進一步擴展了 Map 的功能。
public interface ConcurrentMap<K, V> extends Map<K, V> {
// 僅當 key 沒有相應的映射值時才插入
V putIfAbsent(K key, V value);
// 僅當 key 被映射到 value 時才移除
boolean remove(Object key, Object value);
// 僅當 key 被映射到 value 時才移除
V replace(K key, V value);
// 僅當 key 被映射到 oldValue 時才替換為 newValue
boolean replace(K key, V oldValue, V newValue);
}
ConcurrentNavigableMap
java.util.concurrent.ConcurrentNavigableMap
類是 java.util.NavigableMap
的子類,它支援並發訪問,並且允許其視圖的並發訪問。
什麼是視圖呢?視圖就是集合中的一段數據序列,ConcurrentNavigableMap 中支援使用 headMap
、subMap
、tailMap
返回的視圖。與其重新解釋一下 NavigableMap 中找到的所有方法,不如看一下 ConcurrentNavigableMap 中添加的方法
- headMap 方法:headMap 方法返回一個嚴格小於給定鍵的視圖
- tailMap 方法:tailMap 方法返回包含大於或等於給定鍵的視圖。
- subMap 方法:subMap 方法返回給定兩個參數的視圖
ConcurrentNavigableMap 介面包含一些可能有用的其他方法
- descendingKeySet()
- descendingMap()
- navigableKeySet()
更多關於方法的描述這裡就不再贅述了,讀者朋友們可自行查閱 javadoc
ConcurrentSkipListMap
ConcurrentSkipListMap
是執行緒安全的有序的哈希表,適用於高並發的場景。
ConcurrentSkipListMap 的底層數據結構是基於跳錶
實現的。ConcurrentSkipListMap 可以提供 Comparable 內部排序或者是 Comparator 外部排序,具體取決於使用哪個構造函數。
ConcurrentSkipListSet
ConcurrentSkipListSet
是執行緒安全的有序的集合,適用於高並發的場景。ConcurrentSkipListSet 底層是通過 ConcurrentNavigableMap 來實現的,它是一個有序的執行緒安全的集合。
ConcurrentSkipListSet有序的,基於元素的自然排序或者通過比較器確定的順序;
ConcurrentSkipListSet是執行緒安全的;
CopyOnWriteArrayList
CopyOnWriteArrayList 是 ArrayList 的變體,在 CopyOnWriteArrayList 中,所有可變操作比如 add、set 其實都是重新創建了一個副本,通過對數組進行複製而實現的。
CopyOnWriteArrayList 其內部有一個指向數組的引用,數組是不會被修改的,每次並發修改 CopyOnWriteArrayList 都相當於重新創建副本,CopyOnWriteArrayList 是一種 fail-safe
機制的,它不會拋出 ConcurrentModificationException 異常,並且返回元素與迭代器創建時的元素相同。
每次並發寫操作都會創建新的副本,這個過程存在一定的開銷,所以,一般在規模很大時,讀操作要遠遠多於寫操作時,為了保證執行緒安全性,會使用 CopyOnWriteArrayList。
類似的,CopyOnWriteArraySet 的作用也相當於替代了 Set 介面。
BlockingQueue
BlockingQueue 譯為 阻塞隊列
,它是 JDK 1.5 添加的新的工具類,它繼承於 Queue 隊列
,並擴展了 Queue 的功能。
BlockingQueue 在檢索元素時會等待隊列變成非空,並在存儲元素時會等待隊列變為可用。BlockingQueue 的方法有四種實現形式,以不同的方式來處理。
- 第一種是
拋出異常
特殊值
:第二種是根據情況會返回 null 或者 false阻塞
:第三種是無限期的阻塞當前執行緒直到操作變為可用後超時
:第四種是給定一個最大的超時時間,超過後才會放棄
BlockingQueue 不允許添加 null 元素,在其實現類的方法 add、put 或者 offer 後時添加 null 會拋出空指針異常。BlockingQueue 會有容量限制。在任意時間內,它都會有一個 remainCapacity,超過該值之前,任意 put 元素都會阻塞。
BlockingQueue 一般用於實現生產者 - 消費者
隊列,如下圖所示
BlockingQueue 有多種實現,下面我們一起來認識一下這些容器。
其中 LinkedBlockingQueue
和 ArrayBlockingQueue
是 FIFO 先入先出隊列,二者分別和 LinkedList
和 ArrayList
對應,比同步 List 具有更好的並發性能。 PriorityBlockingQueue
是一個優先順序排序的阻塞隊列,如果你希望按照某種順序而不是 FIFO 處理元素時這個隊列將非常有用。正如其他有序的容器一樣,PriorityBlockingQueue 既可以按照自然順序來比較元素,也可以使用 Comparator
比較器進行外部元素比較。SynchronousQueue
它維護的是一組執行緒而不是一組隊列,實際上它不是一個隊列,它的每個 insert 操作必須等待其他相關執行緒的 remove 方法後才能執行,反之亦然。
LinkedBlockingQueue
LinkedBlockingQueue
是一種 BlockingQueue 的實現。
它是一種基於鏈表的構造、先入先出的有界阻塞隊列。隊列的 head
也就是頭元素是在隊列中等待時間最長的元素;隊列的 tail
也就是隊尾元素是隊列中等待時間最短的元素。新的元素會被插入到隊尾中,檢索操作將獲取隊列中的頭部元素。鏈表隊列通常比基於數組的隊列具有更高的吞吐量,但是在大多數並發應用程式中,可預測的性能較差。
ArrayBlockingQueue
ArrayBlockingQueue
是一個用數組實現的有界隊列,此隊列順序按照先入先出
的原則對元素進行排序。
默認情況下不保證執行緒公平的訪問隊列,所謂公平訪問隊列指的是阻塞的執行緒,可以按照阻塞的先後順序訪問,即先阻塞執行緒先訪問隊列。非公平性是對先等待的執行緒是非公平的。有可能先阻塞的執行緒最後才訪問隊列。
PriorityBlockingQueue
PriorityBlockingQueue
是一個支援優先順序的阻塞隊列,默認情況下的元素採取自然順序生序或者降序,也可以自己定義 Comparator 進行外部排序。但需要注意的是不能保證同優先順序元素的順序。
DelayQueue
DelayQueue
是一個支援延時獲取元素的無阻塞隊列,其中的元素只能在延遲到期後才能使用,DelayQueue 中的隊列頭是延遲最長時間的元素,如果沒有延遲,則沒有 head 頭元素,poll 方法會返回 null。判斷的依據就是 getDelay(TimeUnit.NANOSECONDS)
方法返回一個值小於或者等於 0 就會發生過期。
TransferQueue
TransferQueue 繼承於 BlockingQueue,它是一個介面,一個 BlockingQueue 是一個生產者可能等待消費者接受元素,TransferQueue 則更進一步,生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費,新添加的transfer 方法用來實現這種約束。
TransferQueue 有下面這些方法:兩個 tryTransfer
方法,一個是非阻塞的,另一個是帶有 timeout 參數設置超時時間的。還有兩個輔助方法 hasWaitingConsumer
和 getWaitingConcusmerCount
。
LinkedTransferQueue
一個無界的基於鏈表的 TransferQueue。這個隊列對任何給定的生產者進行 FIFO 排序,head 是隊列中存在時間最長的元素。tail 是隊列中存在時間最短的元素。
BlockingDeque
與 BlockingQueue 相對的還有 BlockingDeque 和 Deque,它們是 JDK1.6 被提出的,分別對 Queue 和 BlockingQueue 做了擴展。
Deque
是一個雙端隊列,分別實現了在隊列頭和隊列尾的插入。Deque 的實現有 ArrayDeque
、ConcurrentLinkedDeque
,BlockingDeque 的實現有 LinkedBlockingDeque
。
阻塞模式一般用於生產者 – 消費者隊列,而雙端隊列適用於工作密取。在工作密取的設計中,每個消費者都有各自的雙端隊列,如果一個消費者完成了自己雙端隊列的任務,就會去其他雙端隊列的末尾進行消費。密取方式要比傳統的生產者 – 消費者隊列具有更高的可伸縮性,這是因為每個工作密取的工作者都有自己的雙端隊列,不存在競爭的情況。
ArrayDeque
ArrayDeque 是 Deque 的可動態調整大小的數組實現,其內部沒有容量限制,他們會根據需要進行增長。ArrayDeque 不是執行緒安全的,如果沒有外部加鎖的情況下,不支援多執行緒訪問。ArrayDeque 禁止空元素,這個類作為棧使用時要比 Stack 快,作為 queue 使用時要比 LinkedList 快。
除了 remove、removeFirstOccurrence、removeLastOccurrence、contains、interator.remove 外,大部分的 ArrayDeque 都以恆定的開銷運行。
注意:ArrayDeque 是 fail-fast 的,如果創建了迭代器之後,卻使用了迭代器外部的 remove 等修改方法,那麼這個類將會拋出 ConcurrentModificationException 異常。
ConcurrentLinkedDeque
ConcurrentLinkedDeque
是 JDK1.7 引入的雙向鏈表的無界並發隊列。它與 ConcurrentLinkedQueue 的區別是 ConcurrentLinkedDeque 同時支援 FIFO 和 FILO 兩種操作方式,即可以從隊列的頭和尾同時操作(插入/刪除)。ConcurrentLinkedDeque 也支援 happen-before
原則。ConcurrentLinkedDeque 不允許空元素。
LinkedBlockingDeque
LinkedBlockingDeque
是一個由鏈表結構組成的雙向阻塞隊列,即可以從隊列的兩端插入和移除元素。雙向隊列因為多了一個操作隊列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。LinkedBlockingDeque 把初始容量和構造函數綁定,這樣能夠有效過度拓展。初始容量如果沒有指定,就取的是 Integer.MAX_VALUE
,這也是 LinkedBlockingDeque 的默認構造函數。
同步工具類
同步工具類可以是任何一個對象,只要它根據自身狀態來協調執行緒的控制流。阻塞隊列可以作為同步控制類,其他類型的同步工具類還包括 訊號量(Semaphore)
、柵欄(Barrier)
和 閉鎖(Latch)
。下面我們就來一起認識一下這些工具類
Semaphore
Semaphore 翻譯過來就是 訊號量
,訊號量是什麼?它其實就是一種訊號,在作業系統中,也有訊號量的這個概念,在進程間通訊的時候,我們就會談到訊號量進行通訊。還有在 Linux 作業系統採取中斷時,也會向進程發出中斷訊號,根據進程的種類和訊號的類型判斷是否應該結束進程。
在 Java 中,Semaphore(訊號量)
是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。
Semaphore 管理著一組許可(permit)
,許可的初始數量由構造函數來指定。在獲取某個資源之前,應該先從訊號量獲取許可(permit)
,以確保資源是否可用。當執行緒完成對資源的操作後,會把它放在池中並向訊號量返回一個許可,從而允許其他執行緒訪問資源,這叫做釋放許可。如果沒有許可的話,那麼 acquire
將會阻塞直到有許可(中斷或者操作超時)為止。release
方法將返回一個許可訊號量。
Semaphore 可以用來實現流量控制,例如常用的資料庫連接池,執行緒請求資源時,如果資料庫連接池為空則阻塞執行緒,直接返回失敗,如果連接池不為空時解除阻塞。
CountDownLatch
閉鎖(Latch)
是一種同步工具類,它可以延遲執行緒的進度以直到其到達終止狀態。閉鎖的作用相當於是一扇門,在閉鎖達到結束狀態前,門是一直關著的,沒有任何執行緒能夠通過。當閉鎖到達結束狀態後,這扇門會打開並且允許任何執行緒通過,然後就一直保持打開狀態。
CountDownLatch
就是閉鎖的一種實現。它可以使一個或者多個執行緒等待一組事件的發生。閉鎖有一個計數器,閉鎖需要對計數器進行初始化,表示需要等待的次數,閉鎖在調用 await
處進行等待,其他執行緒在調用 countDown 把閉鎖 count 次數進行遞減,直到遞減為 0 ,喚醒 await。如下程式碼所示
public class TCountDownLatch {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
Increment increment = new Increment(latch);
Decrement decrement = new Decrement(latch);
new Thread(increment).start();
new Thread(decrement).start();
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Decrement implements Runnable {
CountDownLatch countDownLatch;
public Decrement(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for(long i = countDownLatch.getCount();i > 0;i--){
Thread.sleep(1000);
System.out.println("countdown");
this.countDownLatch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Increment implements Runnable {
CountDownLatch countDownLatch;
public Increment(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("await");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter Released");
}
}
Future
我們常見的創建多執行緒的方式有兩種,一種是繼承 Thread 類,一種是實現 Runnable 介面。這兩種方式都沒有返回值。相對的,創建多執行緒還有其他三種方式,那就是使用 Callable
介面、 Future
介面和 FutureTask
類。Callable 我們之前聊過,這裡就不再描述了,我們主要來描述一下 Future 和 FutureTask 介面。
Future 就是對具體的 Runnable 或者 Callable 任務的執行結果進行一系列的操作,必要時可通過 get
方法獲取執行結果,這個方法會阻塞直到執行結束。Future 中的主要方法有
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
cancel(boolean mayInterruptIfRunning)
: 嘗試取消任務的執行。如果任務已經完成、已經被取消或者由於某些原因而無法取消,那麼這個嘗試會失敗。如果取消成功,或者在調用 cancel 時此任務尚未開始,那麼此任務永遠不會執行。如果任務已經開始,那麼 mayInterruptIfRunning 參數會確定是否中斷執行任務以便於嘗試停止該任務。這個方法返回後,會對isDone
的後續調用也返回 true,如果 cancel 返回 true,那麼後續的調用isCancelled
也會返回 true。boolean isCancelled()
:如果此任務在正常完成之前被取消,則返回 true。boolean isDone()
:如果任務完成,返回 true。V get() throws InterruptedException, ExecutionException
:等待必要的計算完成,然後檢索其結果V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
: 必要時最多等待給定時間以完成計算,然後檢索其結果。
因為Future只是一個介面,所以是無法直接用來創建對象使用的,因此就有了下面的FutureTask。
FutureTask
FutureTask 實現了 RunnableFuture
介面,RunnableFuture 介面是什麼呢?
RunnableFuture 介面又繼承了 Runnable
介面和 Future
介面。納尼?在 Java 中不是只允許單繼承么,是的,單繼承更多的是說的類與類之間的繼承關係,子類繼承父類,擴展父類的介面,這個過程是單向的,就是為了解決多繼承引起的過渡引用問題。而介面之間的繼承是介面的擴展,在 Java 編程思想中也印證了這一點
對 RunnableFuture 介面的解釋是:成功執行的 run 方法會使 Future 介面的完成並允許訪問其結果。所以它既可以作為 Runnable 被執行緒執行,又可以作為 Future 得到 Callable 的返回值。
FutureTask 也可以用作閉鎖,它可以處於以下三種狀態
- 等待運行
- 正在運行
- 運行完成
FutureTask 在 Executor
框架中表示非同步任務,此外還可以表示一些時間較長的計算,這些計算可以在使用計算結果之前啟動。
FutureTask 具體的源碼我後面會單獨出文章進行描述。
Barrier
我們上面聊到了通過閉鎖來啟動一組相關的操作,使用閉鎖來等待一組事件的執行。閉鎖是一種一次性對象,一旦進入終止狀態後,就不能被 重置
。
Barrier
的特點和閉鎖也很類似,它也是阻塞一組執行緒直到某個事件發生。柵欄與閉鎖的區別在於,所有執行緒必須同時到達柵欄的位置,才能繼續執行,就像我們上面作業系統給出的這幅圖一樣。
ABCD 四條執行緒,必須同時到達 Barrier,然後 手牽手
一起走過幸福的殿堂。
當執行緒到達 Barrier 的位置時會調用 await
方法,這個方法會阻塞直到所有執行緒都到達 Barrier 的位置,如果所有執行緒都到達 Barrier 的位置,那麼 Barrier 將會打開使所有執行緒都被釋放,而 Barrier 將被重置以等待下次使用。如果調用 await 方法導致超時,或者 await 阻塞的執行緒被中斷,那麼 Barrier 就被認為被打破,所有阻塞的 await 都會拋出 BrokenBarrierException
。如果成功通過柵欄後,await 方法返回一個唯一索引號,可以利用這些索引號選舉一個新的 leader,來處理一下其他工作。
public class TCyclicBarrier {
public static void main(String[] args) {
Runnable runnable = () -> System.out.println("Barrier 1 開始...");
Runnable runnable2 = () -> System.out.println("Barrier 2 開始...");
CyclicBarrier barrier1 = new CyclicBarrier(2,runnable);
CyclicBarrier barrier2 = new CyclicBarrier(2,runnable2);
CyclicBarrierRunnable b1 = new CyclicBarrierRunnable(barrier1,barrier2);
CyclicBarrierRunnable b2 = new CyclicBarrierRunnable(barrier1,barrier2);
new Thread(b1).start();
new Thread(b2).start();
}
}
class CyclicBarrierRunnable implements Runnable {
CyclicBarrier barrier1;
CyclicBarrier barrier2;
public CyclicBarrierRunnable(CyclicBarrier barrier1,CyclicBarrier barrier2){
this.barrier1 = barrier1;
this.barrier2 = barrier2;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "等待 barrier1" );
barrier1.await();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "等待 barrier2" );
barrier2.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +
" 做完了!");
}
}
Exchanger
與 Barrier 相關聯的還有一個工具類就是 Exchanger
, Exchanger 是一個用於執行緒間協作的工具類。Exchanger用於進行執行緒間的數據交換。
它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的數據。這兩個執行緒通過exchange 方法交換數據, 如果第一個執行緒先執行 exchange方法,它會一直等待第二個執行緒也執行 exchange,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換數據,將本執行緒生產出來的數據傳遞給對方。因此使用Exchanger 的重點是成對的執行緒使用 exchange() 方法,當有一對執行緒達到了同步點,就會進行交換數據。因此該工具類的執行緒對象是成對的。
下面通過一段例子程式碼來講解一下
public class TExchanger {
public static void main(String[] args) {
Exchanger exchanger = new Exchanger();
ExchangerRunnable exchangerRunnable = new ExchangerRunnable(exchanger,"A");
ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger,"B");
new Thread(exchangerRunnable).start();
new Thread(exchangerRunnable2).start();
}
}
class ExchangerRunnable implements Runnable {
Exchanger exchanger;
Object object;
public ExchangerRunnable(Exchanger exchanger,Object object){
this.exchanger = exchanger;
this.object = object;
}
@Override
public void run() {
Object previous = object;
try {
object = this.exchanger.exchange(object);
System.out.println(
Thread.currentThread().getName() + "改變前是" + previous + "改變後是" + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
總結
本篇文章我們從同步容器類入手,主要講了 fail-fast
和 fail-safe
機制,這兩個機制在並發編程中非常重要。然後我們從作業系統的角度,聊了聊作業系統層面實現安全性的幾種方式,然後從作業系統 -> 並發我們聊了聊 Java 中的並發工具包有哪些,以及構建並發的幾種工具類。
你好,我是 cxuan,我自己手寫了四本 PDF,分別是 Java基礎總結、HTTP 核心總結、電腦基礎知識,作業系統核心總結,我已經整理成為 PDF,可以關注公眾號 Java建設者 回復 PDF 領取優質資料。