Flink 反壓 淺入淺出
- 2020 年 12 月 21 日
- 筆記
前言
微信搜【Java3y】關注這個樸實無華的男人,點贊關注是對我最大的支持!
文本已收錄至我的GitHub://github.com/ZhongFuCheng3y/3y,有300多篇原創文章,最近在連載面試和項目系列!
最近一直在遷移Flink
相關的工程,期間也踩了些坑,checkpoint
和反壓
是其中的一個。

敖丙太菜了,Flink
都不會,只能我自己來了。看敖丙只能圖一樂,學技術還是得看三歪
平時敖丙黑我都沒啥水平,拿點簡單的東西來就說我不會。我是敖丙的頭號黑粉
今天來分享一下 Flink
的checkpoint
機制和背壓
原理,我相信通過這篇文章,大家在玩Flink
的時候可以更加深刻地了解Checkpoint
是怎麼實現的,並且在設置相關參數以及使用的時候可以更加地得心應手。
上一篇已經寫過Flink
的入門教程了,如果還不了解Flink
的同學可以先去看看:《Flink入門教程》
前排提醒,本文基於Flink 1.7
《淺入淺出學習Flink的背壓知識》
開胃菜
在講解Flink
的checkPoint
和背壓
機制之前,我們先來看下checkpoint
和背壓
的相關基礎,有助於後面的理解。
作為用戶,我們寫好Flink
的程序,上管理平台提交,Flink
就跑起來了(只要程序代碼沒有問題),細節對用戶都是屏蔽的。

實際上大致的流程是這樣的:
-
Flink
會根據我們所寫代碼,會生成一個StreamGraph
的圖出來,來代表我們所寫程序的拓撲結構。 -
然後在提交的之前會將 StreamGraph
這個圖優化一把(可以合併的任務進行合併),變成JobGraph
-
將 JobGraph
提交給JobManager
-
JobManager
收到之後JobGraph
之後會根據JobGraph
生成ExecutionGraph
(ExecutionGraph
是JobGraph
的並行化版本) -
TaskManager
接收到任務之後會將ExecutionGraph
生成為真正的物理執行圖

可以看到物理執行圖
真正運行在TaskManager
上Transform
和Sink
之間都會有ResultPartition
和InputGate
這倆個組件,ResultPartition
用來發送數據,而InputGate
用來接收數據。

屏蔽掉這些Graph
,可以發現Flink
的架構是:Client
->JobManager
->TaskManager

從名字就可以看出,JobManager
是干「管理」,而TaskManager
是真正幹活的。回到我們今天的主題,checkpoint
就是由JobManager
發出。

而Flink
本身就是有狀態的,Flink
可以讓你選擇執行過程中的數據保存在哪裡,目前有三個地方,在Flink
的角度稱作State Backends
:
-
MemoryStateBackend(內存) -
FsStateBackend(文件系統,一般是HSFS) -
RocksDBStateBackend(RocksDB數據庫)
同樣的,checkpoint
信息也是保存在State Backends
上

耗子屎
最近在Storm
遷移Flink
的時候遇到個問題,我來簡單描述一下背景。
我們從各個數據源從清洗出數據,藉助Flink
清洗,組裝成一個寬模型,最後交由kylin
做近實時數據統計和展示,供運營實時查看。

遷移的過程中,發現訂單的topic
消費延遲了好久,初步懷疑是因為訂單上游的並發度
不夠所影響的,所以調整了兩端的並行度重新發佈一把。
發佈的過程中,系統起來以後,再去看topic
消費延遲的監控,就懵逼了。什麼?怎麼這麼久了啊?絲毫沒有降下去的意思。
這時候只能找組內的大神去尋求幫忙了,他排查一番後表示:這checkpoint
一直沒做上,都堵住了,重新發佈的時候只會在上一次checkpoint
開始,由於checkpoint
長時間沒完成掉,所以重新發佈數據量會很大。這沒啥好辦法了,只能在這個堵住的環節下扔掉吧,估計是業務邏輯出了問題。
畫外音:接收到訂單的數據,會去溯源點擊,判斷該訂單從哪個業務來,經過了哪些的業務,最終是哪塊業務致使該訂單成交。

畫外音:外部真正使用時,依賴「訂單結果HBase」數據

我們認為點擊的數據有可能會比訂單的數據處理要慢一會,所以找不到的數據會間隔一段時間輪詢,又因為Flink
提供State
「狀態」 和checkpoint
機制,我們把找不到的數據放入ListState
按一定的時間輪詢就好了(即便系統由於重啟或其他原因掛了,也不會把數據丟了)。
理論上只要沒問題,這套方案是可行的。但現在結果告訴我們:訂單數據報來了以後,一小批量數據一直在「訂單結果HBase」沒找到數據,就放置到ListState
上,然後來一條數據就去遍歷ListState
。導致的後果就是:
-
數據消費不過來,形成反壓 -
checkpoint
一直沒成功
當時處理的方式就是把ListState清空掉,暫時丟掉這一部分的數據,讓數據追上進度。
後來排查後發現是上游在消息報字段上做了「手腳」,解析失敗導致點擊丟失,造成這一連鎖的後果。
排查問題的關鍵是理解Flink
的反壓
和checkpoint
的原理是什麼樣的,下面我來講述一下。
反壓
反壓backpressure
是流式計算中很常見的問題。它意味着數據管道中某個節點成為瓶頸,處理速率跟不上「上游」發送數據的速率,上游需要進行限速

上面的圖代表了是反壓極簡的狀態,說白了就是:下游處理不過來了,上游得慢點,要堵了!
最令人好奇的是:「下游是怎麼通知上游要發慢點的呢?」
在前面Flink
的基礎知識講解,我們可以看到ResultPartition
用來發送數據,InputGate
用來接收數據。

而Flink
在一個TaskManager
內部讀寫數據的時候,會有一個BufferPool
(緩衝池)供該TaskManager
讀寫使用(一個TaskManager
共用一個BufferPool
),每個讀寫ResultPartition/InputGate
都會去申請自己的LocalBuffer

以上圖為例,假設下游處理不過來,那InputGate
的LocalBuffer
是不是被填滿了?填滿了以後,ResultPartition
是不是沒辦法往InputGate
發了?而ResultPartition
沒法發的話,它自己本身的LocalBuffer
也遲早被填滿,那是不是依照這個邏輯,一直到Source
就不會拉數據了…

這個過程就猶如InputGate/ResultPartition
都開了自己的有界阻塞隊列,反正「我」就只能處理這麼多,往我這裡發,我滿了就堵住唄,形成連鎖反應一直堵到源頭上…
上面是只有一個TaskManager
的情況下的反壓,那多個TaskManager
呢?(畢竟我們很多時候都是有多個TaskManager
在為我們工作的)
我們再看回Flink
通信的總體數據流向架構圖:

從圖上可以清洗地發現:遠程通信用的Netty
,底層是TCP Socket
來實現的。
所以,從宏觀的角度看,多個TaskManager
只不過多了兩個Buffer
(緩衝區)。
按照上面的思路,只要InputGate
的LocalBuffer
被打滿,Netty Buffer
也遲早被打滿,而Socket Buffer
同樣遲早也會被打滿(TCP 本身就帶有流量控制),再反饋到ResultPartition
上,數據又又又發不出去了…導致整條數據鏈路都存在反壓的現象。
現在問題又來了,一個TaskManager
的task
可是有很多的,它們都共用一個TCP Buffer/Buffer Pool
,那隻要其中一個task
的鏈路存在問題,那不導致整個TaskManager
跟着遭殃?

在Flink 1.5版本
之前,確實會有這個問題。而在Flink 1.5版本
之後則引入了credit
機制。
從上面我們看到的Flink
所實現的反壓,宏觀上就是直接依賴各個Buffer
是否滿了,如果滿了則無法寫入/讀取導致連鎖反應,直至Source
端。
而credit
機制,實際上可以簡單理解為以「更細粒度」去做流量控制:每次InputGate
會告訴ResultPartition
自己還有多少的空閑量可以接收,讓ResultPartition
看着發。如果InputGate
告訴ResultPartition
已經沒有空閑量了,那ResultPartition
就不發了。

那實際上是怎麼實現的呢?擼源碼!
在擼源碼之前,我們再來看看下面物理執行圖:實際上InPutGate
下是InputChannel
,ResultPartition
下是ResultSubpartition
(這些在源碼中都有體現)。

InputGate(接收端處理反壓)
我們先從接收端看起吧。Flink
接收數據的方法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput
隨後定位到處理反壓的邏輯:
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
進去getNextNonBlocked()
方法看(選擇的是BarrierBuffer
實現):

我們就直接看null
的情況,看下從初始化階段開始是怎麼搞的,進去getNextBufferOrEvent()
進去方法裏面看到兩個比較重要的調用:

requestPartitions();
result = currentChannel.getNextBuffer();
先從requestPartitions()
看起吧,發現裡邊套了一層(從InputChannel
下獲取到subPartition
):

於是再進requestSubpartition()
(看RemoteInputChannel
的實現吧)
在這裡看起來就是創建Client
端,然後接收上游發送過來的數據:

先看看client
端的創建姿勢吧,進createPartitionRequestClient()
方法看看(我們看Netty
的實現)。
點了兩層,我們會進到createPartitionRequestClient()
方法,看源碼注釋就可以清晰發現,這會創建TCP
連接並且創建出Client
供我們使用

我們還是看null
的情況,於是定位到這裡:

進去connect()
方法看看:

我們就看看具體生成邏輯的實現吧,所以進到getClientChannelHandlers
上
意外發現源碼還有個通信簡要流程圖給我們看(哈哈哈):

