Java並發編程基礎三板斧之Semaphore

引言

最近可以進行個稅申報了,還沒有申報的同學可以趕緊去試試哦。不過我反正是從上午到下午一直都沒有成功的進行申報,一進行申報
就返回「當前訪問人數過多,請稍後再試」。為什麼有些人就能夠申報成功,有些人就直接返回失敗。這很明顯申報處理資源是有限的,
只能等別人處理完了在來處理你的,你如果運氣好可能重試幾次就輪到你了,如果運氣不好可能重試一天也可能輪不到你。
我反正已經是放棄了,等到夜深人靜的時候再來試試。作為一個程式設計師我們肯定知道這是個稅申請app的限流操作,如果還有不懂什麼
是限流操作的可以參考下這個文章《高並發系統三大利器之限流》
比如個稅申報系統每台機器只最多分別只能處理1000個請求,再多的請求就會把機器打掛。如果是多餘的請求就把這些請求拒絕掉。直接給你返回一句溫馨提示:「當前訪問人數過多,請稍後再試」,如果要實現這個功能大家想想可以通過哪些方法演算法來實現。

共享鎖、獨佔鎖

學習semaphore之前我們必須要先了解下什麼是共享鎖。在上一篇文章《Java高並發編程基礎之AQS》我們介紹了公平鎖於非公平鎖的區別。

  • 共享鎖:它是允許多個執行緒同時獲取鎖,並發的訪問共享資源
  • 獨佔鎖:也有人把它叫做「獨享鎖」,它是是獨佔的,排他的,只能被一個執行緒可持有,
    當獨佔鎖已經被某個執行緒持有時,其他執行緒只能等待它被釋放後,才能去爭鎖,並且同一時刻只有一個執行緒能爭鎖成功。

什麼是Semaphore

在《Java並發編程藝術》(微信搜【java金融】回復電子書可以免費獲取PDF版本)這一書中是這麼說的:

Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。很多年以來,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,後面的車會看到紅燈,不能駛入XX馬路,但是如果前一百輛中有五輛車已經離開了XX馬路,那麼後面就允許有5輛車駛入馬路,這個例子里說的車就是執行緒,駛入馬路就表示執行緒在執行,離開馬路就表示執行緒執行完成,看見紅燈就表示執行緒被阻塞,不能執行。

  • Semaphore機制是提供給執行緒搶佔式獲取許可,所以他可以實現公平或者非公平,類似於ReentrantLock
    說了這麼多我們來個實際的例子看一看,比如我們去停車場停車,停車場總共只有5個車位,但是現在有8輛汽車來停車,剩下的3輛汽車要麼等其他汽車開走後進行停車,或者去找別的停車位?
/**
 * @author: 公眾號【Java金融】
 */
