啥?SynchronousQueue和鐘點房一個道理

  • 2021 年 5 月 24 日
  • 筆記

今天這篇文章,我們繼續講架構師大劉的故事。

大劉有段時間經常會給一些程式設計師講課。這一方面是由於團隊培訓的需要,一方面也是大劉自身想搞搞凡爾賽,嘚瑟一下自身的實力。

大劉講課是允許公司任何一個人進去聽的。提前一個星期把主題公布在公司群里,有人想聽到日子直接去就是了。

有一次,大劉在聊並發話題的時候,為了彰顯自己確實是個並發達人,用了個 SynchronousQueue 舉例子。他說這個隊列其實沒有容積的概念,就是執行緒持有數據互相匹配。

嗯,談到這裡還是要說一下,大劉其實也不太懂 SynchronousQueue。只是一來這東西沒人用,自然就沒人懂;二來它的概念也比較晦澀,有些時候比較違背直覺,所以,即使隨口說的一些話可能不太對,也未必會被發現,還能給人一種不明覺厲的感覺。

大劉用過幾次,感覺良好。因此沒事兒就要秀一下 SynchronousQueue,表示自己這麼生僻的也懂,並發達人的名頭是沒有叫錯的。

也就那一次,恰恰被人拆了台。

當時課上來了個新入職的技術,此人長得中等身材,相貌平平,只是臉卻長的像種地多年的老農的巴掌。臉上的疙瘩如同老農巴掌上的老繭。這人姓張,這裡由於他臉長得像個大巴掌,那就暫且叫他巴掌張。

這個巴掌張打斷了大劉的話,言之鑿鑿說大劉說的是錯的,說他看過這個 SynchronousQueue,並不是大劉說的這樣。

大劉有點心虛,脖子滲出了一圈汗,但是並發達人的稱呼大劉並不想丟掉。於是說了一大堆雲里霧裡的廢話,把話題帶偏了開去。並告訴巴掌張,下回要和他在這個舞台上 PK 一二, 要好好看看誰是真正的 SynchronousQueue 的知心朋友。

由於大劉感覺被巴掌張的巴掌糊了臉,便就此下了決心要研究透 SynchronousQueue。

Google 和百度一起查,東西合璧,洋為中用,搞了好是一陣子。最後有個犄角旮旯的小破網站,有人說了這麼一句話:

SynchronousQueue 的目的就是為了接頭,為了匹配,當接上頭了就雙方合作愉快,整個工作完成。但是一旦在接頭中,任何一方還沒到達,那麼另一方就必須阻塞著等待。

這句話一下子就敲開了大劉的腦殼,讓聰明的智商重新佔領了高地。

為啥這句話就點亮了大劉那本來已經像燈泡的腦袋了呢?因為大劉想起了他每次的面試經歷,就和這個接頭是一樣的。

大劉每次去面試,都很規矩的提前趕到新公司。但是大部分情況,時間到了之後都需要等很長時間才開始面試。大劉那時候也年輕,只是以為領導忙,所以倒也恭恭敬敬的等著。

直到大劉自己當了領導,去面試別人的時候,被 HR 委婉的提醒了下,要讓候選人等一會兒再過去,顯的公司業務很忙,讓候選人對公司保持一定的敬畏。那時候,大劉才知道這是一種 PUA 術……

大劉對照著自己的面試經歷,一下就理解了 SynchronousQueue 的概念。

SynchronousQueue 本身是為了交接、匹配而存在的。當一個執行緒往 SynchronousQueue 放東西,發現沒執行緒在等著拿,就給阻塞掉——這就像面試者來早了等面試官。

當一個執行緒去 SynchronousQueue 拿東西,發現沒東西,就去等的時候——就像面試官來早了等面試者。

搞懂 SynchronousQueue 的時候,正是一個冬天,屋外面的寒風在虎虎生威,屋裡面的大劉在熠熠生輝。

只是一個堂而皇之擺在 JDK 底層並發包中的隊列結構,SynchronousQueue 當然沒那麼簡單,裡面還存在著億點點細節。

所以,大劉在整體方向搞懂之後,開始研究起了細節。他要奮發,狠狠把巴掌張的囂張氣焰壓下去,大劉要當公司技術的頭牌。

回到現實里,SynchronousQueue 真正的目的就是為了讓兩個執行緒的工作結果進行交接。這沒什麼問題。但是,在這個交接中是需要嚴格保密的,沒有人可以窺視。

嗯,沒錯,就和你約了女朋友去鐘點房那樣的不能被窺視。

好,圍繞這個 SynchronousQueue 的鐘點房,咱們通過源程式碼,來看這億點點細節。

首先,鐘點房嚴格保密,裡面是多少人,就不能讓人知道。所以,就不能讓別人通過方法得到具體的數據。對於 SynchronousQueue 來說,自然就是通過 size() 你得不到什麼資訊。

/**
* Always returns zero.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return zero
*/
public int size() {
  return 0;
}

/**
* Always returns {@code true}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return {@code true}
*/
public boolean isEmpty() {
  return true;
}

其次,鐘點房也不能隨便進去查房,看看都是誰。所以,自然就不能迭代。

/**
* Returns an empty iterator in which {@code hasNext} always returns
* {@code false}.
*
* @return an empty iterator
*/
public Iterator<E> iterator() {
  return Collections.emptyIterator();
}

再次,鐘點房保護隱私,它也不能讓你鑽了漏子,不告訴你 XXX 是不是躲在了鐘點房裡。所以,你也不能知道鐘點房裡有沒有某個人。

/**
* Always returns {@code false}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param o the element
* @return {@code false}
*/
public boolean contains(Object o) {
  return false;
}

/**
* Returns {@code false} unless the given collection is empty.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param c the collection
* @return {@code false} unless given collection is empty
*/
public boolean containsAll(Collection<?> c) {
  return c.isEmpty();
}

自然,鐘點房也沒什麼權力趕人出去。

/**
* Always returns {@code false}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param o the element to remove
* @return {@code false}
*/
public boolean remove(Object o) {
  return false;
}

當然,作為一個商業化的鐘點房,SynchronousQueue 還是很注意安全的,它貼心的提供了緊急轉移的手段。

/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException            {@inheritDoc}
* @throws NullPointerException          {@inheritDoc}
* @throws IllegalArgumentException      {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
  if (c == null)
    throw new NullPointerException();
  if (c == this)
    throw new IllegalArgumentException();
  
  int n = 0;
    for (E e; (e = poll()) != null;) {
      c.add(e);
      ++n;
    }
  return n;
}

/**	
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException            {@inheritDoc}
* @throws NullPointerException          {@inheritDoc}
* @throws IllegalArgumentException      {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
  if (c == null)
    throw new NullPointerException();
  if (c == this)
    throw new IllegalArgumentException();
  
  int n = 0;
    for (E e; n < maxElements && (e = poll()) != null;) {
      c.add(e);
      ++n;
    }
  return n;
}

最後,鐘點房就只能搞搞交接工作了。交接嗎,自然是有交有接的,交的就得帶東西。

public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  // put:帶著東西進屋子
  if (transferer.transfer(e, false, 0) == null) {
    Thread.interrupted();
    throw new InterruptedException();
  }
}

接的肯定不會帶著東西,得留地方拿東西。

public E take() throws InterruptedException {
  // take:從屋子裡把東西拿出來
  E e = transferer.transfer(null, false, 0);
  if (e != null)
    return e;
  Thread.interrupted();
  throw new InterruptedException();
}

但是呢,這交接工作啊,得在專人安排下進行。

為什麼需要專人來幫忙?因為有時候我們的鐘點房太受歡迎了,客人多,得排隊管管。管這些排隊的就是 Transfer,它是鐘點房的經理。

/**
* The transferer. Set only in constructor, but cannot be declared
* as final without further complicating serialization.  Since
* this is accessed only at most once per public method, there
* isn't a noticeable performance penalty for using volatile
* instead of final here.
*/
private transient volatile Transferer<E> transferer;

