多線程高並發編程(6) — Semaphere、Exchanger源碼分析
一.Semaphere
1.概念
一個計數信號量。在概念上,信號量維持一組許可證。如果有必要,每個acquire()
都會阻塞,直到許可證可用,然後才能使用它。每個release()
添加許可證,潛在地釋放阻塞獲取方。但是,沒有使用實際的許可證對象;Semaphore
只保留可用數量的計數,並相應地執行。即一個Semaphore維護了一組permits【許可證】。每次調用acquire()方法都會阻塞,直到獲取到許可證。每次調用release()方法都會添加一個許可證,也就是釋放一個被阻塞的獲取者。但是實際上並不存在這個許可證,Semaphore僅僅是記錄可用資源的數量,並且做出對應的行為(有資源就獲取,沒有資源就阻塞)。
信號量通常用於限制線程數,而不是訪問某些(物理或邏輯)資源。
-
線程池控制的是線程數量,而信號量控制的是並發數量,雖然說看起來一樣,但兩者還是有區別的。
-
信號量類似於鎖機制,信號量的調用,當達到數量後,線程還是存在的,只是被掛起了而已。而線程池,同時執行的線程數量是固定的,超過了數量的只能等待。
在獲得項目之前,每個線程必須從信號量獲取許可證,以確保某個項目可用。當線程完成該項目後,它將返回到池中,並將許可證返回到信號量,允許另一個線程獲取該項目。請注意,當調用acquire()時,不會保持同步鎖定,因為這將阻止某個項目返回到池中。信號量封裝了限制對池的訪問所需的同步,與保持池本身一致性所需的任何同步分開。【即將限制對池的訪問和對池中數據的操作所需要的鎖分開】。
信號量被初始化為一個,並且被使用,使得它只有至多一個允許可用,可以用作互斥鎖。這通常被稱為二進制信號量,因為它只有兩個狀態:一個許可證可用,或零個許可證可用。當以這種方式使用時,二進制信號量具有屬性(與許多Lock實現不同),「鎖」可以由除所有者之外的線程釋放(因為信號量沒有所有權概念)。這在某些專門的上下文中是有用的,例如死鎖恢復。
Semaphore(int permits) 創建一個 Semaphore與給定數量的許可證和非公平公平設置。 Semaphore(int permits, boolean fair) 創建一個 Semaphore與給定數量的許可證和給定的公平設置。
此類的構造函數可選擇接受公平參數。當設置為false時,此類不會保證線程獲取許可的順序。特別是,闖入是允許的,也就是說,一個線程調用acquire()可以提前已經等待線程分配的許可證-在等待線程隊列的頭部邏輯新的線程將自己【新線程將自己放在等待線程隊列的最前面】。當公平設置為真時,信號量保證調用acquire方法的線程被選擇以按照它們調用這些方法的順序獲得許可(先進先出; FIFO)【FIFO的順序是指是依據到達方法內部的執行點的時間,並不是方法執行的時間。】。請注意,FIFO排序必須適用於這些方法中的特定內部執行點。因此,一個線程可以在另一個線程之前調用acquire,但是在另一個線程之後到達排序點,並且類似地從方法返回。另請注意,未定義的tryAcquire方法不符合公平性設置,但將採取任何可用的許可證。【不定時的tryAcquire()方法會任意選取可用的許可證。】【非公平鎖可以插隊獲取運行,公平鎖按照線程順序執行。】
通常,用於控制資源訪問的信號量應該被公平地初始化,以確保線程沒有被訪問資源【確保沒有線程因為長時間獲取不到許可證而餓死】。當使用信號量進行其他類型的同步控制時,非正常排序的吞吐量優勢往往超過公平性。
2.用法
線程可以通過acquire()方法獲取到一個許可,然後對共享資源進行操作,如果許可集已分配完了,那麼線程將進入等待狀態,直到其他線程釋放許可才有機會再獲取許可,線程釋放一個許可通過release()方法完成,”許可”將被歸還給Semaphore。
public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore sp = new Semaphore(3); for (int i = 0; i < 7; i++) { Runnable runnable = () -> { try { sp.acquire(); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() + "進入,當前已有" + (3 - sp.availablePermits()) + "個並發"); try { Thread.sleep((long) (Math.random() * 10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() + "即將離開"); sp.release(); //下面代碼有時候執行不準確,因為其沒有和上面的代碼合成原子單元 System.out.println("線程" + Thread.currentThread().getName() + "已離開,當前已有" + (3 - sp.availablePermits()) + "個並發"); }; service.execute(runnable); } } 結果: 線程pool-1-thread-1進入,當前已有1個並發 線程pool-1-thread-2進入,當前已有2個並發 線程pool-1-thread-3進入,當前已有3個並發 線程pool-1-thread-3即將離開 線程pool-1-thread-4進入,當前已有3個並發 線程pool-1-thread-3已離開,當前已有3個並發 線程pool-1-thread-1即將離開 線程pool-1-thread-1已離開,當前已有2個並發 線程pool-1-thread-5進入,當前已有3個並發 線程pool-1-thread-5即將離開 線程pool-1-thread-5已離開,當前已有2個並發 線程pool-1-thread-6進入,當前已有3個並發 線程pool-1-thread-4即將離開 線程pool-1-thread-4已離開,當前已有2個並發 線程pool-1-thread-7進入,當前已有3個並發 線程pool-1-thread-2即將離開 線程pool-1-thread-2已離開,當前已有2個並發 線程pool-1-thread-7即將離開 線程pool-1-thread-7已離開,當前已有1個並發 線程pool-1-thread-6即將離開 線程pool-1-thread-6已離開,當前已有0個並發
3.acquire解析
acquire() 從該信號量獲取許可證,阻止直到可用,或線程為 interrupted 。 void acquire(int permits) 從該信號量獲取給定數量的許可證,阻止直到所有可用,否則線程為 interrupted 。
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);//調用AQS的acquireSharedInterruptibly } /** * AQS的acquireSharedInterruptibly * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * 以共享模式獲取,如果中斷被中止。 * 實現首先檢查中斷狀態,然後至少調用一次tryacquirered,成功返回。 * 否則,線程排隊,可能會重複阻塞和取消阻塞, * 調用tryacquiremred直到成功或線程被打斷了。 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//中斷拋出異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//獲取失敗,加入同步隊列等待 doAcquireSharedInterruptibly(arg); } //由Semaphore的FairSync或NonfairSync實現,共享模式下資源可以被多個線程通知佔用,tryAcquireShared返回int類型,表示還有多少個資源可以同時被佔用,用於共享模式下傳播喚醒。 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //以共享中斷模式獲取 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//創建當前線程的節點,並且鎖的模型是共享鎖,將其添加到AQS CLH隊列的末尾 boolean failed = true; try { for (;;) {//自旋 final Node p = node.predecessor();//獲得當前節點的前驅節點 if (p == head) {//是頭節點,沒有等待節點 int r = tryAcquireShared(arg); if (r >= 0) {//獲取成功當前節點設置為頭節點並傳播【傳播指的是,同步狀態剩餘的許可數值不為0,通知後續結點繼續獲取同步狀態】 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //前繼節點非head節點,沒資源獲取,將前繼節點狀態設置為SIGNAL,通過park掛起node節點的線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node);//結束該結點線程的請求 } }
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException();//數量小於0拋出異常 sync.acquireSharedInterruptibly(permits);//調用AQS的acquireSharedInterruptibly }
tryAcquireShared:
static final class FairSync extends Sync {//公平鎖獲取 protected int tryAcquireShared(int acquires) { for (;;) {//自旋 //有前驅節點,表示當前線程前面有阻塞線程,當前線程獲取失敗,先讓前節點線程獲取運行【比非公平鎖獲取多了判斷前驅節點的操作】 if (hasQueuedPredecessors()) return -1; int available = getState();//可獲取的許可證數量 int remaining = available - acquires;//剩下的許可證數量 //如果剩餘數量小於0或更新剩餘數量成功,返回剩餘數量 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } static final class NonfairSync extends Sync {//非公平鎖獲取 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires);//調用Semaphore的內部類Sync的nonfairTryAcquireShared } } final int nonfairTryAcquireShared(int acquires) { for (;;) {//自旋 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
4.release解析
public void release() { sync.releaseShared(1);//調用AQS的releaseShared } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//釋放同步狀態成功 doReleaseShared();//喚醒同步隊列中後繼結點的線程 return true; } return false; } protected boolean tryReleaseShared(int arg) {//由Semaphore的Sync實現 throw new UnsupportedOperationException(); } private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. * 保證釋放動作(向同步等待隊列尾部)傳遞,即使沒有其他正在進行的 * 請求或釋放動作。如果頭節點的後繼節點需要喚醒,那麼執行喚醒 * 動作;如果不需要,將頭結點的等待狀態設置為PROPAGATE保證 * 喚醒傳遞。另外,為了防止過程中有新節點進入(隊列),這裡必 * 需做循環,所以,和其他unparkSuccessor方法使用方式不一樣 * 的是,如果(頭結點)等待狀態設置失敗,重新檢測。 */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus;//頭節點狀態 // 如果頭節點對應的線程是SIGNAL狀態,則意味着頭 //結點的後繼結點所對應的線程需要被unpark-喚醒。 if (ws == Node.SIGNAL) { // 修改頭結點對應的線程狀態設置為0。失敗的話,則繼續循環。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒頭結點h的後繼結點所對應的線程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果頭結點發生變化,則繼續循環。否則,退出循環。 if (h == head) // loop if head changed break; } } //喚醒傳入結點的後繼結點對應的線程 private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //拿到後繼結點 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //喚醒該線程 LockSupport.unpark(s.thread); } protected final boolean tryReleaseShared(int releases) { for (;;) {//自旋 int current = getState();//獲取當前同步狀態 int next = current + releases;//狀態+1 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next))//更新狀態成功返回true return true; } }
二.Exchanger
1.概念
線程可以在成對內配對和交換元素的同步點。每個線程在輸入exchange方法時提供一些對象,與合作者線程匹配,並在返回時接收其合作夥伴的對象。交換器可以被視為一個的雙向形式SynchronousQueue。交換器在諸如遺傳算法和管道設計的應用中可能是有用的。

2.用法
Exchanger<V> 泛型類型,其中 V 表示可交換的數據類型
-
V exchange(V v):等待另一個線程到達此交換點(除非當前線程被中斷),然後將給定的對象傳送給該線程,並接收該線程的對象。
-
V exchange(V v, long timeout, TimeUnit unit):等待另一個線程到達此交換點(除非當前線程被中斷或超出了指定的等待時間),然後將給定的對象傳送給該線程,並接收該線程的對象。
Exchanger<Integer> exchanger = new Exchanger<>(); ExecutorService executor = Executors.newCachedThreadPool(); Runnable run = () ->{ try { int num = new Random().nextInt(10); System.out.println(Thread.currentThread().getName()+"開始交換數據:"+num); num = exchanger.exchange(num);//交換數據並得到交換後的數據 System.out.println(Thread.currentThread().getName()+"交換數據結束後的數據:"+num); } catch (InterruptedException e) { e.printStackTrace(); } }; executor.execute(run); executor.execute(run); executor.shutdown(); 結果: pool-1-thread-2開始交換數據:9 pool-1-thread-1開始交換數據:8 pool-1-thread-2交換數據結束後的數據:8 pool-1-thread-1交換數據結束後的數據:9
3.exchange源碼解析 參考下面的博客,寫的很詳細
- //blog.csdn.net/qq_31865983/article/details/105620881
- //www.xuebuyuan.com/2736097.html