Flink 實踐教程-進階(5):排序(亂序調整)
- 2021 年 12 月 29 日
- 筆記
- Flink, 流計算 Oceanus
作者:騰訊雲流計算 Oceanus 團隊
流計算 Oceanus 簡介
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。
本文將為您詳細介紹如何使用 Windowing TVF 配合聚合函數,實時調整亂序數據,經過聚合分析後存入 MySQL 中。
操作視頻
前置準備
創建流計算 Oceanus 集群
進入 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考 Oceanus 官方文檔 創建獨享集群 [2]。
創建消息隊列 CKafka
進入 CKafka 控制台 [3],點擊左上角【新建】,即可完成 CKafka 的創建,具體可參考 CKafka 創建實例 [4]。
創建 Topic:
進入 CKafka 實例,點擊【topic 管理】>【新建】,即可完成 Topic 的創建,具體可參考 CKafka 創建 Topic [5]。
數據準備:
進入同子網的 CVM 下,啟動 Kafka 客戶端,模擬發送數據,具體操作參見 運行 Kafka 客戶端 [6]。
啟動 Kafka 生產者命令
bash kafka-console-producer.sh –broker-list 10.0.0.29:9092 –topic oceanus_advanced5_input –producer.config ../config/producer.properties
1
2
// 按順序插入如下數據,注意這裡數據時間是亂序的
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:16″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:30″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:50″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:59″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:43″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:09″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:01″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:29:50″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:15″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:30:50″}
{“order_id”:”10000″,”num”:1,”event_time”:”2021-12-22 14:31:15″}
1
2
3
4
5
6
7
8
9
10
11
12
創建 MySQL 實例
進入 MySQL 控制台 [7],點擊【新建】。具體可參考官方文檔 創建 MySQL 實例 [8]。
— 建表語句
CREATE TABLE oceanus_advanced5_output
(
window_start
datetime NOT NULL,
window_end
datetime NOT NULL,
num
int(11) DEFAULT NULL,
PRIMARY KEY (window_start
,window_end
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
1
2
3
4
5
6
7
流計算 Oceanus 作業
- 創建 Source
CREATE TABLEkafka_json_source_table
(
order_id
VARCHAR,
num
INT,
event_time
TIMESTAMP(3),
— 根據事件時間event_time
設置 10s 的延遲水印
WATERMARK FOR event_time AS event_time – INTERVAL ’10’ SECOND
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘oceanus_advanced5_input’, — 替換為您要消費的 Topic
‘scan.startup.mode’ = ‘latest-offset’, — 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一種
‘properties.bootstrap.servers’ = ‘10.0.0.29:9092’, — 替換為您的 Kafka 連接地址
‘properties.group.id’ = ‘testGroup’, — 必選參數, 一定要指定 Group ID
‘format’ = ‘json’,
‘json.fail-on-missing-field’ = ‘false’, — 如果設置為 false, 則遇到缺失字段不會報錯。
‘json.ignore-parse-errors’ = ‘true’ — 如果設置為 true,則忽略任何解析報錯。
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 - 創建 Sink
CREATE TABLEjdbc_upsert_sink_table
(
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
num INT,
PRIMARY KEY(window_start,window_end) NOT ENFORCED
) WITH (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai’, — 請替換為您的實際 MySQL 連接參數
‘table-name’ = ‘oceanus_advanced5_output’, — 需要寫入的數據表
‘username’ = ‘root’, — 數據庫訪問的用戶名(需要提供 INSERT 權限)
‘password’ = ‘Tencent123$’, — 數據庫訪問的密碼
‘sink.buffer-flush.max-rows’ = ‘200’, — 批量輸出的條數
‘sink.buffer-flush.interval’ = ‘2s’ — 批量輸出的間隔
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14 - 編寫業務 SQL
INSERT INTOjdbc_upsert_sink_table
SELECT
window_start,window_end,SUM(num) AS num
FROM TABLE(
— Windowing TVF
TUMBLE(TABLEkafka_json_source_table
,DESCRIPTOR(event_time),INTERVAL ‘1’ MINUTES)
) GROUP BY window_start,window_end;
1
2
3
4
5
6
7 - 查詢數據
進入 MySQL 控制台 [7],單擊右側【登陸】快速登陸數據庫,選擇相應的庫表查詢數據。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-JKvqsDZ5-1640712788701)(MySQL數據查詢.png)]
筆者這裡設置的 10s 的延遲水印,可以看到在 2930、3031時間段的數據統計是正確,並沒有因為數據延時而出現漏統計的現象。31~32時間段的數據並沒有統計出來,這是因為我們最後一條數據時間是2021-12-22 14:31:15,其水印時間為2021-12-22 14:31:05,小於窗口關閉時間,導致這段時間窗口還未關閉、未計算。
總結
WARTERMARK是跟隨在每條數據上的一條特殊標籤,而且只增不減(可以相等)。WARTERMARK並不能影響數據出現在哪個窗口(本例中由event_time決定),其主要決定窗口是否關閉(當水印時間大於窗口結束時間時,窗口關閉並計算)。
如果數據延時過大,例如小時級別,可以配合allowedLateness算子合理性使用WARTERMARK,當達到水印結束時間時,窗口並不關閉,只進行計算操作,當時間到達allowedLateness算子設置的時間後,窗口才真正關閉,並在原先的基礎上再次進行計算。如在allowedLateness算子設置的時間後才達到的數據,我們可以使用sideOutputLateData算子將遲到的數據輸出到側輸出流進行計算。這裡需要注意allowedLateness和sideOutputLateData算子目前只能使用 Stream API 實現。
目前 flink 1.13 的 Windowing TVF 函數並不能單獨使用,需配合AGGREGATE、JOIN、TOPN使用。建議優先使用 Windowing TVF 實現窗口聚合等功能,因為 Windowing TVF 更符合 SQL 書寫規範,底層優化邏輯也更好。
參考鏈接
[1] Oceanus 控制台://console.cloud.tencent.com/oceanus/overview
[2] 創建獨享集群://cloud.tencent.com/document/product/849/48298
[3] CKafka 控制台://console.cloud.tencent.com/ckafka/index?rid=1
[4] CKafka 創建實例://cloud.tencent.com/document/product/597/54839
[5] Ckafka 創建 Topic://cloud.tencent.com/document/product/597/54854
[6] 運行 Kafka 客戶端://cloud.tencent.com/document/product/597/56840
[7] MySQL 控制台://console.cloud.tencent.com/cdb
[8] 創建 MySQL 實例://cloud.tencent.com/document/product/236/46433
流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓
關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站Get~