Disruptor源碼解析
- 2019 年 11 月 5 日
- 筆記
轉載自微信公眾號:[Amos博客] 作者:Amos
內容目錄
juc下的隊列DisruptorDisruptor是什麼Disruptor為什麼快Disruptor核心類Sequence(序列)框架類結構關係圖Cursored 獲取當前序列值Sequenced 序列的申請及發佈SequencerAbstractSequencer 管理事件處理者序列和事件發佈者發佈序列。SingleProducerSequencer 單線程事件發佈者。next()申請序列實戰單線程生產者MultiProducerSequencer成員變量構造函數next()申請序列publish()事件發佈MultiProducerSequencer和SingleProducerSequencer區別RingBufferEventSequencerDataProviderEventSink 這個類提供了各種發佈的姿勢。RingBufferPad 用於緩存行填充RingBufferFields 這個類的邏輯比較重要,講解了event在數組中存儲位置SequenceBarrier接口 消費者使用ProcessingSequenceBarrier事件處理 EventProcessorBatchEventProcessor event模式單線程處理WorkProcessor work模式多線程處理WorkerPoolwaitStrategy 等待策略實戰多線程消費者DSL
juc下的隊列

1:從上圖可以看出,juc下的隊列基本採用加鎖方式保證線程安全。通過不加鎖的方式實現的隊列都是無界的(無法保證隊列的長度在限定的範圍)。而加鎖的方式可以實現有界隊列。在穩定性要求特別高的系統中,為了防止生產者速度過快,導致內存溢出,只能選擇有界隊列。
2:加鎖的方式通常嚴重影響性能。線程會因為競爭不到鎖而被掛起,等鎖被釋放的時候,線程又會被恢復,這個過程中存在着很大的開銷,並且通常會有較長時間的中斷,因為當一個線程正在等待鎖時,它不能做任何其他事情。如果一個線程在持有鎖的情況下被延遲執行,例如發生了缺頁錯誤、調度延遲或者其它類似情況,那麼所有需要這個鎖的線程都無法執行下去。如果被阻塞線程的優先級較高,而持有鎖的線程優先級較低,就會發生優先級反轉。
3:有界隊列通常採用數組實現。但是採用數組實現又會引發另外一個問題false sharing(偽共享)。關於什麼是偽共享之前的文章已經講解。
Disruptor
Disruptor是什麼
1:Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級)
2:Disruptor實現對了隊列的功能並且是一個有界隊列。可以用於生產者-消費者模型。
Disruptor為什麼快
1:數據結構採用ringbuffer。其實可以理解成一個數組entries。每一個slot存儲一個事件對象。初始化時,就已經分配好內存,而且新發佈的數據只會覆蓋,所以更少的GC。
2:Disruptor採用緩存行填充機制的形式解決了fasle sharing。保證讀取變量的時候從cache line讀取。
3:Disroptor中維護了一個long類型的sequence(序列)。每次根據位運算操作可以快速定位到實際slot,sequece&(entries.length-1)=index,比如一共有4槽,9&(8-1)=1。提示:隊列的大小必須要2^n。
4:線程同時訪問,由於他們都通過sequence訪問ringBuffer,通過CAS取代了加鎖,這也是並發編程的原則:把同步塊最小化到一個變量上。這個sequence一直採用自增的形式。
Disruptor核心類
1:RingBuffer:Disruptor最主要的組件,僅僅負責存儲和更新事件對象。
2:Sequence:Disruptor使用Sequence來表示一個特殊組件處理的序號。和Disruptor一樣,每一個消費者(EventProcessor)都維持着一個Sequence。大部分的並發代碼依賴這這個值。這個類維護了一個long類型的value,採用的unsafe進行的更新操作。
3:Sequencer:這是Disruptor真正的核心。實現了這個接口的兩種生產者(單生產者和多生產者)均實現了所有的並發算法,為了在生產者和消費者之間進行準確快速的數據傳遞。
4:SequenceBarrier:由Sequencer生成,並且包含了已經發佈的Sequence的引用,這些Sequence源於Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer裏面獲取事件時的處理策略。(例如:當生產者太慢,消費者太快,會導致消費者獲取不到新的事件會根據該策略進行處理,默認會堵塞)
5:WaitStrategy:決定一個消費者將如何等待生產者將Event置入Disruptor的策略。用來權衡當生產者無法將新的事件放進RingBuffer時的處理策略。(例如:當生產者太快,消費者太慢,會導致生產者獲取不到新的事件槽來插入新事件,則會根據該策略進行處理,默認會堵塞)
6:Event:從生產者到消費者過程中所處理的數據單元。Disruptor中沒有代碼表示Event,因為它完全是由用戶定義的。
7:EventProcessor:主要事件循環,處理Disruptor中的Event,並且擁有消費者的Sequence。它有一個實現類是BatchEventProcessor,包含了event loop有效的實現,並且將回調到一個EventHandler接口的實現對象。
8:EventHandler:由用戶實現並且代表了Disruptor中的一個消費者的接口。
9:WorkHandler:在work模式下使用。由用戶實現並且代表了Disruptor中的多個消費者的接口。
10:WorkProcessor:確保每個sequence只被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。
11:WorkerPool:一個WorkProcessor池,其中WorkProcessor將消費Sequence,所以任務可以在實現WorkHandler接口的worker之間移交
12:LifecycleAware:當BatchEventProcessor啟動和停止時,實現這個接口用於接收通知。
Sequence(序列)

