使用多線程增加kafka消費能力
- 2019 年 10 月 6 日
- 筆記
前提:本例適合那些沒有順序要求的消息主題。
kafka通過一系列優化,寫入和讀取速度能夠達到數萬條/秒。通過增加分區數量,能夠通過部署多個消費者增加並行消費能力。但還是有很多情況下,某些業務的執行速度實在是太慢,這個時候我們就要用到多線程去消費,提高應用機器的利用率,而不是一味的給kafka增加壓力。

使用Spring創建一個kafka消費者是非常簡單的。我們選擇的方式是繼承kafka的ShutdownableThread
,然後實現它的doWork
方法即可。
參考:
https://github.com/apache/kafka/blob/2.1/examples/src/main/java/kafka/examples/Consumer.java
多線程消費某個分區的數據
即然是使用多線程,我們就需要新建一個線程池。

我們創建了一個最大容量為20的線程池,其中有兩個參數需要注意一下。(參考《JAVA多線程使用場景和注意事項簡版》)。
我們使用了了零容量的SynchronousQueue
,一進一出,避免隊列里緩衝數據,這樣在系統異常關閉時,就能排除因為阻塞隊列丟消息的可能。 然後使用了CallerRunsPolicy
飽和策略,使得多線程處理不過來的時候,能夠阻塞在kafka的消費線程上。
然後,我們將真正處理業務的邏輯放在任務中多線程執行,每次執行完畢,我們都手工的commit一次ack
,表明這條消息我已經處理了。由於是線程池認領了這些任務,順序性是無法保證的,可能有些任務沒有執行完畢,後面的任務就已經把它的offset給提交了。o.O
不過這暫時不重要,首先讓它並行化運行就好。

可惜的是,當我們運行程序,直接拋出了異常,無法進行下去。

程序直接說了:
KafkaConsumer is not safe for multi-threaded access
顯然,kafka的消費端不是線程安全的,它拒絕你這麼調用它的api。kafka的初衷是好的,想要避免一些並發環境的問題,但我確實需要使用多線程處理。
kafka消費者通過比較調用者的線程id來判斷是否是由外部線程發起請求。
private void acquire() { long threadId = Thread.currentThread().getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet(); }
得,只能將commitSync
函數放在線程外面了,先提交ack、再執行任務。
加入管道
我們獲取的消息,可能在真正被執行之前,會進行一些過濾,比如一些空值或者特定條件的判斷。雖然可以直接放在消費者線程里運行,但顯的特別的亂,可以加入一個生產者消費者模型(你可以認為這是畫蛇添足)。這裡採用的是阻塞隊列依然是SynchronousQueue
,它充當了管道的功能。

我們把任務放入管道後,立馬commit。如果線程池已經滿了,將一直阻塞在消費者線程里,直到有空缺。然後,我們單獨啟動了一個線程,用來接收這些數據,然後提交到這部分的代碼看起來大概這樣。

應用能夠啟動了,消費速度賊快。
參數配置
kafka的參數非常的多,我們比較關心的有以下幾個參數。
max.poll.records
調用一次poll,返回的最大條數。這個值設置的大,那麼處理的就慢,很容易超出max.poll.interval.ms
的值(默認5分鐘),造成消費者的離線。在耗時非常大的消費中,是需要特別注意的。
enable.auto.commit
是否開啟自動提交(offset)如果開啟,consumer已經消費的offset信息將會間歇性的提交到kafka中(持久保存)
當開啟offset自動提交時,提交請求的時間頻率由參數auto.commit.interval.ms
控制。
fetch.max.wait.ms
如果broker端反饋的數據量不足時(fetch.min.bytes),fetch請求等待的最長時間。如果數據量滿足需要,則立即返回。
session.timeout.ms
consumer會話超時時長,如果在此時間內,server尚未接收到consumer任何請求(包括心跳檢測),那麼server將會判定此consumer離線。此值越大,server等待consumer失效、rebalance時間就越長。
heartbeat.interval.ms
consumer協調器與kafka集群之間,心跳檢測的時間間隔。kafka集群通過心跳判斷consumer會話的活性,以判斷consumer是否在線,如果離線則會把此consumer註冊的partition分配(assign)給相同group的其他consumer。此值必須小於「session.timeout.ms」,即會話過期時間應該比心跳檢測間隔要大,通常為session.timeout.ms的三分之一,否則心跳檢測就失去意義。
在本例中,我們的參數簡單的設置如下,主要調整了每次獲取的條數和檢測時間。其他的都是默認。

消息保證
仔細的同學可能會看到,我們的代碼依然不是完全安全的。這是由於我們提前提交了ack導致的。程序正常運行下,這無傷大雅。但在應用異常關閉的時候,那些正在執行中的消息,很可能會丟失,對於一致性要求非常高的應用,我們要從兩個手段上進行保證。
使用關閉鉤子
第一種就是考慮kill -15的情況。這種方式比較簡單,只要覆蓋ShutdownableThread的shutdown方法即可,應用將有機會執行線程池中的任務,確保消費完畢再關閉應用。
@Override public void shutdown() { super.shutdown(); executor.shutdown(); }
使用日誌處理
應用oom,或者直接kill -9了,事情就變得麻煩起來。
維護一個單獨的日誌文件(或者本地db),在commit之前寫入一條日誌,然後在真正執行完畢之後寫入一條對應的日誌。當系統啟動時,讀取這些日誌文件,獲取沒有執行成功的任務,重新執行。
想要效率,還想要可靠,是得下點苦力氣的。
藉助redis處理
這種方式與日誌方式類似,但由於redis的效率很高(可達數萬),而且方便,是優於日誌方式的。
可以使用Hash結構,提交任務的同時寫入Redis,任務執行完畢刪掉這個值,那麼剩下的就是出現問題的消息。

在系統啟動時,首先檢測一下redis中是否有異常數據。如果有,首先處理這些數據,然後正常消費。
End
多線程是為了增加效率,redis等是為了增加可靠性。業務代碼是非常好編寫的,搞懂了邏輯就搞定了大部分;業務代碼有時候又是困難的,你要編寫大量輔助功能增加它的效率、照顧它的邊界。
以程序員的角度來說,最有競爭力的代碼都是為了照顧小概率發生的邊界異常。
kafka在吞吐量和可靠性方面,有各種的權衡,很多都是魚和熊掌的關係。不必糾結於它本身,我們可以藉助外部的工具,獲取更大的收益。在這種情況下,redis當機與應用同時當機的概率還是比較小的。5個9的消息保證是可以做到的,剩下的那點不完美問題消息,你為什麼不從日誌里找呢?