源碼分析:Exchanger之數據交換器

簡介

Exchanger是Java5 開始引入的一個類,它允許兩個線程之間交換持有的數據。當Exchanger在一個線程中調用exchange方法之後,會阻塞等待另一個線程調用同樣的exchange方法,然後以線程安全的方式交換數據,之後線程繼續執行。

官方示例

在JDK的源碼注釋中,提供了一個簡單的示例demo,稍加修改後就可以運行

public class FillAndEmpty {
    Exchanger<Integer> exchanger = new Exchanger<Integer>();
    Integer initialEmptyBuffer = 1;
    Integer initialFullBuffer = 2;

     class FillingLoop implements Runnable {
        public void run() {
            Integer currentBuffer = initialEmptyBuffer;
            try {
                while (currentBuffer != 2) {
                     currentBuffer = exchanger.exchange(currentBuffer);
                }
                System.out.println("FillingLoop:"+currentBuffer);
            } catch (InterruptedException ex) {

            }
        }
    }

     class EmptyingLoop implements Runnable {
        public void run() {
            Integer currentBuffer = initialFullBuffer;
            try {
                while (currentBuffer != 1) {
                    currentBuffer = exchanger.exchange(currentBuffer);
                }
                System.out.println("EmptyingLoop:"+currentBuffer);
            } catch (InterruptedException ex) {

            }
        }
    }

    void start() {
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
    }

    public static void main(String[] args){
        FillAndEmpty f = new FillAndEmpty();
        f.start();
    }
}

源碼分析

內部類

Exchanger 中定義了兩個內部類:Node、Participant

// 使用 @sun.misc.Contended 註解避免出現偽共享
@sun.misc.Contended static final class Node {
    int index;              // Arena 中的索引
    int bound;              // Exchanger.bound的最後記錄值
    int collides;           // 當前 bound 的CAS 失敗數
    int hash;               // Pseudo-random for spins
    Object item;            // 線程的當前數據項
    volatile Object match;  // 由釋放線程提供的項目
    volatile Thread parked; // 當阻塞(parked)時,設置此線程,否則為null
}
/** 繼承了ThreadLocal,並初始化了Node對象 */
static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}

重要的屬性

/** 每個線程的狀態 */
private final Participant participant;
/** 消除數組;在啟用(在slotExchange中)之前為空。元素訪問使用volatile get和CAS */
private volatile Node[] arena;
/** 在檢測到爭用之前一直使用的插槽,可以理解為先到的線程的數據項 */
private volatile Node slot;
/** 每次更新時,將最大有效競技場位置的索引與高位SEQ號進行「或」運算。 */
private volatile int bound;

exchange()方法

等待另一個線程到達交換點(除非當前線程被中斷),然後將給定的對象傳遞給它,作為回報接收另一個的對象。

public V exchange(V x) throws InterruptedException {
    // 交換後的對象v
    Object v; 
    // item 為交換出去的對象,如果為null則換成NULL_ITEM對象
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    // 1.1構造方法沒有初始化arena,所以第一個進來的線程看見的arena肯定為null
    // 1.2第一個進來的線程繼續調用slotExchange(item, false, 0L)方法
    if ((arena != null || (v = slotExchange(item, false, 0L)) == null) &&
        // 2.1 Thread.interrupted(): 檢測線程是否有被中斷
        // 2.2 arenaExchange(item, false, 0L):slotExchange方法 返回了null時會進入到這個方法
        ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V)v;
}

arenaExchange()方法總結:

  1. 調用exchange方法的線程等待另一個線程到達交換點完成交換數據
  2. 如果交換的數據為null,會被轉換成一個NULL_ITEM 的Object對象作為轉換的數據項
  3. 構造方法未初始化arena對象,所以會先調用slotExchange方法借用slot插槽來交換對象
  4. 如果slotExchange方法成功返回了另一個交換到的對象,則直接返回交換到的數據項
  5. 如果slotExchange方法成功返回了null,會繼續調用arenaExchange方法完成數據交換並返回

slotExchange()方法

/**
 * item:要交換的項目
 * timed:是否有設置超時
 * ns: 設置的超時時間
 * return: 返回另一個線程的數據項;如果啟用arena或線程在完成之前被中斷,則為null;如果超時,則為TIMED_OUT
 */
private final Object slotExchange(Object item, boolean timed, long ns) {
    // 獲取當前線程node節點對象
    Node p = participant.get();
    Thread t = Thread.currentThread(); // 當前線程
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;
    // 自旋
    for (Node q;;) {
        if ((q = slot) != null) { // 兩個線程先到的線程,slot肯定為null,一般後到的線程會進入到這個if分支
            // 如果在當前線程之前已經有線程調用了exchange方法,slot就肯定不為null,條件成立
            if (U.compareAndSwapObject(this, SLOT, q, null)) {// 後來的線程會調用CAS吧slot再置為null
                // q.item 是較早的線程的數據項
                Object v = q.item;
                // item 是當前線程的數據項;by: //jinglingwang.cn
                q.match = item;
                // 之前阻塞(park)的線程
                Thread w = q.parked;
                if (w != null) //可能另一個線程還在自旋,沒有阻塞,所以這裡可能會為null
                    // 喚醒之前被阻塞的線程
                    U.unpark(w);
                // 返回之前的線程的數據項
                return v;
            }
            // create arena on contention, but continue until slot null
            // 上面CAS修改slot失敗後,會進入到這裡;//jinglingwang.cn
            // SEQ = MMASK + 1 = 256
            if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
                // if條件成立,初始化arena數組
                // 我8核的CPU,計算的length是 (4+2) << 7 == 768
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        else if (arena != null) 
             // 如果上面的if條件成立並且初始化了arena數組,會進入到arenaExchange方法
            return null; // caller must reroute to arenaExchange
        else {
            p.item = item; // p節點的item設置為當前項item
            if (U.compareAndSwapObject(this, SLOT, null, p)) // CAS 修改slot的值,修改成功退出自旋
                break;
            p.item = null; //CAS 修改失敗沒有退出自旋,重置p節點的item為null
        }
    }
    // 理論上第一個先到的線程會進入到下面,會阻塞自己,等待另一個線程的數據項到來
    // await release
    int h = p.hash;
    long end = timed ? System.nanoTime() + ns : 0L;  // 超時時間
    // 根據CPU的核數確定自旋的次數1024 or 1
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    while ((v = p.match) == null) { // 先到的線程 p.match 可能會為null,下面開始自旋等待另一個線程交換的數據設置到match
        if (spins > 0) { **// 至少先自旋 1024 次,等待match數據項,自旋後才阻塞自己**
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
            if (h == 0)
                h = SPINS | (int)t.getId(); // 重新計算hash
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                // 減少自旋次數
                Thread.yield(); // 讓出CPU的使用權
        } else if (slot != p) // 上面自旋次數已經減到0了,並且slot != p,沒有衝突的話理論上slot 應該是等於 p 的
            spins = SPINS; // 重置自旋次數
        else if (!t.isInterrupted() && arena == null &&  (!timed || (ns = end - System.nanoTime()) > 0L)) {
            U.putObject(t, BLOCKER, this);
            p.parked = t;
            if (slot == p)
                U.park(false, ns); // 調用底層阻塞最早的線程
            // 線程被喚醒了,回到上面再次判斷while自旋,p.match理論上不會是null了,p.match是後到的線程的數據項,是需要返回給當前線程的項
            p.parked = null;
            U.putObject(t, BLOCKER, null);
        } else if (U.compareAndSwapObject(this, SLOT, p, null)) {
            // 如果線程阻塞超時了,還是沒等待要交換的數據項,會進入到這裡,返回一個TIMED_OUT 對象或null
            v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // 將 當前線程p 的 match 屬性設置成 null
    U.putOrderedObject(p, MATCH, null);
    p.item = null;
    p.hash = h;
    // 返回匹配後的數據項v
    return v;
}

slotExchange()方法總結:

  1. 線程進入該方法後,會先拿到[Exchanger](//jinglingwang.cn)Participant,也就是Node數據節點p
  2. 檢查線程的狀態,是否有被中斷,如果是返回null,會進入到下面的arenaExchange方法邏輯
  3. 先調用slotExchange()方法的線程會使用CAS的方式線程安全的佔用slot插槽
  4. 然後會自旋至少1024次並不斷讓出CPU使用權,期間如果成功等待到了另外的線程的數據項(p.match != null),則直接返回交換到的數據(v = p.match
  5. 如果自旋後沒有等到交換的數據項,調用U.park阻塞當前線程,等待另一個線程的到來將其喚醒或者超時
  6. 另一個線程進入slotExchange()方法後,發現slot插槽已經被佔用(已經有線程在等它交換數據了),取出slot插槽中的item數據(第一個線程的數據),並設置自己的數據到插槽的match項,然後喚醒另一個線程,成功換反交換到的數據。
  7. 被喚醒的線程成功獲得match數據,並返回交換後的match數據

slotExchange方法返回null的2種情況:

  1. 線程被中斷,會返回null
  2. 設置了超時時間,並且時間超時,會返回TIMED_OUT
  3. 第一個線程超時了,把slot從p置為null的同事第二個線程剛好調用CAS也在把slot從q修改為null,這時候第二個線程會修改失敗,然後就會去初始化arena數組,然後第二個線程就可能返回null

arenaExchange()方法

exchange()方法實現中可以看到,只有當slotExchange()方法返回null之後才會執行到arenaExchange()方法,而線程中斷的情況是不會進入到該方法的,所以只有另一種情況,但是要進入的幾率太小了,斷點調試的話難以構造這種情況。

private final Object arenaExchange(Object item, boolean timed, long ns) {
    // 實質上就是個Node數組
    Node[] a = arena;
    // 獲取當前線程node節點對象
    Node p = participant.get();
    // p.index 訪問插槽的索引位置,初始值為0
    for (int i = p.index;;) {                      // access slot at i
        // j是原始數組偏移量 //jinglingwang.cn
        int b, m, c; long j;                       // j is raw array offset
        // ABASE:返回Node數組中第一個元素的偏移地址+128; i << ASHIFT : i<<7
        // getObjectVolatile:獲取obj對象中offset偏移地址對應的object型field的值,支持volatile load語義
        // q節點就是通過CAS獲取arena數組偏移(i + 1) *  128個地址位上的node
        Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // 如果獲取到的節點不為空,並且再次吧j位置的q元素置為null
        if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 整個條件成立,代表線程獲得了交換的數據
            Object v = q.item;                     // release
            q.match = item;
            Thread w = q.parked;
            if (w != null)  // 有阻塞的線程就喚醒
                U.unpark(w);
            return v; // 返回交換的數據
        } else if (i <= (m = (b = bound) & MMASK) && q == null) {  // i 沒有越界,並且q==null
            // 把當前線程的數據賦予給p節點的item
            p.item = item;                         // offer
            if (U.compareAndSwapObject(a, j, null, p)) { // 再使用CAS的方式把p節點安全的放入到數組的j位置上
                // CAS 修改成功
                // 計算超時時間
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                Thread t = Thread.currentThread(); // wait   當前線程
                // 自旋 1024
                for (int h = p.hash, spins = SPINS;;) {
                    Object v = p.match;  //交換的數據
                    if (v != null) {  // 交換的數據不為null,說明有其他線程把交換的數據送進來了
                        U.putOrderedObject(p, MATCH, null);
                        // 將match和item置為null
                        p.item = null;             // clear for next use
                        p.hash = h; 
                        return v;// 返回數據
                    } else if (spins > 0) {
                        // 異或移位
                        h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                        if (h == 0)                // initialize hash  初始化hash
                            h = SPINS | (int)t.getId();
                        else if (h < 0 &&          // approx 50% true
                                 (--spins & ((SPINS >>> 1) - 1)) == 0) // 減少自旋次數
                            Thread.yield();        // two yields per wait  讓出CPU使用權
                    } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的類似
                        // 重置自旋次數
                        spins = SPINS;       // releaser hasn't set match yet  
                    else if (!t.isInterrupted() && m == 0 &&
                             (!timed || // 超時時間設置
                              (ns = end - System.nanoTime()) > 0L)) {
                        U.putObject(t, BLOCKER, this); // emulate LockSupport
                        p.parked = t;              // minimize window
                        if (U.getObjectVolatile(a, j) == p)
                            U.park(false, ns);  // 阻塞當前線程,等待被喚醒
                        p.parked = null; // 線程被喚醒了
                        U.putObject(t, BLOCKER, null);
                    } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) {
                        // m會跟着bound變化,初始會是0
                        if (m != 0)                // try to shrink
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 修改b
                        p.item = null;
                        p.hash = h;
                        // i = p.index無符號右移1位
                        i = p.index >>>= 1;        // descend
                        if (Thread.interrupted()) //線程被中斷
                            return null;
                        if (timed && m == 0 && ns <= 0L) // 超時,返回TIME_OUT
                            return TIMED_OUT;
                        break;                     // expired; restart
                    }
                }
            } else // 使用CAS的方式把p節點安全的放入到數組的j位置上失敗(可能有其他線程已經捷足先登),重置p節點的item
                p.item = null;                     // clear offer
        } else { // 上面兩個if條件都沒成立:比如q!=null,compareAndSwapObject失敗,數組未越界
            if (p.bound != b) {                    // stale; reset
                p.bound = b; // b變化了,重置bond
                p.collides = 0; // 當前 bound 的CAS 失敗數
                i = (i != m || m == 0) ? m : m - 1; // 確定索引i
            } else if ((c = p.collides) < m || m == FULL ||  !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                p.collides = c + 1; // bound 的CAS 失敗數+1
                // 確定循環遍歷i,繼續回到上面最初的地方自旋
                i = (i == 0) ? m : i - 1;          // cyclically traverse
            } else
                // 此時表示bound值增加了SEQ+1
                i = m + 1;                         // grow
            p.index = i; // 設置下標,繼續自旋
        }
    }
}

Exchanger總結:

  1. Exchanger 可以以線程安全的方式完成兩個線程之間數據的交換工作
  2. By: //jinglingwang.cn
  3. Exchanger 主要是使用了自旋和CAS來保證數據的原子性
  4. 一般情況下,slotExchange()方法即可完成數據交換的工作
  5. JDK8 版本的Exchanger 使用了 @sun.misc.Contended註解來避免偽共享
  6. 數據交換過程可以總結為:A、B線程交換數據 ,A發現slot為空就把自己的數據放入到slot插槽中的item項,自旋或阻塞等待B線程的數據,B線程進來發現A線程的數據後取走數據並設置自己的數據到match,然後再喚醒A線程取走B線程的match數據。多個線程交換時,需要用到slot數組。