public class SemaphoreTest {
    public static void main(String[] args) throws InterruptedException {
         // 初始化五個車位
        Semaphore semaphore = new Semaphore(5);
        // 等所有車子
        final CountDownLatch latch = new CountDownLatch(8);
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            if (i == 5) {
                Thread.sleep(1000);
                new Thread(() -> {
                    stopCarNotWait(semaphore, finalI);
                    latch.countDown();
                }).start();
                continue;
            }
            new Thread(() -> {
                stopCarWait(semaphore, finalI);
                latch.countDown();
            }).start();
        }
        latch.await();
        log("總共還剩:" + semaphore.availablePermits() + "個車位");
    }

    private static void stopCarWait(Semaphore semaphore, int finalI) {
        String format = String.format("車牌號%d", finalI);
        try {
            semaphore.acquire(1);
            log(format + "找到車位了,去停車了");
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
            log(format + "開走了");
        }
    }

    private static void stopCarNotWait(Semaphore semaphore, int finalI) {
         String format = String.format("車牌號%d", finalI);
        try {
            if (semaphore.tryAcquire()) {
                log(format + "找到車位了,去停車了");
                Thread.sleep(10000);
                log(format + "開走了");
                semaphore.release();
            } else {
                log(format + "沒有停車位了,不在這裡等了去其他地方停車去了");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void log(String content) {
        // 格式化
        DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // 當前時間
        LocalDateTime now = LocalDateTime.now();
        System.out.println(now.format(fmTime) + "  "+content);
    }
}
2021-03-01 18:54:57  車牌號0找到車位了,去停車了
2021-03-01 18:54:57  車牌號3找到車位了,去停車了
2021-03-01 18:54:57  車牌號2找到車位了,去停車了
2021-03-01 18:54:57  車牌號1找到車位了,去停車了
2021-03-01 18:54:57  車牌號4找到車位了,去停車了
2021-03-01 18:54:58  車牌號5沒有停車位了,不在這裡等了去其他地方停車去了
2021-03-01 18:55:07  車牌號7找到車位了,去停車了
2021-03-01 18:55:07  車牌號6找到車位了,去停車了
2021-03-01 18:55:07  車牌號2開走了
2021-03-01 18:55:07  車牌號0開走了
2021-03-01 18:55:07  車牌號3開走了
2021-03-01 18:55:07  車牌號4開走了
2021-03-01 18:55:07  車牌號1開走了
2021-03-01 18:55:17  車牌號7開走了
2021-03-01 18:55:17  車牌號6開走了
2021-03-01 18:55:17  總共還剩:5個車位

從輸出結果我們可以看到車牌號5這輛車看見沒有車位了,就不在這個地方傻傻的等了,而是去其他地方了,但是車牌號6車牌號7分別需要等到車庫開出兩輛車空出兩個車位後才停進去。這就體現了Semaphoreacquire 方法如果沒有獲取到憑證它就會阻塞,而tryAcquire方法如果沒有獲取到憑證不會阻塞的。

semaphore在dubbo中的應用

Dubbo中可以給Provider配置執行緒池大小來控制系統提供服務的最大並行度,默認是200

<dubbo:provider  threads="200"/>

比如我現在這個訂單系統有三個介面,分別為創單、取消訂單、修改訂單。這三個介面加起來的並發是200但是創單介面是核心介面,我想讓它多分點執行緒來執行
讓它可以有最大150個執行緒,取消訂單和修改訂單分別最大25個執行緒執行就可以了。dubbo提供了executes這一屬性來實現這個功能

<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/>
<dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/>
<dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>

我們可以看看dubbo內部是如何來executes的,具體實現是在ExecuteLimitFilter這個類我們可以

 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
            // 如果當前使用的執行緒數量已經大於等於設置的閾值,那麼直接拋出異常
//            if (count.getActive() >= max) {
// throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            /**
             * //manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
             
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        // 計數器+1
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
           // 計數器-1
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }

從上述程式碼我們也可以看出早期這個是沒有採用Semaphore來實現的,而是直接採用被注釋的 if (count.getActive() >= max) 這個來來實現的,由於這個count.getActive() >= max 和這個計數加1不是原子性的,所以會有問題,具體bug號可以看//github.com/apache/dubbo/pull/582後面才採用上述程式碼用Semaphore來修復非原子性問題。具體更詳細的分析可以參見程式碼的鏈接。不過現在最新版本(2.7.9)我看是採用採用自旋加上和CAS來實現的。

Semaphore

上面就是對Semaphore一個簡單的使用以及dubbo中用到的例子,說句實話Semaphore在工作中用的還是比較少的,不過面試又有可能會被問到,所以還是有必要來一起學習一下它。我們前面《Java高並發編程基礎之AQS》通過ReentrantLock 一起學習了下AQS,其實Semaphore同樣也是通過AQS來是實現的,我們可以一起來對照下獨佔鎖的方法,基本上都是有方法一一相對應的。
在這裡插入圖片描述
這裡有兩點稍微需要注意的地方:

  • 在獨佔鎖模式中,我們只有在獲取了獨佔鎖的節點釋放鎖時,才會喚醒後繼節點,因為獨佔鎖只能被一個執行緒持有,如果它還沒有被釋放,就沒有必要去喚醒它的後繼節點。
  • 在共享鎖模式下,當一個節點獲取到了共享鎖,我們在獲取成功後就可以喚醒後繼節點了,而不需要等到該節點釋放鎖的時候,這是因為共享鎖可以被多個執行緒同時持有,一個鎖獲取到了,則後繼的節點都可以直接來獲取。因此,在共享鎖模式下,在獲取鎖和釋放鎖結束時,都會喚醒後繼節點。

獲取憑證

我們同樣還是通過非公平鎖的模式來老獲取憑證
我們可以看下acquire的核心方法

  public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }
    protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
   }
	
	// 主要看下這個方法,這個方法返回的值也就是tryAcquireShared返回的值,因為tryAcquireShared->nonfairTryAcquireShared
    final int nonfairTryAcquireShared(int acquires) {
          //自旋
		  for (;;) {
		       //Semaphore用AQS的state變數的值代表可用許可數
		       int available = getState();
		       //可用許可數減去本次需要獲取的許可數即為剩餘許可數
		       int remaining = available - acquires;
		       //如果剩餘許可數小於0或者CAS將當前可用許可數設置為剩餘許可數成功,則返回成功許可數
		       if (remaining < 0 ||
		           compareAndSetState(available, remaining))
		           return remaining;
		   }
  • tryAcquireShared 獲取返回許可書小於0時說明獲取許可失敗需要進入doAcquireSharedInterruptibly這個方法去休眠。
  • tryAcquireShared 獲取返回許可書小於0時說明獲取許可成功直接結束。

doAcquireSharedInterruptibly

```java
 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 獨佔鎖的acquireQueued調用的是addWaiter(Node.EXCLUSIVE),
        // 而共享鎖調用的是addWaiter(Node.SHARED),表明了該節點處於共享模式
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這個方法是不是跟我們上篇文章講的AQS的獨佔鎖的acquireQueued很像,不過獨佔鎖它是直接調用了用了setHead(node)方法,而共享鎖調用的是setHeadAndPropagate(node, r)
這個方法除了調用setHead 裡面還調用了doReleaseShared(喚醒後繼節點)

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

其他的方法基本上是和ReentrantLock來實現的獨佔鎖差不多,我相信大家對源碼分析感興趣的應該也不多,其他更多細節問題還是需要自己親自動手去看源碼的。

總結

  • 當訊號量Semaphore初始化設置許可證為1 時,它也可以當作互斥鎖使用。其中0、1就相當於它的狀態,當=1時表示其他執行緒可以獲取,當=0時,排他,即其他執行緒必須要等待。
  • SemaphoreJUC包中的一個很簡單的工具類,用來實現多執行緒下對於資源的同一時刻的訪問執行緒數限制
  • Semaphore中存在一個【許可】的概念,即訪問資源之前,先要獲得許可,如果當前許可數量為0,那麼執行緒阻塞,直到獲得許可
  • Semaphore內部使用AQS實現,由抽象內部類Sync繼承了AQS。因為Semaphore天生就是共享的場景,所以其內部實際上類似於共享鎖的實現
  • 共享鎖的調用框架和獨佔鎖很相似,它們最大的不同在於獲取鎖的邏輯——共享鎖可以被多個執行緒同時持有,而獨佔鎖同一時刻只能被一個執行緒持有。
  • 由於共享鎖同一時刻可以被多個執行緒持有,因此當頭節點獲取到共享鎖時,可以立即喚醒後繼節點來爭鎖,而不必等到釋放鎖的時候。因此,共享鎖觸發喚醒後繼節點的行為可能有兩處,一處在當前節點成功獲得共享鎖後,一處在當前節點釋放共享鎖後。
  • 採用semaphore來進行限流的話會產生突刺現象

指在一定時間內的一小段時間內就用完了所有資源,後大部分時間中無資源可用。
比如在限流方法中的計算器演算法,設置1s內的最大請求數為100,在前100ms已經永遠了100個請求,則後面900ms將無法處理請求,這就是突刺現象

結束

  • 由於自己才疏學淺,難免會有紕漏,假如你發現了錯誤的地方,還望留言給我指出來,我會對其加以修正。
  • 如果你覺得文章還不錯,你的轉發、分享、讚賞、點贊、留言就是對我最大的鼓勵。
  • 感謝您的閱讀,十分歡迎並感謝您的關注。
    站在巨人的肩膀上摘蘋果:
    //segmentfault.com/a/1190000016447307