好了,來看看getClientChannelHandlers
方法吧,這個方法不長,主要判斷了下要生成的client
是否開啟creditBased
機制:
public ChannelHandler[] getClientChannelHandlers() {
NetworkClientHandler networkClientHandler =
creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :
new PartitionRequestClientHandler();
return new ChannelHandler[] {
messageEncoder,
new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
networkClientHandler};
}
於是我們的networkClientHandler
實例是CreditBasedPartitionRequestClientHandler
到這裡,我們暫且就認為Client
端已經生成完了,再退回去getNextBufferOrEvent()
這個方法,requestPartitions()
方法是生成接收數據的Client
端,具體的實例是CreditBasedPartitionRequestClientHandler

下面我們進getNextBuffer()
看看接收數據具體是怎麼處理的:

拿到數據後,就會開始執行我們用戶的代碼了調用process
方法了(這裡我們先不看了)。還是回到反壓的邏輯上,我們好像還沒看到反壓的邏輯在哪裡。重點就是receivedBuffers
這裡,是誰塞進去的呢?
於是我們回看到Client
具體的實例CreditBasedPartitionRequestClientHandler
,打開方法列表一看,感覺就是ChannelRead()
沒錯了:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
decodeMsg(msg);
} catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
跟着decodeMsg
繼續往下走吧:

繼續下到decodeBufferOrEvent()

繼續下到onBuffer
:

所以我們往onSenderBacklog
上看看:

最後調用notifyCreditAvailable
將Credit
往上游發送:
public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
}

最後再畫張圖來理解一把(關鍵鏈路):

ResultPartition(發送端處理反壓)
發送端我們從org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager
開始看起

於是我們進去看fromConfiguration()

進去start()
去看,隨後進入connectionManager.start()
(還是看Netty
的實例):

進去看service.init()
方法做了什麼(又看到熟悉的身影):

好了,我們再進去getServerChannelHandlers()
看看吧:

有了上面經驗的我們,直接進去看看它的方法,沒錯,又是channnelRead
,只是這次是channelRead0
。

ok,我們進去addCredit()
看看:

reader.addCredit(credit)
只是更新了下數量
public void addCredit(int creditDeltas) {
numCreditsAvailable += creditDeltas;
}
重點我們看下enqueueAvailableReader()
方法,而enqueueAvailableReader()
的重點就是判斷Credit
是否足夠發送

isAvailable
的實現也很簡單,就是判斷Credit
是否大於0且有真實數據可發

而writeAndFlushNextMessageIfPossible
實際上就是往下游發送數據:

拿數據的時候會判斷Credit
是否足夠,不足夠拋異常:

再畫張圖來簡單理解一下:

背壓總結
「下游」的處理速度跟不上「上游」的發送速度,從而降低了處理速度,看似是很美好的(畢竟看起來就是幫助我們限流了)。
但在Flink
里,背壓再加上Checkponit
機制,很有可能導致State
狀態一直變大,拖慢完成checkpoint
速度甚至超時失敗。
當checkpoint
處理速度延遲時,會加劇背壓的情況(很可能大多數時間都在處理checkpoint
了)。
當checkpoint
做不上時,意味着重啟Flink
應用就會從上一次完成checkpoint
重新執行(…
舉個我真實遇到的例子:
我有一個
Flink
任務,我只給了它一台TaskManager
去執行任務,在更新DB的時候發現會有並發的問題。只有一台
TaskManager
定位問題很簡單,稍微定位了下判斷:我更新DB的Sink 並行度調高了。如果Sink的並行度設置為1,那肯定沒有並發的問題,但這樣處理起來太慢了。
於是我就在Sink之前根據
userId
進行keyBy
(相同的userId都由同一個Thread處理,那這樣就沒並發的問題了)

看似很美好,但userId
存在熱點數據的問題,導致下游數據處理形成反壓
。原本一次checkpoint
執行只需要30~40ms
,反壓
後一次checkpoint
需要2min+
。
checkpoint
執行間隔相對頻繁(6s/次
),執行時間2min+
,最終導致數據一直處理不過來,整條鏈路的消費速度從原來的3000qps
到背壓後的300qps
,一直堵住(程序沒問題,就是處理速度大大下降,影響到數據的最終產出)。
最後
本來想着這篇文章把反壓和Checkpoint
都一起寫了,但寫着寫着發現有點長了,那checkpoint
開下一篇吧。
相信我,只要你用到Flink
,遲早會遇到這種問題的,現在可能有的同學還沒看懂,沒關係,先點個贊👍,收藏起來,後面就用得上了。
參考資料:
三歪把【大廠面試知識點】、【簡歷模板】、【原創文章】全部整理成電子書,共有1263頁!點擊下方鏈接直接取就好了
PDF文檔的內容均為手打,有任何的不懂都可以直接來問我
