Flink 實踐教程-進階(5):排序(亂序調整)

作者:騰訊雲流計算 Oceanus 團隊

流計算 Oceanus 簡介
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。

本文將為您詳細介紹如何使用 Windowing TVF 配合聚合函數,實時調整亂序數據,經過聚合分析後存入 MySQL 中。

操作視頻

前置準備
創建流計算 Oceanus 集群
進入 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考 Oceanus 官方文檔 創建獨享集群 [2]。

創建消息隊列 CKafka
進入 CKafka 控制台 [3],點擊左上角【新建】,即可完成 CKafka 的創建,具體可參考 CKafka 創建實例 [4]。

創建 Topic:
進入 CKafka 實例,點擊【topic 管理】>【新建】,即可完成 Topic 的創建,具體可參考 CKafka 創建 Topic [5]。

數據準備:
進入同子網的 CVM 下,啟動 Kafka 客戶端,模擬發送數據,具體操作參見 運行 Kafka 客戶端 [6]。

啟動 Kafka 生產者命令

bash kafka-console-producer.sh –broker-list 10.0.0.29:9092 –topic oceanus_advanced5_input –producer.config ../config/producer.properties
1
2
// 按順序插入如下數據,注意這裡數據時間是亂序的
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:16″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:30″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:50″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:59″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:43″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:09″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:01″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:50″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:15″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:50″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:31:15″}
1
2
3
4
5
6
7
8
9
10
11
12
創建 MySQL 實例
進入 MySQL 控制台 [7],點擊【新建】。具體可參考官方文檔 創建 MySQL 實例 [8]。

— 建表語句
CREATE TABLE oceanus_advanced5_output (
window_start datetime NOT NULL,
window_end datetime NOT NULL,
num int(11) DEFAULT NULL,
PRIMARY KEY (window_start,window_end)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
1
2
3
4
5
6
7
流計算 Oceanus 作業

  1. 創建 Source
    CREATE TABLE kafka_json_source_table (
    order_id VARCHAR,
    num INT,
    event_time TIMESTAMP(3),
    — 根據事件時間 event_time 設置 10s 的延遲水印
    WATERMARK FOR event_time AS event_time – INTERVAL ’10’ SECOND
    ) WITH (
    ‘connector’ = ‘kafka’,
    ‘topic’ = ‘oceanus_advanced5_input’, — 替換為您要消費的 Topic
    ‘scan.startup.mode’ = ‘latest-offset’, — 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一種
    ‘properties.bootstrap.servers’ = ‘10.0.0.29:9092’, — 替換為您的 Kafka 連接地址
    ‘properties.group.id’ = ‘testGroup’, — 必選參數, 一定要指定 Group ID
    ‘format’ = ‘json’,
    ‘json.fail-on-missing-field’ = ‘false’, — 如果設置為 false, 則遇到缺失字段不會報錯。
    ‘json.ignore-parse-errors’ = ‘true’ — 如果設置為 true,則忽略任何解析報錯。
    );
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
  2. 創建 Sink
    CREATE TABLE jdbc_upsert_sink_table (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    num INT,
    PRIMARY KEY(window_start,window_end) NOT ENFORCED
    ) WITH (
    ‘connector’ = ‘jdbc’,
    ‘url’ = ‘jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai’, — 請替換為您的實際 MySQL 連接參數
    ‘table-name’ = ‘oceanus_advanced5_output’, — 需要寫入的數據表
    ‘username’ = ‘root’, — 數據庫訪問的用戶名(需要提供 INSERT 權限)
    ‘password’ = ‘Tencent123$’, — 數據庫訪問的密碼
    ‘sink.buffer-flush.max-rows’ = ‘200’, — 批量輸出的條數
    ‘sink.buffer-flush.interval’ = ‘2s’ — 批量輸出的間隔
    );
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  3. 編寫業務 SQL
    INSERT INTO jdbc_upsert_sink_table
    SELECT
    window_start,window_end,SUM(num) AS num
    FROM TABLE(
    — Windowing TVF
    TUMBLE(TABLE kafka_json_source_table,DESCRIPTOR(event_time),INTERVAL ‘1’ MINUTES)
    ) GROUP BY window_start,window_end;
    1
    2
    3
    4
    5
    6
    7
  4. 查詢數據
    進入 MySQL 控制台 [7],單擊右側【登陸】快速登陸數據庫,選擇相應的庫表查詢數據。
    [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-JKvqsDZ5-1640712788701)(MySQL數據查詢.png)]

筆者這裡設置的 10s 的延遲水印,可以看到在 2930、3031時間段的數據統計是正確,並沒有因為數據延時而出現漏統計的現象。31~32時間段的數據並沒有統計出來,這是因為我們最後一條數據時間是2021-12-22 14:31:15,其水印時間為2021-12-22 14:31:05,小於窗口關閉時間,導致這段時間窗口還未關閉、未計算。

總結
WARTERMARK是跟隨在每條數據上的一條特殊標籤,而且只增不減(可以相等)。WARTERMARK並不能影響數據出現在哪個窗口(本例中由event_time決定),其主要決定窗口是否關閉(當水印時間大於窗口結束時間時,窗口關閉並計算)。
如果數據延時過大,例如小時級別,可以配合allowedLateness算子合理性使用WARTERMARK,當達到水印結束時間時,窗口並不關閉,只進行計算操作,當時間到達allowedLateness算子設置的時間後,窗口才真正關閉,並在原先的基礎上再次進行計算。如在allowedLateness算子設置的時間後才達到的數據,我們可以使用sideOutputLateData算子將遲到的數據輸出到側輸出流進行計算。這裡需要注意allowedLateness和sideOutputLateData算子目前只能使用 Stream API 實現。
目前 flink 1.13 的 Windowing TVF 函數並不能單獨使用,需配合AGGREGATE、JOIN、TOPN使用。建議優先使用 Windowing TVF 實現窗口聚合等功能,因為 Windowing TVF 更符合 SQL 書寫規範,底層優化邏輯也更好。
參考鏈接
[1] Oceanus 控制台://console.cloud.tencent.com/oceanus/overview
[2] 創建獨享集群://cloud.tencent.com/document/product/849/48298
[3] CKafka 控制台://console.cloud.tencent.com/ckafka/index?rid=1
[4] CKafka 創建實例://cloud.tencent.com/document/product/597/54839
[5] Ckafka 創建 Topic://cloud.tencent.com/document/product/597/54854
[6] 運行 Kafka 客戶端://cloud.tencent.com/document/product/597/56840
[7] MySQL 控制台://console.cloud.tencent.com/cdb
[8] 創建 MySQL 實例://cloud.tencent.com/document/product/236/46433

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站Get~