啥?SynchronousQueue和鐘點房一個道理
- 2021 年 5 月 24 日
- 筆記
今天這篇文章,我們繼續講架構師大劉的故事。
大劉有段時間經常會給一些程式設計師講課。這一方面是由於團隊培訓的需要,一方面也是大劉自身想搞搞凡爾賽,嘚瑟一下自身的實力。
大劉講課是允許公司任何一個人進去聽的。提前一個星期把主題公布在公司群里,有人想聽到日子直接去就是了。
有一次,大劉在聊並發話題的時候,為了彰顯自己確實是個並發達人,用了個 SynchronousQueue 舉例子。他說這個隊列其實沒有容積的概念,就是執行緒持有數據互相匹配。
嗯,談到這裡還是要說一下,大劉其實也不太懂 SynchronousQueue。只是一來這東西沒人用,自然就沒人懂;二來它的概念也比較晦澀,有些時候比較違背直覺,所以,即使隨口說的一些話可能不太對,也未必會被發現,還能給人一種不明覺厲的感覺。
大劉用過幾次,感覺良好。因此沒事兒就要秀一下 SynchronousQueue,表示自己這麼生僻的也懂,並發達人的名頭是沒有叫錯的。
也就那一次,恰恰被人拆了台。
當時課上來了個新入職的技術,此人長得中等身材,相貌平平,只是臉卻長的像種地多年的老農的巴掌。臉上的疙瘩如同老農巴掌上的老繭。這人姓張,這裡由於他臉長得像個大巴掌,那就暫且叫他巴掌張。
這個巴掌張打斷了大劉的話,言之鑿鑿說大劉說的是錯的,說他看過這個 SynchronousQueue,並不是大劉說的這樣。
大劉有點心虛,脖子滲出了一圈汗,但是並發達人的稱呼大劉並不想丟掉。於是說了一大堆雲里霧裡的廢話,把話題帶偏了開去。並告訴巴掌張,下回要和他在這個舞台上 PK 一二, 要好好看看誰是真正的 SynchronousQueue 的知心朋友。
由於大劉感覺被巴掌張的巴掌糊了臉,便就此下了決心要研究透 SynchronousQueue。
Google 和百度一起查,東西合璧,洋為中用,搞了好是一陣子。最後有個犄角旮旯的小破網站,有人說了這麼一句話:
SynchronousQueue 的目的就是為了接頭,為了匹配,當接上頭了就雙方合作愉快,整個工作完成。但是一旦在接頭中,任何一方還沒到達,那麼另一方就必須阻塞著等待。
這句話一下子就敲開了大劉的腦殼,讓聰明的智商重新佔領了高地。
為啥這句話就點亮了大劉那本來已經像燈泡的腦袋了呢?因為大劉想起了他每次的面試經歷,就和這個接頭是一樣的。
大劉每次去面試,都很規矩的提前趕到新公司。但是大部分情況,時間到了之後都需要等很長時間才開始面試。大劉那時候也年輕,只是以為領導忙,所以倒也恭恭敬敬的等著。
直到大劉自己當了領導,去面試別人的時候,被 HR 委婉的提醒了下,要讓候選人等一會兒再過去,顯的公司業務很忙,讓候選人對公司保持一定的敬畏。那時候,大劉才知道這是一種 PUA 術……
大劉對照著自己的面試經歷,一下就理解了 SynchronousQueue 的概念。
SynchronousQueue 本身是為了交接、匹配而存在的。當一個執行緒往 SynchronousQueue 放東西,發現沒執行緒在等著拿,就給阻塞掉——這就像面試者來早了等面試官。
當一個執行緒去 SynchronousQueue 拿東西,發現沒東西,就去等的時候——就像面試官來早了等面試者。
搞懂 SynchronousQueue 的時候,正是一個冬天,屋外面的寒風在虎虎生威,屋裡面的大劉在熠熠生輝。
只是一個堂而皇之擺在 JDK 底層並發包中的隊列結構,SynchronousQueue 當然沒那麼簡單,裡面還存在著億點點細節。
所以,大劉在整體方向搞懂之後,開始研究起了細節。他要奮發,狠狠把巴掌張的囂張氣焰壓下去,大劉要當公司技術的頭牌。
回到現實里,SynchronousQueue 真正的目的就是為了讓兩個執行緒的工作結果進行交接。這沒什麼問題。但是,在這個交接中是需要嚴格保密的,沒有人可以窺視。
嗯,沒錯,就和你約了女朋友去鐘點房那樣的不能被窺視。
好,圍繞這個 SynchronousQueue 的鐘點房,咱們通過源程式碼,來看這億點點細節。
首先,鐘點房嚴格保密,裡面是多少人,就不能讓人知道。所以,就不能讓別人通過方法得到具體的數據。對於 SynchronousQueue 來說,自然就是通過 size() 你得不到什麼資訊。
/**
* Always returns zero.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return zero
*/
public int size() {
return 0;
}
/**
* Always returns {@code true}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return {@code true}
*/
public boolean isEmpty() {
return true;
}
其次,鐘點房也不能隨便進去查房,看看都是誰。所以,自然就不能迭代。
/**
* Returns an empty iterator in which {@code hasNext} always returns
* {@code false}.
*
* @return an empty iterator
*/
public Iterator<E> iterator() {
return Collections.emptyIterator();
}
再次,鐘點房保護隱私,它也不能讓你鑽了漏子,不告訴你 XXX 是不是躲在了鐘點房裡。所以,你也不能知道鐘點房裡有沒有某個人。
/**
* Always returns {@code false}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param o the element
* @return {@code false}
*/
public boolean contains(Object o) {
return false;
}
/**
* Returns {@code false} unless the given collection is empty.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param c the collection
* @return {@code false} unless given collection is empty
*/
public boolean containsAll(Collection<?> c) {
return c.isEmpty();
}
自然,鐘點房也沒什麼權力趕人出去。
/**
* Always returns {@code false}.
* A {@code SynchronousQueue} has no internal capacity.
*
* @param o the element to remove
* @return {@code false}
*/
public boolean remove(Object o) {
return false;
}
當然,作為一個商業化的鐘點房,SynchronousQueue 還是很注意安全的,它貼心的提供了緊急轉移的手段。
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
int n = 0;
for (E e; (e = poll()) != null;) {
c.add(e);
++n;
}
return n;
}
/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
int n = 0;
for (E e; n < maxElements && (e = poll()) != null;) {
c.add(e);
++n;
}
return n;
}
最後,鐘點房就只能搞搞交接工作了。交接嗎,自然是有交有接的,交的就得帶東西。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// put:帶著東西進屋子
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
接的肯定不會帶著東西,得留地方拿東西。
public E take() throws InterruptedException {
// take:從屋子裡把東西拿出來
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
但是呢,這交接工作啊,得在專人安排下進行。
為什麼需要專人來幫忙?因為有時候我們的鐘點房太受歡迎了,客人多,得排隊管管。管這些排隊的就是 Transfer,它是鐘點房的經理。
/**
* The transferer. Set only in constructor, but cannot be declared
* as final without further complicating serialization. Since
* this is accessed only at most once per public method, there
* isn't a noticeable performance penalty for using volatile
* instead of final here.
*/
private transient volatile Transferer<E> transferer;
/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
Transfer 經理每次開門營業的時候,會收到總部給的牌子,告訴他管理工作要注意方式方法,比如公平有效,比如優先服務 VIP 客人之類的。
/**
* 默認給vip客人開點後門
*/
public SynchronousQueue() {
this(false);
}
/**
* 總部遞牌子,告訴Transfer到底是公平還是不公平,
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
先看看適合勞苦大眾的公平模式,先來先享受,晚來沒折扣。
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode{...}
transient volatile QNode head;
transient volatile QNode tail;
transient volatile QNode cleanMe;
TransferQueue() {
//經典的鏈表套路,先搞個虛擬的頭結點
QNode h = new QNode(null, false);
head = h;
tail = h;
}
……
……
QNode 就是 Transfer 經理需要的牌子,上面記錄點資訊,別到時候弄錯了。
static final class QNode {
volatile QNode next; // 下一個排隊的哥們兒
volatile Object item; // 這次哥們帶來的要交接的東西
volatile Thread waiter; // 交接的執行緒
final boolean isData; // isData == true表示帶著東西
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// ...省略一系列CAS方法
}
怎麼搞,秘密都在 transfer() 里。
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
//...先省略細節
}
transfer 本質就是一直在等待交接完成或者交接被中斷,被取消,或者等待超時。
for (;;) {
QNode t = tail;
QNode h = head;
//因為初始化是在構造函數里搞得,可能構造函數沒有執行完,就被用上了,就會出現t或者h為null的情況
if (t == null || h == null)
continue; //啥也不能做
//h==t表示沒人,t.isData == isData表示過來的哥們和前面的哥們目的一樣,那就只能考慮排隊等著了。
if (h == t || t.isData == isData) {
QNode tn = t.next;
//執行緒不安全需要考慮的,現在的尾巴不對,指錯了,重新確認下
if (t != tail)
continue;
//隊尾確定了,發現又來了人,把尾巴指向新來的人
if (tn != null) {
advanceTail(t, tn);
continue;
}
//超時了,別等了
if (timed && nanos <= 0)
return null;
//總算沒事兒了,哥們可以登記進屋了
if (s == null)
s = new QNode(e, isData);
//中間可能有人插隊,只能再等等
if (!t.casNext(null, s))
continue;
//準備進屋等著約的人
advanceTail(t, s);
Object x = awaitFulfill(s, e, timed, nanos);
//同一個人出來,那就是任務失敗了
if (x == s) {
//清理下
clean(t, s);
return null;
}
if (!s.isOffList()) { //還沒脫隊
advanceHead(t, s); //排前面單獨處理
if (x != null) //交接成功設一下標記
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
這段是不是看著很頭痛?其實 Transfer 這小子也頭痛。
它首先要面臨的第一個問題:資源競爭的問題。
客人源源不斷的來,由於 Transfer 強迫症,他想每次必須從絕對的隊頭或者隊尾巴開始,所以,每次都要判斷下,到底他看到的隊頭或者隊尾,是不是真正的隊頭、隊尾。
確定沒問題了,新來的客人就開始被打造成真正的隊尾。
然後,成為隊尾的哥們就可以等著屬於自己的 Mr.Right 過來交接了。等著交接一直到成功或者失敗的方法就是 awaitFulfill(t, tn)。
這邊有人在等待,同時另外一邊,交接的人們也開始陸續過來了。
else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // 交接的核心語句
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
交接最核心的其實就是 m.casItem(x, e)。交接成功,大家各回各家了。
整體的流程如下:
-
開始就是個經典鏈表開局,head = tail
-
陸續開始有節點鏈接,put 的時候,isData = true;take 的時候,isData = false
-
可能會同時有很多的 put 操作,沒有對應的 take 操作,他們就按照次序一個個鏈接起來,形成鏈表,並通過 awaitFulfill 方法等著對應的 take
-
也可能同時會有很多的 take 操作,而沒有對應的 put 操作,會形成鏈表,並通過 awaitFulfill 方法等著對應的 put
-
take 操作會從鏈表頭開始找匹配的 put,然後通過 casItem 方法交接
-
put 操作會從鏈表頭開始找匹配的 take,然後通過 casItem 方法交接
所以,SynchronousQueue 你可以看到了,專門就是搞交接任務。
- put 的哥們發現沒人 take,就等在那裡,等著take操作。
- take的哥們兒發現沒人put,也會等在那裡,等著put操作。
這就是我們的 SynchronousQueue 鐘點房做的事情。
OK,鐘點房既然開門做生意,它也要賺錢的嘛。所以,它還得搞搞 VIP 客戶收費,也得為 VIP 客戶搞一些優待。
對於這些 VIP 客人,我們的 Transfer 經理會特意安排下,以棧的形式來安排客人,越後來的客人越大牌兒。所以,自然是後來的客人會優先搞定交接了。這裡簡短的介紹下,就不再贅述了。
Transfer 化身成 TransferStack,後來的優先服務。
-
開始自然是鏈表開局,一個無意義的鏈表頭指向了 null
-
發現鏈表是空了,二話不說,客官,您進來先啦
-
和 TransferQueue 一樣,如果都是 take 過來,模式就是 REQUEST,就得排隊了
-
交接人出現,哥們可以收攤兒了
-
其餘的不說了,一樣的,說多了沒勁
話說,大劉搞清楚了這些細節之後,次日,當巴掌張再次進行挑釁時,大劉徹底穩下來了。
當挨個把細節講的一清二楚之後,看著巴掌張那張落寞的巴掌臉,瞬間也不覺得像巴掌了,而是像是在猜拳中出的石頭剪刀布中的布。大劉沒忍住,對著這個布比划出了個剪刀,光榮的結束了戰鬥。
大劉依然在技術流中獨佔鰲頭。
我們下篇大劉的故事見。
你好,我是四猿外。
一家上市公司的技術總監,管理的技術團隊一百餘人。
我從一名非電腦專業的畢業生,轉行到程式設計師,一路打拚,一路成長。
我會通過公眾號,
把自己的成長故事寫成文章,
把枯燥的技術文章寫成故事。
我建了一個讀者交流群,裡面大部分是程式設計師,一起聊技術、工作、八卦。歡迎加我微信,拉你入群。