Flink 實踐教程-進階(2):複雜格式數據抽取
- 2021 年 12 月 4 日
- 筆記
- Flink, 流計算 Oceanus
作者:騰訊雲流計算 Oceanus 團隊
流計算 Oceanus 簡介
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。本文將為您詳細介紹如何實時獲取 CKafka 中的 JSON 格式數據,經過數據抽取、平鋪轉換後存入 MySQL 中。
前置準備
創建流計算 Oceanus 集群
進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔 創建獨享集群 [2]。
創建消息隊列 CKafka
進入 CKafka 控制台 [3],點擊左上角【新建】,即可完成 CKafka 的創建,具體可參考 CKafka 創建實例 [4]。創建 Topic: 進入 CKafka 實例,點擊【topic 管理】>【新建】,即可完成 Topic 的創建,具體可參考 CKafka 創建 Topic [5]。數據準備: 進入同子網的 CVM 下,啟動 Kafka 客戶端,模擬發送數據,具體操作參見 運行 Kafka 客戶端 [6]。
// 數據格式
{
"id": 1,
"message": "流計算 Oceanus 1元限量秒殺活動",
"userInfo": {
"name": "張三",
"phone": ["12345678910", "8547942"]
},
"companyInfo": {
"name": "Tencent",
"address": "深圳市騰訊大廈"
}
}
創建 MySQL 實例
進入 MySQL 控制台 [7],點擊【新建】。具體可參考官方文檔 創建 MySQL 實例 [8]。
-- 建表語句
CREATE TABLE `oceanus_advanced2` (
`id` int (100) NOT NULL,
`message` varchar (100) NULL DEFAULT '',
`name` varchar (50) NULL DEFAULT '',
`phone` varchar (11) NULL DEFAULT '',
`company_name` varchar (100) NULL DEFAULT '',
`company_address` varchar (100) NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE = innodb
流計算 Oceanus 作業
1. 創建 Source
CREATE TABLE `kafka_json_source_table` (
`id` INT,
`message` STRING,
`userInfo` ROW<`name` STRING,`phone` ARRAY<STRING>>, -- 採用 ROW 嵌套 ARRAY 格式接收 JSON 欄位
`companyInfo` MAP<STRING,STRING> -- 採用 MAP 格式接收 JSON 欄位
) WITH (
'connector' = 'kafka',
'topic' = 'oceanus_advanced2', -- 替換為您要消費的 Topic
'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset/earliest-offset/specific-offsets/group-offsets/timestamp 的任何一種
'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替換為您的 Kafka 連接地址
'properties.group.id' = 'testGroup', -- 必選參數, 一定要指定 Group ID
'format' = 'json', -- 定義 JSON 格式,部分其他格式可能不支援抽取平鋪
'json.fail-on-missing-field' = 'false', -- 如果設置為 false, 則遇到缺失欄位不會報錯。
'json.ignore-parse-errors' = 'true' -- 如果設置為 true,則忽略任何解析報錯。
);
2. 創建 Sink
CREATE TABLE `jdbc_upsert_sink_table` (
`id` INT,
`message` STRING,
`name` STRING,
`phone` STRING,
`company_name` STRING,
`company_address` STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 請替換為您的實際 MySQL 連接參數
'table-name' = 'oceanus_advanced2', -- 需要寫入的數據表
'username' = 'root', -- 資料庫訪問的用戶名(需要提供 INSERT 許可權)
'password' = 'Tencent123$', -- 資料庫訪問的密碼
'sink.buffer-flush.max-rows' = '200', -- 批量輸出的條數
'sink.buffer-flush.interval' = '2s' -- 批量輸出的間隔
);
3. 編寫業務 SQL
INSERT INTO `jdbc_upsert_sink_table`
SELECT
id AS id,
message AS message,
userInfo.name AS name, -- 獲取 Row 中成員採用.成員的方式
userInfo.phone[1] AS phone, -- 獲取 Array 中成員採用 [數組下標] 的方式
companyInfo['name'] AS company_name, -- 獲取 Map 中成員採用 ['屬性名'] 的方式
companyInfo['address'] AS company_address
FROM `kafka_json_source_table`;
新版 Flink 1.13 集群無需用戶選擇內置 Connector,平台自動匹配獲取
總結
本文詳細介紹了如何通過 SQL 作業定義和獲取 MAP、ARRAY、ROW 類型數據。更多內置運算符和函數請參考流計算 Oceanus 官方文檔 [9]。
參考鏈接
[1]流計算 Oceanus 控制台://console.cloud.tencent.com/oceanus/overview
[2] 創建獨享集群://cloud.tencent.com/document/product/849/48298
[3] CKafka 控制台://console.cloud.tencent.com/ckafka/index?rid=1
[4] CKafka 創建實例://cloud.tencent.com/document/product/597/54839
[5] Ckafka 創建 Topic://cloud.tencent.com/document/product/597/54854
[6] 運行 Kafka 客戶端://cloud.tencent.com/document/product/597/56840
[7] MySQL 控制台://console.cloud.tencent.com/cdb
[8] 創建 MySQL 實例://cloud.tencent.com/document/product/236/46433
[9] 內置運算符和函數://cloud.tencent.com/document/product/849/18083
關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站Get~