Kafka分佈式查詢引擎

1.概述

Kafka是一個分佈式消息中間件系統,裏面存儲着實際場景中的數據。Kafka原生是不支持點查詢的,如果我們想對存儲在Topic中的數據進行查詢,可能需要對Topic中的數據進行消費落地,然後構建索引(或者數據落地到自帶所以的存儲系統中,例如HBase、Hive等)。今天,筆者就為大家來介紹如何實現Kafka分佈式查詢引擎。

2.內容

對於點查詢,我們可以總結為兩個要點。其一,有數據供我們查詢;其二,對待查詢的數據構建索引。在Kafka中,Topic存儲數據,滿足了第一點,雖然Kafka有索引的概念,但是它的索引是基於Offset的稀疏索引,並不是對每條Message都會構建一個索引。並且,這個Offset索引對於實際情況查詢場景來說,也幫助不大。比如,你查詢Topic01下的Partition_0,但是,也僅僅只是查下到某個Topic中分區下的Offset對應的一條記錄,但是這條記錄是啥,你並不知道。真實查詢的情況,可能是你需要查詢某個ID,或者模糊查詢某個Name是否存在。

2.1 索引

其實有一種方式,是可行的。就是對Kafka源代碼進行改造,在Broker落地每條數據的時候,構建一條索引(其實,這種方式與在原始的Kafka外面加一層Proxy類似,由Proxy充當與Client交互的角色,接收Client的數據存儲並構建索引)。這樣的實現方式如下圖:

 

如果對Kafka源代碼熟悉,有能力改造其源代碼,可以在Kafka中添加對每條數據構建索引的邏輯。如果,覺得怕對Kafka的性能有影響,或者改造有難度。上述流程圖的方式,也可以實現這種點查詢。

改造Kafka源代碼添加索引,或者是Proxy的方式存儲數據並構建索引,這種兩種方式來說,數據上都會要冗餘一倍左右的的存儲容量。

2.2 單節點查詢

基於上述的問題,我們對這種方案進行升級改造一下。因為很多情況下,生產環境的數據已經是運行了很長時間了,加Proxy或者改造Kafka源代碼的方式適合構建一個Kafka的新集群的時候使用。對於已有的Kafka集群,如果我們要查詢Topic中的數據,如何實現呢。

Kafka-Eagle中,我對Topic數據查詢實現了基於SQL查詢的實現方案。邏輯是這樣的,編寫SQL查詢語句,對SQL進行解析,映射出一個Topic的Schema以及過濾條件,然後根據過濾條件消費Topic對應的數據,最後拿到數據集,通過SQL呈現出最後的結果。流程圖如下:

 

但是,這樣是由局限性的。由於,單節點的計算能力有限,所以對每個Partition默認查詢5000條數據,這個記錄是可以增加或者減少的。如果在配置文件中對這個屬性增大,比如設置為了50000條,那麼對應的ke.sh腳本中的內存也需要增加,因為每次查詢需要的內存增加了。不然,頻繁若干用戶同時查詢,容易造成OOM的情況。

但是,通常一個Topic中存儲的數據一般達到上億條數據以上,這種方式要從上億條或者更多的數據中查詢我們想要的數據,可能就滿足不了了。

2.3 分佈式查詢

基於這種情況,我們可以對這中單節點查詢的方式進行升級改造,將它變為分佈式查詢。其實,仔細來看,單節點查詢的方式,就是一個分佈式查詢的縮版。那我們需要實現這樣一個分佈式查詢的Kafka SQL引擎呢?

首先,我們可以藉助Hadoop的MapReduce思想,「化繁為簡,分而治之」。我們將一個Topic看成一個比較大的數據集,每次我們需要對這個數據集進行查詢,可以將待查詢的數據進行拆分若干份Segment,然後,充分利用服務器的CPU,進行多線程消費(這樣就可以打破Kafka中一個線程只能消費一個分區的局限性)。實現流程圖如下:

 

上圖可知,由客戶端發起請求,提交請求到Master節點,然後Master節點解析客戶端的請求,並生成待執行策略。比如上述有三個工作節點,按照客戶端的情況,Master會將生成的執行策略下發給三個工作節點,讓其進行計算。

這裡以其中一個工作節點為例子,比如WorkNode1接收到了Master下發的計算任務,接收到執行指令後,結合工作節點自身的資源情況(比如CPU和內存,這裡CPU較為重要),將任務進行拆解為若干個子任務(子任務的個數取決於每個批次的BatchSize,可以在屬性中進行配置),然後讓生成好的若干個子任務並行計算,得到若干個子結果,然後將若干個子結果匯總為一個最終結果作為當前工作節點的最終計算結果,最後將不同的工作節點的結果進行最後的Merge作為本次查詢的結果返回給Master節點(這裡需要注意的是,多個工作節點匯總在同一個JobID下)。然後,Master節點收到工作節點返回的結果後,返回給客戶端。

3.結果預覽

查詢10條Topic中的數據,工作節點執行如下:

select * from ke1115 where `partition` in (0) limit 10

 

 上圖顯示了,同一WorkNode節點下,同一JobID中,不同線程子任務的計算進度日誌。

 

KSqlStrategy顯示了Master節點下發的待執行策略,msg表示各個工作節點返回的最終結果。

4.待優化

目前Kafka分佈式查詢引擎基礎功能已實現可以用,任務託管、子任務查詢內存優化等還有優化的空間,計劃正在考慮集成到KafkaEagle系統中。 

5.結束語

這篇博客就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。

Tags: