Flink實例(五十): Operators(十)多流轉換運算元(五)coGroup 與union

參考鏈接://mp.weixin.qq.com/s/BOCFavYgvNPSXSRpBMQzBw

需求場景分析

需求場景

需求誘誘誘來了。。。數據產品妹妹想要統計單個短影片粒度的「點贊,播放,評論,分享,舉報」五類實時指標,並且匯總成 photo_id、1 分鐘時間粒度的實時影片消費寬表(即寬表欄位至少為:「photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp」)產出至實時大屏。

問題在於對同一個影片,五類影片消費行為的觸發機制以及上報時間是不同,也就決定了對實時處理來說五類行為日誌對應著五個不同的數據源。sql boy 們自然就想到了 join 操作將五類消費行為日誌合併,可是實時 join(cogroup) 真的那麼完美咩~,下文細談。

source 輸入以及特點

首先分析下需求中的 source 特點:

  • photo_id 粒度 play(播放)、like(點贊)、comment(評論)、share(分享)、negative(舉報)明細數據,「用戶播放(點贊、評論…)n 次,客戶端\服務端就會上傳 n 條播放(點贊、評論…)日誌至數據源」
  • 五類影片消費行為日誌的 source schema 都為:「photo_id + timestamp + 其他維度」

sink 輸出以及特點

sink 特點如下:

  • photo_id 粒度 play(播放)、like(點贊)、comment(評論)、share(分享)、negative(舉報)「1 分鐘級別窗口聚合數據」
  • 實時影片消費寬表 sink schema 為:「photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt +  minute_timestamp」

source、sink 樣例數據

source 數據:

photo_id timestamp user_id 說明
1 2020/10/3 11:30:33 3 播放
1 2020/10/3 11:30:33 4 播放
1 2020/10/3 11:30:33 5 播放
1 2020/10/3 11:30:33 4 點贊
2 2020/10/3 11:30:33 5 點贊
1 2020/10/3 11:30:33 5 評論

sink 數據:

photo_id timestamp play_cnt like_cnt comment_cnt
1 2020/10/3 11:30:00 3 1 1
2 2020/10/3 11:30:00 0 1 0

我們已經對數據源輸入和輸出有了完整的分析,那就瞧瞧有什麼方案可以實現上述需求吧。

 

實現方案

  • 方案1:「本小節 cogroup 方案」直接消費原始日誌數據,對五類不同的影片消費行為日誌使用 cogroup 或者 join 進行窗口聚合計算
  • 方案2:對五類不同的影片消費行為日誌分別單獨聚合計算出分鐘粒度指標數據,下游再對聚合好的指標數據按照 photo_id 進行合併
  • 方案3:「本小節 union 方案」既然數據源 schema 相同,直接對五類不同的影片消費行為日誌做 union 操作,在後續的窗口函數中對五類指標進行聚合計算。後文介紹 union 方案的設計過程

先上 cogroup 方案的示例程式碼。

cogroup

cogroup 實現示例如下,示例程式碼直接使用了處理時間(也可替換為事件時間~),因此對數據源的時間戳做了簡化(直接幹掉):

public class Cogroup {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Long -> photo_id 播放一次
        DataStream<Long> play = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 點贊一次
        DataStream<Long> like = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 評論一次
        DataStream<Long> comment = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 分享一次
        DataStream<Long> share = SourceFactory.getDataStream(xxx);
        // Long -> photo_id 舉報一次
        DataStream<Long> negative = SourceFactory.getDataStream(xxx);

        // Tuple3<Long, Long, Long> -> photo_id + play_cnt + like_cnt 播放和點贊的數據合併
        DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
            .coGroup(like)
            .where(KeySelectorFactory.get(Function.identity()))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx1);

        // Tuple4<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt 播放、點贊、評論的數據合併
        DataStream<Tuple4<Long, Long, Long, Long, Long>> playAndLikeAndComment = playAndLikeCnt
            .coGroup(comment)
            .where(KeySelectorFactory.get(playAndLikeModel -> playAndLikeModel.f0))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx2);

        // Tuple5<Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt 播放、點贊、評論、分享的數據合併
        DataStream<Tuple5<Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = playAndLikeAndComment
            .coGroup(share)
            .where(KeySelectorFactory.get(playAndLikeAndCommentModel -> playAndLikeAndCommentModel.f0))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx2);

        // Tuple7<Long, Long, Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp 播放、點贊、評論、分享、舉報的數據合併
        // 同上~
        DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = ***;

        env.execute();
    }
}

粗暴一想,上面這樣一搞不就結束了么,事情沒那麼簡單,我們來做一個詳細點的分析。

上述實現可能會存在的問題點

  • 「從 flink 消費到 play 數據源的一條數據到最終產出這條數據被聚合後的數據,整個過程的數據延遲 > 3 分鐘…」
  • 「如果數據源持續增加(比如添加其他影片消費操作數據源),則整個任務運算元變多,數據鏈路更長,任務穩定性會變差,產出數據延遲也會隨著窗口計算變多,延遲更久」

逆推鏈路

1 – 5 為逆推的整條鏈路。

  • 「1.五類指標的數據都在單個窗口中計算」
  • 「2.五類指標的窗口 model 相同」
  • 「3.keyby 中的 key 一致(photo_id)」
  • 「4.五類指標的數據源都為 photo_id 粒度,並且五類數據源的 model 都必須相同,並且可以做合併」
  • 「5.union 運算元可以對五類數據源做合併!!!」

話不多說直接上 union 方案程式碼。

public class Union {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tuple2<Long, String> -> photo_id + "PLAY"標籤
        DataStream<Tuple2<Long, String>> play = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "LIKE"標籤
        DataStream<Tuple2<Long, String>> like = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "COMMENT"標籤
        DataStream<Tuple2<Long, String>> comment = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "SHARE"標籤
        DataStream<Tuple2<Long, String>> share = SourceFactory.getDataStream(xxx);
        // Tuple2<Long, String> -> photo_id + "NEGATIVE"標籤
        DataStream<Tuple2<Long, String>> negative = SourceFactory.getDataStream(xxx);

        // Tuple5<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + window_start_timestamp
        DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
            .union(like)
            .union(comment)
            .union(share)
            .union(negative)
            .keyBy(KeySelectorFactory.get(i -> i.f0))
            .timeWindow(Time.seconds(60))
            .process(xxx);

        env.execute();
    }
}

可以發現,無論上游數據源怎樣進行變化,上述 union 方案中始終可以保持只有一個窗口運算元處理和計算數據,則可以解決之前列舉的數據延遲以及 flink 任務運算元過多的問題。

在數據源的 schema 相同(或者不同但經過處理之後可以 format 成相同格式)的情況下,或者處理邏輯相同的話,可以使用 union 進行邏輯簡化。

總結

本文首先介紹了需求場景,第二部分分析了使用 cogroup(案例程式碼)是如何解決此需求場景,再分析了此實現方案可能會存在一些問題,並引出了 union 解決方案的逆推和設計思路。在第三部分針對此場景使用 union 代替 cogroup 進行了一定程度上的優化。