Kafka 延時隊列&重試隊列
一、延時隊列
1. 簡介
TimingWheel是kafka時間輪的實現,內部包含了⼀個TimerTaskList數組,每個數組包含了⼀些鏈表組成的TimerTaskEntry事件,每個TimerTaskList表示時間輪的某⼀格,這⼀格的時間跨度為tickMs,同⼀個TimerTaskList中的事件都是相差在⼀個tickMs跨度內的,整個時間輪的時間跨度為interval = tickMs * wheelSize,該時間輪能處理的時間範圍在cuurentTime到currentTime + interval之間的事件。
當添加⼀個時間他的超時時間⼤於整個時間輪的跨度時, expiration >= currentTime + interval,則會將該事件向上級傳遞,上級的tickMs是下級的interval,傳遞直到某⼀個時間輪滿⾜expiration < currentTime + interval,
然後計算對應位於哪⼀格,然後將事件放進去,重新設置超時時間,然後放進jdk延遲隊列
else if (expiration < currentTime + interval) {
// Put in its own bucket
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// Set the bucket expiration time
if (bucket.setExpiration(virtualId * tickMs)) {
// The bucket needs to be enqueued because it was an expired bucket
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
// will pass in the same value and hence return false, thus the bucket with the same expiration will not
// be enqueued multiple times.
queue.offer(bucket)
}
SystemTimer會取出queue中的TimerTaskList,根據expiration將currentTime往前推進,然後把⾥⾯所有的事件重新放進時間輪中,因為ct推進了,所以有些事件會在第0格,表示到期了,直接返回。
else if (expiration < currentTime + tickMs) {
然後將任務提交到java線程池中處理。
服務端在處理客戶端的請求,針對不同的請求,可能不會⽴即返迴響應結果給客戶端。在處理這類請求時,服務端會為這類請求創建延遲操作對象放⼊延遲緩存隊列中。延遲緩存的數據結構類似MAP,延遲操作對象從延遲緩存隊列中完成並移除有兩種⽅式:
- 延遲操作對應的外部事件發⽣時,外部事件會嘗試完成延遲緩存中的延遲操作 。
- 如果外部事件仍然沒有完成延遲操作,超時時間達到後,會強制完成延遲的操作 。
2. 延時操作接口
DelayedOperation
接⼝表示延遲的操作對象。此接⼝的實現類包括延遲加⼊,延遲⼼跳,延遲⽣產,延遲拉取。延遲接⼝相關的⽅法:
tryComplete
:嘗試完成,外部事件發⽣時會嘗試完成延遲的操作。該⽅法返回值為true,表示可以完成延遲操作,會調⽤強制完成的⽅法(forceComplete
)。返回值為false,表示不可以完成延遲操作。forceComplete
:強制完成,兩個地⽅調⽤,嘗試完成⽅法(tryComplete
)返回true時;延遲操作超時時。run
:線程運⾏,延遲操作超時後,會調⽤線程的運⾏⽅法,只會調⽤⼀次,因為超時就會發⽣⼀次。超時後會調⽤強制完成⽅法(forceComplete
),如果返回true,會調⽤超時的回調⽅法。onComplete
:完成的回調⽅法。onExpiration
:超時的回調⽅法。
外部事件觸發完成和超時完成都會調⽤forceComplete()
,並調⽤onComplete()
。forceComplete
和onComplete
只會調⽤⼀次。多線程下⽤原⼦變量來控制只有⼀個線程會調⽤onComplete
和forceComplete
。
延遲⽣產和延遲拉取完成時的回調⽅法,嘗試完成的延遲操作
副本管理器在創建延遲操作時,會把回調⽅法傳給延遲操作對象。當延遲操作完成時,在onComplete
⽅法中會調⽤回調⽅法,返迴響應結果給客戶端。
創建延遲操作對象需要提供請求對應的元數據。延遲⽣產元數據是分區的⽣產結果;延遲拉取元數據是分區的拉取信息。
創建延遲的⽣產對象之前,將消息集寫⼊分區的主副本中,每個分區的⽣產結果會作為延遲⽣產的元數據。創建延遲的拉取對象之前,從分區的主副本中讀取消息集,但並不會使⽤分區的拉取結果作為延遲拉取的元數據,因為延遲⽣產返回給客戶端的響應結果可以直接從分區的⽣產結果中獲取,⽽延遲的拉取返回給客戶端的響應結果不能直接從分區的拉取結果中獲取。
元數據包含返回結果的條件是:從創建延遲操作對象到完成延遲操作對象,元數據的含義不變。對於延遲的⽣產,服務端寫⼊消息集到主副本返回的結果是確定的。是因為ISR中的備份副本還沒有全部發送應答給主副本,才會需要創建延遲的⽣產。服務端在處理備份副本的拉取請求時,不會改變分區的⽣產結果。最後在完成延遲⽣產的操作對象時,服務端就可以把 「創建延遲操作對象」 時傳遞給它的分區⽣產結果直接返回給⽣產者 。對應延遲的拉取,讀取了主副本的本地⽇志,但是因為消息數量不夠,才會需要創建延遲的拉取,⽽不⽤分區的拉取結果⽽是⽤分區的拉取信息作為延遲拉取的元數據,是因為在嘗試完成延遲拉取操作對象時,會再次讀取主副本的本地⽇志,這次的讀取有可能會讓消息數量達到⾜夠或者超時,從⽽完成延遲拉取操作對象。這樣創建前和完成時延遲拉取操作對象的返回結果是不同的。但是拉取信息不管讀取多少次都是⼀樣的。
延遲的⽣產的外部事件是:ISR的所有備份副本發送了拉取請求;備份副本的延遲拉取的外部事件是:追加消息集到主副本;消費者的延遲拉取的外部事件是:增加主副本的最⾼⽔位。
3. 嘗試完成延遲的生產
服務端處理⽣產者客戶端的⽣產請求,將消息集追加到對應主副本的本地⽇志後,會等待ISR中所有的備份剛本都向主副本發送應答 。⽣產請求包括多個分區的消息集,每個分區都有對應的ISR集合。當所有分區的ISR副本都向對應分區的主副本發送了應答,⽣產請求才能算完成。⽣產請求中雖然有多個分區,但是延遲的⽣產操作對象只會創建⼀個。
判斷分區的ISR副本是否都已經向主副本發送了應答,需要檢查ISR中所有備份副本的偏移量是否到了延遲⽣產元數據的指定偏移量(延遲⽣產的元數據是分區的⽣產結果中包含有追加消息集到本地⽇志返回下⼀個偏移量)。所以ISR所有副本的偏移量只要等於元數據的偏移量,就表示備份副本向主副本發送了應答。由於當備份副本向主副本發送拉取請求,服務端讀取⽇志後,會更新對應備份副本的偏移量數據。所以在具體的實現上,備份副本並不需要真正發送應答給主副本,因為主副本所在消息代理節點的分區對象已經記錄了所有副本的信息,所以嘗試完成延遲的⽣產時,根據副本的偏移量就可以判斷備份副本是否發送了應答。進⽽檢查分區是否有⾜夠的副本趕上指定偏移量,只需要判斷主副本的最⾼⽔位是否等於指定偏移量(最⾼⽔位的值會選擇ISR中所有備份副本中最⼩的偏移量來設置,最⼩的值都等於了指定偏移量,那麼就代表所有的ISR都發送了應答)。
總結:
總結:服務端創建的延遲⽣產操作對象,在嘗試完成時根據主副本的最⾼⽔位是否等於延遲⽣產操作對象中元數據的指定偏移量來判斷。具體步驟:
- 服務端處理⽣產者的⽣產請求,寫⼊消息集到Leader副本的本地⽇志。
- 服務端返回追加消息集的下⼀個偏移量,並且創建⼀個延遲⽣產操作對象。元數據為分區的⽣產結果(其中就包含下⼀個偏移量的值)
- 服務端處理備份副本的拉取請求,⾸先讀取主副本的本地⽇志。
- 服務端返回給備份副本讀取消息集,並更新備份副本的偏移量。
- 選擇ISR備份副本中最⼩的偏移量更新主副本的最⾼⽔位。
- 如果主副本的最⾼⽔位等於指定的下⼀個偏移量的值,就完成延遲的⽣產。
4. 嘗試完成延遲的拉取
服務端處理消費者或備份副本的拉取請求,如果創建了延遲的拉取操作對象,⼀般都是客戶端的消費進度能夠⼀直趕上主副本。⽐如備份副本同步主副本的數據,備份副本如果⼀直能趕上主副本,那麼主副本有新消息寫⼊,備份副本就會⻢上同步。但是針對備份副本已經消費到主副本的最新位置,⽽主副本並沒有新消息寫⼊時:服務端沒有⽴即返回空的拉取結果給備份副本,這時會創建⼀個延遲的拉取操作對象,如果有新的消息寫⼊,服務端會等到收集⾜夠的消息集後,才返回拉取結果給備份副本,有新的消息寫⼊,但是還沒有收集到⾜夠的消息集,等到延遲操作對象超時後,服務端會讀取新寫⼊主副本的消息後,返回拉取結果給備份副本(完成延遲的拉取時,服務端還會再讀取⼀次主副本的本地⽇志,返回新讀取出來的消息集)。
客戶端的拉取請求包含多個分區,服務端判斷拉取的消息⼤⼩時,會收集拉取請求涉及的所有分區。只要消息的總⼤⼩超過拉取請求設置的最少位元組數,就會調⽤forceComplete()⽅法完成延遲的拉取。
外部事件嘗試完成延遲的⽣產和拉取操作時的判斷條件:
拉取偏移量是指拉取到消息⼤⼩。對於備份副本的延遲拉取,主副本的結束偏移量是它的最新偏移量(LEO)。對於消費者的拉取延遲,主副本的結束偏移量是它的最⾼⽔位(HW)。備份副本要時刻與主副本同步,消費者只能消費到主副本的最⾼⽔位。
5. ⽣產請求和拉取請求的延遲緩存
客戶端的⼀個請求包括多個分區,服務端為每個請求都會創建⼀個延遲操作對象。⽽不是為每個分區創建⼀個延遲操作對象。服務端的「延遲操作緩存」管理了所有的「延遲操作對象」,緩存的鍵是每⼀個分區,緩存的值是分區對應的延遲操作列表。
⼀個客戶端請求對應⼀個延遲操作,⼀個延遲操作對應多個分區。在延遲緩存中,⼀個分區對應多個延遲操作。延遲緩存中保存了分區到延遲操作的映射關係。
根據分區嘗試完成延遲的操作,因為⽣產者和消費者是以分區為最⼩單位來追加消息和消費消息。雖然延遲操作的創建是針對⼀個請求,但是⼀個請求中會有多個分區,在⽣產者追加消息時,⼀個⽣產請求總的不同分區包含的消息是不⼀樣的。這樣追加到分區對應的主副本的本地⽇志中,有的分區就可以去完成延遲的拉取,但是有的分區有可能還達不到完成延遲拉取操作的條件。同樣完成延遲的⽣產也⼀樣。所以在延遲緩存中要以分區為鍵來存儲各個延遲操作。
由於⼀個請求創建⼀個延遲操作,⼀個請求⼜會包含多個分區,所以不同的延遲操作可能會有相同的分區。在加⼊到延遲緩存時,每個分區都對應相同的延遲操作。外部事件發⽣時,服務端會以分區為粒度,嘗試完成這個分區中的所有延遲操作 。 如果指定分區對應的某個延遲操作可以被完成,那麼延遲操作會從這個分區的延遲操作列表中移除。但這個延遲操作還有其他分區,其他分區中已經被完成的延遲操作也需要從延遲緩存中刪除。但是不會⽴即被刪除,因為分區作為延遲緩存的鍵,在服務端的數量會很多。只要分區對應的延遲操作完成了⼀個,就要⽴即檢查所有分區,對服務端的性能影響⽐較⼤。所以采⽤⼀個清理器,會負責定時地清理所有分區中已經完成的延遲操作。
副本管理器針對⽣產請求和拉取請求都分別有⼀個全局的延遲緩存。⽣產請求對應延遲緩存中存儲了延遲的⽣產。拉取請求對應延遲緩存中存儲了延遲的拉取。
延遲緩存提供了兩個⽅法:
- tryCompleteElseWatch():嘗試完成延遲的操作,如果不能完成,將延遲操作加⼊延遲緩存中。⼀旦將延遲操作加⼊延遲緩存的監控,延遲操作的每個分區都會監視該延遲操作。換句話說就是每個分區發⽣了外部事件後,都會去嘗試完成延遲操作。
- checkAndComplete():參數是延遲緩存的鍵,外部事件調⽤該⽅法,根據指定的鍵嘗試完成延遲緩存中的延遲操作。
延遲緩存在調⽤tryCompleteElseWatch⽅法將延遲操作加⼊延遲緩存之前,會先嘗試⼀次完成延遲的操作,如果不能完成,會調⽤⽅法將延遲操作加⼊到分區對應的監視器,之後還會嘗試完成⼀次延遲操作,如果還不能完成,會將延遲操作加⼊定時器。如果前⾯的加⼊過程中,可以完成延遲操作後,那麼就可以不⽤加⼊到其他分區的延遲緩存了。
延遲操作不僅存在於延遲緩存中,還會被定時器監控。定時器的⽬的是在延遲操作超時後,服務端可以強制完成延遲操作返回結果給客戶端。延遲緩存的⽬的是讓外部事件去嘗試完成延遲操作。
6. 監視器
延遲緩存的每個鍵都有⼀個監視器(類似每個分區有⼀個監視器),以鏈表結構來管理延遲操作。當外部事件發⽣時,會根據給定的鍵,調⽤這個鍵的對應監視器的tryCompleteWatch()⽅法,嘗試完成監視器中所有的延遲操作。監視器嘗試完成所有延遲操作的過程中,會調⽤每個延遲操作的tryComplete()⽅法,判斷能否完成延遲的操作。如果能夠完成,就從鏈表中刪除對應的延遲操作。
7. 清理線程
清理線程的作⽤是清理所有監視器中已經完成的延遲操作。
8. 定時器
服務端創建的延遲操作會作為⼀個定時任務,加⼊定時器的延遲隊列中。當延遲操作超時後,定時器會將延遲操作從延遲隊列中彈出,並調⽤延遲操作的運⾏⽅法,強制完成延遲的操作。
定時器使⽤延遲隊列管理服務端創建的所有延遲操作,延遲隊列的每個元素是定時任務列表,⼀個定時任務列表可以存放多個定時任務條⽬。服務端創建的延遲操作對象,會先包裝成定時任務條⽬,然後加⼊延遲隊列指定的⼀個定時任務列表。延遲隊列是定時器中保存定時任務列表的全局數據結構,服務端創建的延遲操作不是直接加⼊定時任務列表,⽽是加⼊時間輪。
時間輪和延遲隊列的關係:
- 定時器擁有⼀個全局的延遲隊列和時間輪,所有時間輪公⽤⼀個計數器。
- 時間輪持有延遲隊列的引⽤。
- 定時任務條⽬添加到時間輪對應的時間格(槽)(槽中是定時任務列表)中,並且把該槽表也會加⼊到延遲隊列中。
- ⼀個線程會將超時的定時任務列表會從延遲隊列的poll⽅法彈出。定時任務列表超時並不⼀定代表定時任務超時,將定時任務重新加⼊時間輪,如果加⼊失敗,說明定時任務確實超時,提交給線程池執⾏。
- 延遲隊列的poll⽅法只會彈出超時的定時任務列表,隊列中的每個元素(定時任務列表)按照超時時間排序,如果第⼀個定時任務列表都沒有過期,那麼其他定時任務列表也⼀定不會超時。
延遲操作本身的失效時間是客戶端請求設置的,延遲隊列的元素(每個定時任務列表)也有失效時間,當定時任務列表中的getDelay()⽅法返回值⼩於等於0,就表示定時任務列表已經過期,需要⽴即執⾏。
如果當前的時間輪放不下加⼊的時間時,就會創建⼀個更⾼層的時間輪。定時器只持有第⼀層的時間輪的引⽤,並不會持有更⾼層的時間輪。因為第⼀層的時間輪會持有第⼆層的時間輪的引⽤,第⼆層會持有第三層的時間輪的引⽤。定時器將定時任務加⼊到當前時間輪,要判斷定時任務的失效時間⾸是否在當前時間輪的範圍內,如果不在當前時間輪的範圍內,則要將定時任務上升到更⾼⼀層的時間輪中。時間輪包含了定時器全局的延遲隊列。
時間輪中的變量:tickMs=1:表示⼀格的⻓度是1毫秒;wheelSize=20表示⼀共20格,時間輪的範圍就是20毫秒,定時任務的失效時間⼩於等於20毫秒的都會加⼊到這⼀層的時間輪中;interval=tickMs*wheelSize=20,如果需要創建更⾼⼀層的時間輪,那麼低⼀層的時間輪的interval的值作為⾼⼀層數據輪的tickMs值;currentTime當前時間輪的當前時間,往前移動時間輪,主要就是更新當前時間輪的當前時間,更新後重新加⼊定時任務條⽬。
9. 一道面試題
⾯試題⼤致上是這樣的:消費者去Kafka⾥拉去消息,但是⽬前Kafka中⼜沒有新的消息可以提供,那麼Kafka會如何處理?
如下圖所示,兩個follower副本都已經拉取到了leader副本的最新位置,此時⼜向leader副本發送拉取請求,⽽leader副本並沒有新的消息寫⼊,那麼此時leader副本該如何處理呢?可以直接返回空的拉取結果給follower副本,不過在leader副本⼀直沒有新消息寫⼊的情況下,follower副本會⼀直發送拉取請求,並且總收到空的拉取結果,這樣徒耗資源,顯然不太合理。
這⾥就涉及到了Kafka延遲操作的概念。Kafka在處理拉取請求時,會先讀取⼀次⽇志⽂件,如果收集不到⾜夠多(fetchMinBytes
,由參數fetch.min.bytes
配置,默認值為1)的消息,那麼就會創建⼀個延時拉取操作(DelayedFetch
)以等待拉取到⾜夠數量的消息。當延時拉取操作執⾏時,會再讀取⼀次⽇志⽂件,然後將拉取結果返回給follower副本。
延遲操作不只是拉取消息時的特有操作,在Kafka中有多種延時操作,⽐如延時數據刪除、延時⽣產等。
對於延時⽣產(消息)⽽⾔,如果在使⽤⽣產者客戶端發送消息的時候將acks參數設置為-1,那麼就意味着需要等待ISR集合中的所有副本都確認收到消息之後才能正確地收到響應的結果,或者捕獲超時異常。
假設某個分區有3個副本:leader、follower1和follower2,它們都在分區的ISR集合中。為了簡化說明,這⾥我們不考慮ISR集合伸縮的情況。Kafka在收到客戶端的⽣產請求後,將消息3和消息4寫⼊leader副本的本地⽇志⽂件,如上圖所示。
由於客戶端設置了acks為-1,那麼需要等到follower1和follower2兩個副本都收到消息3和消息4後才能告知客戶端正確地接收了所發送的消息。如果在⼀定的時間內,follower1副本或follower2副本沒能夠完全拉取到消息3和消息4,那麼就需要返回超時異常給客戶端。⽣產請求的超時時間由參數request.timeout.ms配置,默認值為30000,即30s。
那麼這⾥等待消息3和消息4寫⼊follower1副本和follower2副本,並返回相應的響應結果給客戶端的動作是由誰來執⾏的呢?在將消息寫⼊leader副本的本地⽇志⽂件之後,Kafka會創建⼀個延時的⽣產操作(DelayedProduce),⽤來處理消息正常寫⼊所有副本或超時的情況,以返回相應的響應結果給客戶端。
延時操作需要延時返迴響應的結果,⾸先它必須有⼀個超時時間(delayMs),如果在這個超時時間內沒有完成既定的任務,那麼就需要強制完成以返迴響應結果給客戶端。其次,延時操作不同於定時操作,定時操作是指在特定時間之後執⾏的操作,⽽延時操作可以在所設定的超時時間之前完成,所以延時操作能夠⽀持外部事件的觸發。
就延時⽣產操作⽽⾔,它的外部事件是所要寫⼊消息的某個分區的HW(⾼⽔位)發⽣增⻓。也就是說,隨着follower副本不斷地與leader副本進⾏消息同步,進⽽促使HW進⼀步增⻓,HW每增⻓⼀次都會檢測是否能夠完成此次延時⽣產操作,如果可以就執⾏以此返迴響應結果給客戶端;如果在超時時間內始終⽆法完成,則強制執⾏。
回顧⼀下⽂中開頭的延時拉取操作,它也同樣如此,也是由超時觸發或外部事件觸發⽽被執⾏的。超時觸發很好理解,就是等到超時時間之後觸發第⼆次讀取⽇志⽂件的操作。外部事件觸發就稍複雜了⼀些,因為拉取請求不單單由follower副本發起,也可以由消費者客戶端發起,兩種情況所對應的外部事件也是不同的。如果是follower副本的延時拉取,它的外部事件就是消息追加到了leader副本的本地⽇志⽂件中;如果是消費者客戶端的延時拉取,它的外部事件可以簡單地理解為HW的增⻓。
二、重試隊列
kafka沒有重試機制不⽀持消息重試,也沒有死信隊列,因此使⽤kafka做消息隊列時,需要⾃⼰實現消息重試的功能。
自己實現(創建新的kafka主題作為重試隊列):
- 創建⼀個topic作為重試topic,⽤於接收等待重試的消息。
- 普通topic消費者設置待重試消息的下⼀個重試topic。
- 從重試topic獲取待重試消息儲存到redis的zset中,並以下⼀次消費時間排序
- 定時任務從redis獲取到達消費事件的消息,並把消息發送到對應的topic
- 同⼀個消息重試次數過多則不再重試