flink:StreamGraph轉換為JobGraph

1 轉換基本流程

2 簡單來看可以分為兩部分:
第一部分是通過一些util、translator、generator等類將職責進行解耦、託管和分離,期間涉及FlinkPipelineTranslationUtil、FlinkPipelineTranslator/StreamGraphTranslator、StreamingJobGraphGenerator等。
第二部分最終轉換的操作落在StreamingJobGraphGenerator中,涉及StreamGraph、StreamEdge、StreamConfig、JobGraph、JobVertex等,下面主要關注點在第二步:
3 StreamingJobGraphGenerator的構造方法和成員變數
唯一構造方法:

將StreamGraph對象作為參數傳遞進來,並初始化一個JobGraph空殼和一系列的成員變數(主要是map,需要保持各種對應關係),用於存儲轉換的中間態

從命名不難看出各個map的作用,核心套路大多是用節點id或者節點的hash值映射節點
4 StreamingJobGraphGenerator.createJobGraph方法
主要要弄清楚StreamNode轉化成JobVertex、運算元合併、邊上下游關係轉換的核心邏輯

4.1 StreamingJobGraphGenerator.createChain方法
這裡主要是把SteamNode轉化為JobVertex,並根據按需合併運算元
步驟:
a、在調用時遍歷節點,並通過builtVertices保存已經處理過的節點
b,判斷outEdge能不能chain,分門別類放到不同的List集合中待處理
c、對於能chain的節點,就把自己銜接到前一個上面去,把銜接的路徑存儲下來,然後再把銜接的前一個和自己的後一個再遞歸調用拿去計算
d、對於不能chain的節點,就作為一個頭節點來單獨處理掉
e、然後維護單個/合併後的關係,包括合併後的命名、資源、格式化方式等
f、處理轉換邏輯,如果是頭就創建個JobVertex返回StreamConfig,如果不是就創建個StreamConfig

4.2 StreamingJobGraphGenerator.isChainable方法
決定StreamEdge兩邊能否chian的邏輯:

4.3 StreamingJobGraphGenerator.createChainedName方法
這個是處理合併後的命名,在日誌中或者生成的圖中可以看到

4.4 StreamingJobGraphGenerator.createJobVertex方法
這裡是StreamNode轉變為JobVertex的真正實現,其實也很簡單,第一步根據節點的輸出new出不同類型的JobVertex,第二步把StreamNode的執行參數複製過來,第三步把自己和相關的映射關係填充到jobGraph和相應的map中去

4.5 StreamingJobGraphGenerator.connect方法

5 總的來看由於在StreamGraph中已經構建好了DAG的關係和映射,此過程中最核心的邏輯就是在createChain合併運算元的過程。
6、下面是JobGraph、JobVertex和JobEdge的主要屬性,可以對比StreamGraph、StreamNode和StreamEdge來理解


Tags: