老王,聽說你搞了個牛叉的數據接入系統

小李:老王,早上又吃雞蛋灌餅不加腸啊!
老王:是啊,我的腸又被捐出去了!小李,看你臉色,這又是昨天加班了。
小李:可別提了,昨天logstash機器又頂不住了,還被運維給鄙視了一番,看來我得學習學習你那牛叉的數據接入系統了,上線以來都沒出過問題。
老王:好啊,我給你講一講,巴拉巴拉….
周圍的人:這人不是有什麼大病吧….
老王:我還是抓緊吃完去公司給你講吧
小李:好吧!圖片

    一提到把kafka數據落地到hdfs,大家最先想到的一定是logstash,由於logstash很笨重,當數據量很大時,我們經常會遇到資源不夠用的問題,也沒辦法精確控制hdfs生成文件的大小。為了不影響後續hive分析的性能,我們經常還需要把一些小文件進行合併(小文件是hdfs中不得不面對的問題),這不僅拉長了鏈路,也會造成不必要的資源浪費。
圖片

    為了避免資源浪費,降低數據接入鏈路,我們實現了一個數據接入系統。主要優點如下所示。

    1.通過配置文件可以快速實現數據接入需求(針對kafka->hdfs的場景)。

    2.實現精確消費一次(Exactly-once),即保證數據不丟不重複。

    3.可以手動配置生成文件的大小。

    整體架構如下圖所示。  

圖片

   我們使用目前很火的flink消費kafka(關於flink和spark的優缺點大家自行百度),然後定時的更新offset到hbase中,以滿足精確消費一次的場景。我們的flink任務也很簡單,主要有兩個算子組成kafkasource和parquetsink,如圖所示。

圖片

1.kafkasource

    這裡的kafkasource主要是用來消費kafka的數據,不過這裡有一點需要注意,我們在消費kafka的時候,需要從hbase讀取相應的offset信息,如果沒讀到offset信息,表明這個任務是第一次啟動,我們從該消費組的位置讀。如果讀到了offset信息,為了避免重複消費,我們需要從該offset位置開始讀(因為該offset之前的數據已經成功落地到hdfs上了)。  這裡為了實現這個功能,我們需要重新實現flink自帶的FlinkKafkaConsumerBase類,我們需要在內部添加從hbase讀取offset的邏輯。

long offset = offsetManager.getPartitionOffset(seedPartition.getTopic(), seedPartition.getPartition());
if (offset != -1) {
    subscribedPartitionsToStartOffsets.put(seedPartition, offset);
} else {
    subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}

image.png

    這樣我們就可以實現在flink checkpoint失敗時,從hbase中讀取offset信息,來控制kafka從哪個位置開始消費。這裡還有一點需要注意,我們在任務啟動時,也需要添加這塊代碼邏輯。你可以自己思考一下。

2.parquetsink

   parquetsink這裡主要是實現往hdfs寫入數據,我們從parquet源碼里可以把寫數據成parquet文件的邏輯copy出來,自己實現往hdfs寫parquet文件。這裡就不重點強調了,這裡主要看一下和hbase的交互流程。

   我們這裡主要是重寫CheckpointedFunction里的snapshotState方法,該方法每次在checkpoint的時候,都會被調用。所以我們在該方法里實現文件的生成和更新hbase中offset的工作。這裡首先會判斷生成文件的大小是否已經滿足我們設置的大小,如果沒有滿足,我們就不做處理。如果滿足我們設置的文件大小,我們會把這個臨時文件上線,然後更新hbase中offset的信息。關鍵代碼邏輯如下圖所示。

val isSuccess = commitPendingToStable(writerState.getParentPath, writerState.getFileName)
if (isSuccess) {
    offsetManager.saveOffset(partitionInfo.getTopic, partitionInfo.getPartition, writerState.getEndOffset + 1)
}

     到目前為止,我們已經把關鍵點都聊完了。我們來總結一下。看一下整體的執行流程。

    在任務執行中,flink定時執行checkpoint,假設為10s,然後就會調用 snapshotState方法去檢測文件大小是否滿足我們配置的大小,如果不滿足,不做處理。如果滿足,我們把文件上線,然後提交offset到hbase。這就代表着hbase中存儲的offset表示我們已經成功落地的數據。如果checkpoint失敗或者任務掛掉,由於我們重啟或者checkpoint失敗恢復任務時,我們是從hbase中讀取offset信息,因此可以保證精確一次消費,保證落地的數據不丟失不重複。

   今天我們就聊到這裡,更多有趣知識,歡迎關注公眾號[程序員學長]。如果對本文有什麼疑問點,歡迎留言討論。