1:Sequence是用來標記事件發佈者和事件消費者的位置。
2:Sequence真正計數的是value,採用緩衝行填充防止false sharing。在value的前後各有7個long型的填充值,這些值在這裡的作用是做cpu cache line填充,防止發生偽共享。最壞的情況就是value位於cache line的頭或者尾。
框架類結構關係圖

Cursored 獲取當前序列值
public interface Cursored{ /** * 獲取當前序列值 */ long getCursor(); }
1:Cursored接口只提供了一個獲取當前序列值的方法。
Sequenced 序列的申請及發佈
public interface Sequenced{ //獲取隊列的大小 int getBufferSize(); //判斷隊列中是否還有可用的容量 boolean hasAvailableCapacity(final int requiredCapacity); //獲取隊列中剩餘的有效容量 long remainingCapacity(); //申請下一個sequence,用於事件發佈者發佈數據,申請失敗則自旋 long next(); //申請n個sequence,用於事件發佈者發佈數據,申請失敗則自旋 long next(int n); //嘗試獲取一個sequence long tryNext() throws InsufficientCapacityException; //嘗試獲取n個sequence long tryNext(int n) throws InsufficientCapacityException; //發佈sequence void publish(long sequence); //批量發佈sequence void publish(long lo, long hi); }
Sequencer
public interface Sequencer extends Cursored, Sequenced{ //游標初始值 long INITIAL_CURSOR_VALUE = -1L; //初始化RingBuffer為指定的sequence void claim(long sequence); //消費者調用,判斷sequence是否可以消費 boolean isAvailable(long sequence); //將sequence添加到gating sequences中 void addGatingSequences(Sequence... gatingSequences); //從gating sequences中移除指定的sequence boolean removeGatingSequence(Sequence sequence); //事件處理者用來追蹤ringBuffer中可以用的sequence SequenceBarrier newBarrier(Sequence... sequencesToTrack); //事件發佈者獲取gating sequence中最小的sequence long getMinimumSequence(); //消費者用來獲取從nextSequence到availableSequence之間最大的sequence。如果是多線程生產者判斷nextSequence是否可用,否則返回nextSequence-1。單線程直接返回availableSequence long getHighestPublishedSequence(long nextSequence, long availableSequence); //我也不知道幹啥的 <T> EventPoller<T> newPoller(DataProvider<T> provider,Sequence... gatingSequences); }
1:Sequencer中的方法大多是給事件發佈者使用。newBarrier()給事件處理者使用。
AbstractSequencer 管理事件處理者序列和事件發佈者發佈序列。
public abstract class AbstractSequencer implements Sequencer { //用來對gatingSequences做原子操作的。Sequence[]裏面存儲的是事件處理者處理到的序列。 //如果不懂AtomicReferenceFieldUpdater請www.google.com private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); //隊列大小 protected final int bufferSize; //等待策略 protected final WaitStrategy waitStrategy; //事件發佈者的已經發佈到的sequence protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //事件處理者處理到的序列對象 protected volatile Sequence[] gatingSequences = new Sequence[0]; /** *檢查隊列大小是否是2^n,判斷buffersize大小 */ public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) { if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1");} if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.bufferSize = bufferSize; this.waitStrategy = waitStrategy; } /** * 獲取事件發佈者的序列 */ @Override public final long getCursor() { return cursor.get(); } /** * 獲取大小 */ @Override public final int getBufferSize() { return bufferSize; } /** * 把事件消費者序列維護到gating sequence */ @Override public final void addGatingSequences(Sequence... gatingSequences) { SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences); } /** * 從gating sequence移除序列 */ @Override public boolean removeGatingSequence(Sequence sequence) { return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence); } /** * 獲取gating sequence中事件處理者處理到最小的序列值 */ @Override public long getMinimumSequence() { return Util.getMinimumSequence(gatingSequences, cursor.get()); } /** * 創建了一個序列柵欄 */ @Override public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack); } /** * 這個方法不解釋,我也不知道目前用來幹嘛的。有知道的大佬可以賜教一下。謝謝 */ @Override public <T> EventPoller<T> newPoller(DataProvider<T> dataProvider, Sequence... gatingSequences) { return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences); } //重寫toString @Override public String toString() { return "AbstractSequencer{" + "waitStrategy=" + waitStrategy + ", cursor=" + cursor + ", gatingSequences=" + Arrays.toString(gatingSequences) + '}'; } }
SingleProducerSequencer 單線程事件發佈者。
1:從上面的圖可以看出SingleProducerSequencer間接繼承了AbstractSequencer。
2:SingleProducerSequencerFields維護事件發佈者發佈的序列和事件處理者處理到的最小序列。
3:SingleProducerSequencerPad緩衝行填充,防止false sharing。
next()申請序列
//該方法是事件發佈者申請序列 public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } //獲取事件發佈者發佈到的序列值 long nextValue = this.nextValue; long nextSequence = nextValue + n; //wrap 代表申請的序列繞一圈以後的位置 long wrapPoint = nextSequence - bufferSize; //獲取事件處理者處理到的序列值 long cachedGatingSequence = this.cachedValue; /** 1.事件發佈者要申請的序列值大於事件處理者當前的序列值且事件發佈者要申請的序列值減去環的長度要小於事件處理者的序列值。 * 2.滿足(1),可以申請給定的序列。 * 3.不滿足(1),就需要查看一下當前事件處理者的最小的序列值(可能有多個事件處理者)。如果最小序列值大於等於 * 當前事件處理者的最小序列值大了一圈,那就不能申請了序列(申請了就會被覆蓋), * */ if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { //wrapPoint > cachedGatingSequence 代表繞一圈並且位置大於事件處理者處理到的序列 //cachedGatingSequence > nextValue 說明事件發佈者的位置位於事件處理者的屁股後面 //維護父類中事件生產者的序列 cursor.setVolatile(nextValue); long minSequence; //如果事件生產者繞一圈以後大於事件處理者的序列,那麼會在此處自旋 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); } //緩存最小值 this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; } //事件發佈調用的方法。喚醒阻塞的消費者 public void publish(long sequence) { cursor.set(sequence); waitStrategy.signalAllWhenBlocking(); }
實戰單線程生產者
public static void main(String[] args) { /** * Create a new Disruptor. * @param eventFactory 事件對象的數據 * @param ringBufferSize 數組大小,必須是2^n * @param threadFactory 線程工廠 * @param producerType 生產者策略。ProducerType.SINGLE和ProducerType.MULTI 單個生產者還是多個生產者. * @param waitStrategy 等待策略。用來平衡事件發佈者和事件處理者之間的處理效率。提供了八種策略。默認是BlockingWaitStrategy */ //初始化的邏輯大概是創建根據ProducerType初始化創造SingleProducerSequencer或MultiProducerSequencer。 //初始化Ringbuffer的時候會根據buffsiz把事件對象放入entries數組。 Disruptor<TradeBO> disruptor = new Disruptor<>(() -> new TradeBO(), 2, r -> { Thread thread = new Thread(r); thread.setName("實戰單線程生產者"); return thread; }, ProducerType.SINGLE, new BlockingWaitStrategy()); //關聯事件處理者。初始化BatchEventProcessor。把事件處理者加入gating sequence disruptor.handleEventsWith(new ConsumerA()); disruptor.handleEventsWith(new ConsumerB()); //啟動消費者線程。BatchEventProcessor間接實現了Runnable。所以這一步就是啟動線程。如果事件發佈太快,消費太慢會根據不同的waitstrategy等待。 disruptor.start(); //發佈事件 for (int i = 1; i < 10; i++) { int finalI = i; //初始化了EventTranslator。意思就是給最開始初始化的對象賦值 EventTranslator eventTranslator = (EventTranslator<TradeBO>) (event, sequence) -> { event.setId(finalI); event.setPrice((double) finalI); }; //發佈首先要申請序列,如果申請不到會自旋。 disruptor.publishEvent(eventTranslator); } disruptor.shutdown(); } class ConsumerB implements EventHandler<TradeBO> { @Override public void onEvent(TradeBO event, long sequence, boolean endOfBatch) throws Exception { System.out.println("ConsumerB id=" + event.getId() + "price=" + event.getPrice()); } } class ConsumerA implements EventHandler<TradeBO> { @Override public void onEvent(TradeBO event, long sequence, boolean endOfBatch) throws Exception { System.out.println("ConsumerB id=" + event.getId() + " price=" + event.getPrice()); } } @Data public class TradeBO { private Integer id; private Double price; }
MultiProducerSequencer
成員變量
//獲取unsafe private static final Unsafe UNSAFE = Util.getUnsafe(); //獲取int[]的偏移量 private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); //獲取元素的大小,也就是int的大小4個位元組 private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); //gatingSequenceCache是gatingSequence。用來標識事件處理者的序列 private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //availableBuffer用來追蹤每個槽的狀態 private final int[] availableBuffer; private final int indexMask; //轉了幾圈 private final int indexShift;
構造函數
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) { //初始化父類 super(bufferSize, waitStrategy); //初始化availableBuffer availableBuffer = new int[bufferSize]; indexMask = bufferSize - 1; indexShift = Util.log2(bufferSize); //這個邏輯是。計算availableBuffer中每個元素的偏移量 //定位數組每個值的地址就是(index * SCALE) + BASE initialiseAvailableBuffer(); } private void initialiseAvailableBuffer() { for (int i = availableBuffer.length - 1; i != 0; i--) { setAvailableBufferValue(i, -1); } setAvailableBufferValue(0, -1); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; //修改內存偏移地址為bufferAddress的值,改為flag UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }
next()申請序列
public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { //獲取事件發佈者發佈序列 current = cursor.get(); //新序列位置 next = current + n; //wrap 代表申請的序列繞一圈以後的位置 long wrapPoint = next - bufferSize; //獲取事件處理者處理到的序列值 long cachedGatingSequence = gatingSequenceCache.get(); /** 1.事件發佈者要申請的序列值大於事件處理者當前的序列值且事件發佈者要申請的序列值減去環的長度要小於事件處理者的序列值。 * 2.滿足(1),可以申請給定的序列。 * 3.不滿足(1),就需要查看一下當前事件處理者的最小的序列值(可能有多個事件處理者)。如果最小序列值大於等於 * 當前事件處理者的最小序列值大了一圈,那就不能申請了序列(申請了就會被覆蓋), * */ if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { //wrapPoint > cachedGatingSequence 代表繞一圈並且位置大於事件處理者處理到的序列 //cachedGatingSequence > current 說明事件發佈者的位置位於事件處理者的屁股後面 //獲取最小的事件處理者序列 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); continue; } //賦值 gatingSequenceCache.set(gatingSequence); //通過cas修改 } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; }
publish()事件發佈
public void publish(final long sequence) { //這裡的操作邏輯大概是修改數組中的序列值 setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); } private void setAvailable(final long sequence) { setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } //計算數組中位置 sequence&(buffsize-1) private int calculateIndex(final long sequence) { return ((int) sequence) & indexMask; } //計算數組中的存儲的數據 private int calculateAvailabilityFlag(final long sequence) { return (int) (sequence >>> indexShift); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }
MultiProducerSequencer和SingleProducerSequencer區別
1:SingleProducerSequencer內部維護cachedValue(事件消費者序列),nextValue(事件發佈者序列)。並且採用padding填充。這個類是線程不安全的。 2:MultiProducerSequencer每次獲取序列都是從Sequence中獲取的。Sequence中針對value的操作都是原子的。
RingBuffer

EventSequencer
//這個接口是一個空方法 public interface EventSequencer<T> extends DataProvider<T>, Sequenced{ }
DataProvider
//DataProvider 提供了根據序列獲取對應的對象 //有兩個地方調用。這個Event對象需要被生產者獲取往裏面填充數據。第二個是在消費時,獲取這個Event對象用於消費。 public interface DataProvider<T>{ T get(long sequence); }
EventSink 這個類提供了各種發佈的姿勢。
1:EventSink接口是用來發佈Event的,在發佈的同時,調用綁定的Translator來初始化並填充Event。
2:填充Event是通過實現EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg,EventTranslatorThreeArg,EventTranslatorVararg這些EventTranslator來做的。
3:發佈流程:申請下一個序列->申請成功則獲取對應槽的Event->利用translator初始化並填充對應槽的Event->發佈Event 。translator用戶實現,用於初始化Event。
RingBufferPad 用於緩存行填充
RingBufferFields 這個類的邏輯比較重要,講解了event在數組中存儲位置
abstract class RingBufferFields<E> extends com.lmax.disruptor.RingBufferPad { //Buffer數組填充 private static final int BUFFER_PAD; //Buffer數組起始基址 private static final long REF_ARRAY_BASE; //數組引用每個引用佔用的大小=2^REF_ELEMENT_SHIFT private static final int REF_ELEMENT_SHIFT; private static final Unsafe UNSAFE = Util.getUnsafe(); static { //獲取Object[]引用大小。我本機4位元組 final int scale = UNSAFE.arrayIndexScale(Object[].class); if (4 == scale) { REF_ELEMENT_SHIFT = 2; } else if (8 == scale) { REF_ELEMENT_SHIFT = 3; } else { throw new IllegalStateException("Unknown pointer size"); } //填充32或者16 BUFFER_PAD = 128 / scale; // 計算Buffer數組起始基址。我本機是從32開始 REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT); } private final long indexMask; //保存了RingBuffer每個槽的Event對象。這個entries不會被修改。ps:引用不會被修改 private final Object[] entries; protected final int bufferSize; //sequencer=SingleProducerSequencer or MultiProducerSequencer的引用 protected final Sequencer sequencer; RingBufferFields( EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; fill(eventFactory); } //填充entries private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } @SuppressWarnings("unchecked") protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } }
SequenceBarrier接口 消費者使用
public interface SequenceBarrier { /** * 等待一個序列變為可用,然後消費這個序列。消費線程中使用 */ long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException; /** * 獲取當前可以讀取的序列值。 */ long getCursor(); /** * 當前柵欄是否發過通知。 */ boolean isAlerted(); /** * 通知消費者狀態變化,然後停留在這個狀態上,直到狀態被清除。 */ void alert(); /** * 清楚通知狀態。 */ void clearAlert(); /** * 檢測是否發生了通知,如果已經發生了拋出AlertException異常。 */ void checkAlert() throws AlertException; }
ProcessingSequenceBarrier
final class ProcessingSequenceBarrier implements SequenceBarrier { //等待策略 private final WaitStrategy waitStrategy; //當消費者之前沒有依賴關係的時候,那麼dependentSequence=cursorSequence //存在依賴關係的時候,dependentSequence 里存放的是一組依賴的Sequence,get方法得到的是最小的序列值 //所謂的依賴關係是有兩個消費者A、B,其中B需要在A之後進行消費,這A的序列就是B需要依賴的序列,因為B的消費速度不能超過A。 private final Sequence dependentSequence; //判斷是否執行shutdown private volatile boolean alerted = false; //cursorSequence 代表的是寫指針。代表事件發佈者發佈到那個位置 private final Sequence cursorSequence; //sequencer=SingleProducerSequencer or MultiProducerSequencer的引用 private final Sequencer sequencer; ProcessingSequenceBarrier( final Sequencer sequencer, final WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { this.sequencer = sequencer; this.waitStrategy = waitStrategy; this.cursorSequence = cursorSequence; if (0 == dependentSequences.length) { dependentSequence = cursorSequence; } else { dependentSequence = new FixedSequenceGroup(dependentSequences); } } @Override public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { //檢查是否中斷 checkAlert(); //根據不同的策略獲取可用的序列 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); //判斷申請的序列和可用的序列大小 if (availableSequence < sequence) { return availableSequence; } //如果是單線程生產者直接返回availableSequence //多線程生產者判斷是否可用,不可用返回sequence-1 return sequencer.getHighestPublishedSequence(sequence, availableSequence); } //獲取當前序列 @Override public long getCursor() { return dependentSequence.get(); } //判斷是否中斷 @Override public boolean isAlerted() { return alerted; } //中斷 @Override public void alert() { alerted = true; waitStrategy.signalAllWhenBlocking(); } //清除中斷 @Override public void clearAlert() { alerted = false; } //檢查是否中斷 @Override public void checkAlert() throws AlertException { if (alerted) { throw AlertException.INSTANCE; } } }
事件處理 EventProcessor
public interface EventProcessor extends Runnable{ //獲取事件處理器使用的序列引用。 Sequence getSequence(); //中斷 void halt(); //判斷是否運行 boolean isRunning(); }
BatchEventProcessor event模式單線程處理
//重點講run方法,其它方法都比較簡單 public final class BatchEventProcessor<T> implements EventProcessor { public void run() { //啟動任務 if (running.compareAndSet(IDLE, RUNNING)) { //清除中斷狀態 sequenceBarrier.clearAlert(); //判斷一下消費者是否實現了LifecycleAware ,如果實現了這個接口,那麼此時會發送一個啟動通知 notifyStart(); try { //判斷任務是否啟動 if (running.get() == RUNNING) { //處理事件 processEvents(); } } finally { //判斷一下消費者是否實現了LifecycleAware ,如果實現了這個接口,那麼此時會發送一個停止通知 notifyShutdown(); //重新設置狀態 running.set(IDLE); } } else { // 線程已經啟動 if (running.get() == RUNNING) { throw new IllegalStateException("Thread is already running"); } else { //這裡就是 notifyStart();notifyShutdown(); earlyExit(); } } } private void processEvents() { //定義一個event T event = null; //獲取要申請的序列 long nextSequence = sequence.get() + 1L; //循環處理事件。除非超時或者中斷。 while (true) { try { //根據等待策略來等待可用的序列值。 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } //根據可用的序列值獲取事件。批量處理nextSequence到availableSequence之間的事件。 while (nextSequence <= availableSequence) { //獲取事件 event = dataProvider.get(nextSequence); //觸發事件 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } //設置事件處理者處理到的序列值。事件發佈者會根據availableSequence判斷是否發佈事件 sequence.set(availableSequence); } catch (final TimeoutException e) { //超時異常 notifyTimeout(sequence.get()); } catch (final AlertException ex) { //中斷異常 if (running.get() != RUNNING) { break; } } catch (final Throwable ex) { //這裡可能用戶消費者事件出錯。如果自己實現了ExceptionHandler那麼就不會影響繼續消費 exceptionHandler.handleEventException(ex, nextSequence, event); //如果出現異常則設置為nextSequence sequence.set(nextSequence); nextSequence++; } } }
WorkProcessor work模式多線程處理
public void run() { //判斷線程是否啟動 if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } //清除中斷狀態 sequenceBarrier.clearAlert(); //判斷一下消費者是否實現了LifecycleAware ,如果實現了這個接口,那麼此時會發送一個啟動通知 notifyStart(); //事件處理標誌 boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) { try { //判斷上一個事件是否已經處理完畢。 if (processedSequence) { //置為false processedSequence = false; do { //獲取下一個序列 nextSequence = workSequence.get() + 1L; //更新當前已經處理到的 sequence.set(nextSequence - 1L); } //多個WorkProcessor共享一個workSequence,可以實現互斥消費,因為只有一個線程可以CAS更新成功 while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } //檢查序列值是否需要申請。 if (cachedAvailableSequence >= nextSequence) { //獲取事件 event = ringBuffer.get(nextSequence); //交給workHandler處理事件。 workHandler.onEvent(event); //設置事件處理完成標識 processedSequence = true; } else { //申請可用序列 cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { //設置異常事件處理 exceptionHandler.handleEventException(ex, nextSequence, event); processedSequence = true; } } //同上 notifyShutdown(); //停止 running.set(false); }
WorkerPool
1:多個WorkProcessor組成一個WorkerPool。 2:維護workSequence事件處理者處理的序列。
waitStrategy 等待策略
BlockingWaitStrategy:默認的等待策略。利用鎖和等待機制的WaitStrategy,CPU消耗少,但是延遲比較高
BusySpinWaitStrategy:自旋等待。這種策略會利用CPU資源來避免系統調用帶來的延遲抖動,當線程可以綁定到指定CPU(核)的時候可以使用這個策略。
LiteBlockingWaitStrategy:實現方法也是阻塞等待
SleepingWaitStrategy:是另一種較為平衡CPU消耗與延遲的WaitStrategy,在不同次數的重試後,採用不同的策略選擇繼續嘗試或者讓出CPU或者sleep。這種策略延遲不均勻。
TimeoutBlockingWaitStrategy:實現方法是阻塞給定的時間,超過時間的話會拋出超時異常。
YieldingWaitStrategy:實現方法是先自旋(100次),不行再臨時讓出調度(yield)。和SleepingWaitStrategy一樣也是一種高性能與CPU資源之間取捨的折中方案,但這個策略不會帶來顯著的延遲抖動。
PhasedBackoffWaitStrategy:實現方法是先自旋(10000次),不行再臨時讓出調度(yield),不行再使用其他的策略進行等待。可以根據具體場景自行設置自旋時間、yield時間和備用等待策略。
實戰多線程消費者
public static void main(String[] args) { //創建一個RingBuffer,注意容量是2。 RingBuffer<TradeBO> ringBuffer = RingBuffer.createSingleProducer(() -> new TradeBO(), 2); //創建2個WorkHandler其實就是創建2個WorkProcessor WorkerPool<TradeBO> workerPool = new WorkerPool<TradeBO>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), new ConsumerC(), new ConsumerD()); //將WorkPool的工作序列集設置為ringBuffer的追蹤序列。 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); //創建一個線程池用於執行Workhandler。 Executor executor = Executors.newFixedThreadPool(4); //啟動WorkPool。 workerPool.start(executor); //往RingBuffer上發佈事件 for (int i = 0; i < 4; i++) { int finalI = i; EventTranslator eventTranslator = (EventTranslator<TradeBO>) (event, sequence) -> { event.setId(finalI); event.setPrice((double) finalI); }; ringBuffer.publishEvent(eventTranslator); System.out.println("發佈[" + finalI + "]"); } } //程序執行結果。可以看出,多個線程消費者處理位於不同位置的事件 發佈[0] ConsumerC id=0 price=0.0 發佈[1] 發佈[2] ConsumerC id=2 price=2.0 ConsumerD id=1 price=1.0 ConsumerC id=3 price=3.0 發佈[3]
DSL
1:所謂DSL我的理解就是消費者這裡相互依賴。

dw.consumeWith(handler1a, handler2a); dw.after(handler1a).consumeWith(handler1b); dw.after(handler2a).consumeWith(handler2b); dw.after(handler1b, handler2b).consumeWith(handler3); ProducerBarrier producerBarrier = dw.createProducerBarrier();