/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
  /**
  * Performs a put or take.
  *
  * @param e if non-null, the item to be handed to a consumer;
  * if null, requests that transfer return an item
  * offered by producer.
  * @param timed if this operation should timeout
  * @param nanos the timeout, in nanoseconds
  * @return if non-null, the item provided or received; if null,
  * the operation failed due to timeout or interrupt --
  * the caller can distinguish which of these occurred
  * by checking Thread.interrupted.
  */
  abstract E transfer(E e, boolean timed, long nanos);
}

Transfer 經理每次開門營業的時候,會收到總部給的牌子,告訴他管理工作要注意方式方法,比如公平有效,比如優先服務 VIP 客人之類的。

/**
* 默認給vip客人開點後門
*/
public SynchronousQueue() {
  this(false);
}

/**
* 總部遞牌子,告訴Transfer到底是公平還是不公平,
*/
public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

先看看適合勞苦大眾的公平模式,先來先享受,晚來沒折扣。

static final class TransferQueue<E> extends Transferer<E> {
  static final class QNode{...}
  transient volatile QNode head;    
  transient volatile QNode tail;
  transient volatile QNode cleanMe;
  TransferQueue() {
	//經典的鏈表套路,先搞個虛擬的頭結點
    QNode h = new QNode(null, false); 
    head = h;
    tail = h;
  }
  ……
  ……

QNode 就是 Transfer 經理需要的牌子,上面記錄點資訊,別到時候弄錯了。

static final class QNode {
  volatile QNode next; // 下一個排隊的哥們兒
  volatile Object item; // 這次哥們帶來的要交接的東西
  volatile Thread waiter; // 交接的執行緒
  final boolean isData;	// isData == true表示帶著東西

  QNode(Object item, boolean isData) {
    this.item = item;
    this.isData = isData;
  }
	
  // ...省略一系列CAS方法
}

怎麼搞,秘密都在 transfer() 里。

@SuppressWarnings("unchecked")
  E transfer(E e, boolean timed, long nanos) {
  //...先省略細節        
}

transfer 本質就是一直在等待交接完成或者交接被中斷,被取消,或者等待超時。

for (;;) {
  QNode t = tail;
  QNode h = head;
	//因為初始化是在構造函數里搞得,可能構造函數沒有執行完,就被用上了,就會出現t或者h為null的情況
  if (t == null || h == null)         
    continue; //啥也不能做
    
	//h==t表示沒人,t.isData == isData表示過來的哥們和前面的哥們目的一樣,那就只能考慮排隊等著了。
  if (h == t || t.isData == isData) { 
    QNode tn = t.next;
    //執行緒不安全需要考慮的,現在的尾巴不對,指錯了,重新確認下
		if (t != tail)                  
      continue;
      
		//隊尾確定了,發現又來了人,把尾巴指向新來的人
    if (tn != null) {             
      advanceTail(t, tn);
      continue;
    }
		
    //超時了,別等了
    if (timed && nanos <= 0)
      return null;
      
		//總算沒事兒了,哥們可以登記進屋了
    if (s == null)
      s = new QNode(e, isData);
      
		//中間可能有人插隊,只能再等等
    if (!t.casNext(null, s))        
      continue;
				
		//準備進屋等著約的人
    advanceTail(t, s);              
    Object x = awaitFulfill(s, e, timed, nanos);
    
		//同一個人出來,那就是任務失敗了
    if (x == s) {
      //清理下                   
      clean(t, s);
      return null;
    }
    
    if (!s.isOffList()) { //還沒脫隊
      advanceHead(t, s); //排前面單獨處理
      if (x != null) //交接成功設一下標記
        s.item = s;
        s.waiter = null;
    }
    
    return (x != null) ? (E)x : e;

這段是不是看著很頭痛?其實 Transfer 這小子也頭痛。

它首先要面臨的第一個問題:資源競爭的問題。

客人源源不斷的來,由於 Transfer 強迫症,他想每次必須從絕對的隊頭或者隊尾巴開始,所以,每次都要判斷下,到底他看到的隊頭或者隊尾,是不是真正的隊頭、隊尾。

確定沒問題了,新來的客人就開始被打造成真正的隊尾。

然後,成為隊尾的哥們就可以等著屬於自己的 Mr.Right 過來交接了。等著交接一直到成功或者失敗的方法就是 awaitFulfill(t, tn)。

這邊有人在等待,同時另外一邊,交接的人們也開始陸續過來了。

else { // complementary-mode
  QNode m = h.next; // node to fulfill
  if (t != tail || m == null || h != head)
    continue; // inconsistent read

    Object x = m.item;
    if (isData == (x != null) || // m already fulfilled
      x == m || // m cancelled
      !m.casItem(x, e)) { // 交接的核心語句
        advanceHead(h, m); // dequeue and retry
        continue;
      }

  advanceHead(h, m); // successfully fulfilled
  LockSupport.unpark(m.waiter);
  return (x != null) ? (E)x : e;
}

交接最核心的其實就是 m.casItem(x, e)。交接成功,大家各回各家了。

整體的流程如下:

  1. 開始就是個經典鏈表開局,head = tail

  2. 陸續開始有節點鏈接,put 的時候,isData = true;take 的時候,isData = false

  3. 可能會同時有很多的 put 操作,沒有對應的 take 操作,他們就按照次序一個個鏈接起來,形成鏈表,並通過 awaitFulfill 方法等著對應的 take

  4. 也可能同時會有很多的 take 操作,而沒有對應的 put 操作,會形成鏈表,並通過 awaitFulfill 方法等著對應的 put

  5. take 操作會從鏈表頭開始找匹配的 put,然後通過 casItem 方法交接

  6. put 操作會從鏈表頭開始找匹配的 take,然後通過 casItem 方法交接

所以,SynchronousQueue 你可以看到了,專門就是搞交接任務。

  • put 的哥們發現沒人 take,就等在那裡,等著take操作。
  • take的哥們兒發現沒人put,也會等在那裡,等著put操作。

這就是我們的 SynchronousQueue 鐘點房做的事情。

OK,鐘點房既然開門做生意,它也要賺錢的嘛。所以,它還得搞搞 VIP 客戶收費,也得為 VIP 客戶搞一些優待。

對於這些 VIP 客人,我們的 Transfer 經理會特意安排下,以棧的形式來安排客人,越後來的客人越大牌兒。所以,自然是後來的客人會優先搞定交接了。這裡簡短的介紹下,就不再贅述了。

Transfer 化身成 TransferStack,後來的優先服務。

  1. 開始自然是鏈表開局,一個無意義的鏈表頭指向了 null

  2. 發現鏈表是空了,二話不說,客官,您進來先啦

  3. 和 TransferQueue 一樣,如果都是 take 過來,模式就是 REQUEST,就得排隊了

  4. 交接人出現,哥們可以收攤兒了

  5. 其餘的不說了,一樣的,說多了沒勁

話說,大劉搞清楚了這些細節之後,次日,當巴掌張再次進行挑釁時,大劉徹底穩下來了。

當挨個把細節講的一清二楚之後,看著巴掌張那張落寞的巴掌臉,瞬間也不覺得像巴掌了,而是像是在猜拳中出的石頭剪刀布中的布。大劉沒忍住,對著這個布比划出了個剪刀,光榮的結束了戰鬥。

大劉依然在技術流中獨佔鰲頭。

我們下篇大劉的故事見。


你好,我是四猿外。

一家上市公司的技術總監,管理的技術團隊一百餘人。

我從一名非電腦專業的畢業生,轉行到程式設計師,一路打拚,一路成長。

我會通過公眾號,
把自己的成長故事寫成文章,
把枯燥的技術文章寫成故事。

我建了一個讀者交流群,裡面大部分是程式設計師,一起聊技術、工作、八卦。歡迎加我微信,拉你入群。