Flink 實踐教程-進階(2):複雜格式數據抽取

作者:騰訊雲流計算 Oceanus 團隊

 

流計算 Oceanus 簡介

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。本文將為您詳細介紹如何實時獲取 CKafka 中的 JSON 格式數據,經過數據抽取、平鋪轉換後存入 MySQL 中。

操作影片

前置準備

創建流計算 Oceanus 集群

進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔 創建獨享集群 [2]。

創建消息隊列 CKafka

進入 CKafka 控制台 [3],點擊左上角【新建】,即可完成 CKafka 的創建,具體可參考 CKafka 創建實例 [4]。創建 Topic:  進入 CKafka 實例,點擊【topic 管理】>【新建】,即可完成 Topic 的創建,具體可參考 CKafka 創建 Topic [5]。數據準備:  進入同子網的 CVM 下,啟動 Kafka 客戶端,模擬發送數據,具體操作參見 運行 Kafka 客戶端 [6]。

// 數據格式{  "id": 1,  "message": "流計算 Oceanus 1元限量秒殺活動",  "userInfo": {      "name": "張三",      "phone": ["12345678910", "8547942"]      },  "companyInfo": {      "name": "Tencent",      "address": "深圳市騰訊大廈"      }}

 

創建 MySQL 實例

進入 MySQL 控制台 [7],點擊【新建】。具體可參考官方文檔 創建 MySQL 實例 [8]。

-- 建表語句CREATE TABLE `oceanus_advanced2` (  `id`              int (100) NOT NULL,  `message`         varchar (100) NULL DEFAULT '',  `name`            varchar (50)  NULL DEFAULT '',  `phone`           varchar (11)  NULL DEFAULT '',  `company_name`    varchar (100) NULL DEFAULT '',  `company_address` varchar (100) NULL DEFAULT '',  PRIMARY KEY (`id`)) ENGINE = innodb

 

流計算 Oceanus 作業

1. 創建 Source

CREATE TABLE `kafka_json_source_table` (    `id`             INT,    `message`        STRING,    `userInfo`       ROW<`name` STRING,`phone` ARRAY<STRING>>,  -- 採用 ROW 嵌套 ARRAY 格式接收 JSON 欄位    `companyInfo`    MAP<STRING,STRING>    -- 採用 MAP 格式接收 JSON 欄位) WITH (  'connector' = 'kafka',  'topic' = 'oceanus_advanced2',                      -- 替換為您要消費的 Topic  'scan.startup.mode' = 'earliest-offset',            -- 可以是 latest-offset/earliest-offset/specific-offsets/group-offsets/timestamp 的任何一種  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替換為您的 Kafka 連接地址  'properties.group.id' = 'testGroup',                -- 必選參數, 一定要指定 Group ID  'format' = 'json',                                  -- 定義 JSON 格式,部分其他格式可能不支援抽取平鋪  'json.fail-on-missing-field' = 'false',             -- 如果設置為 false, 則遇到缺失欄位不會報錯。  'json.ignore-parse-errors' = 'true'                 -- 如果設置為 true,則忽略任何解析報錯。);

2. 創建 Sink

CREATE TABLE `jdbc_upsert_sink_table` (    `id`                INT,    `message`           STRING,    `name`              STRING,    `phone`             STRING,    `company_name`      STRING,    `company_address`   STRING) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',         -- 請替換為您的實際 MySQL 連接參數    'table-name' = 'oceanus_advanced2',    -- 需要寫入的數據表    'username' = 'root',                   -- 資料庫訪問的用戶名(需要提供 INSERT 許可權)    'password' = 'Tencent123$',            -- 資料庫訪問的密碼    'sink.buffer-flush.max-rows' = '200',  -- 批量輸出的條數    'sink.buffer-flush.interval' = '2s'    -- 批量輸出的間隔);

3. 編寫業務 SQL

INSERT INTO `jdbc_upsert_sink_table`SELECTid                        AS  id,message                   AS  message,userInfo.name             AS  name,              -- 獲取 Row 中成員採用.成員的方式userInfo.phone[1]         AS  phone,             -- 獲取 Array 中成員採用 [數組下標] 的方式companyInfo['name']       AS  company_name,      -- 獲取 Map 中成員採用 ['屬性名'] 的方式companyInfo['address']    AS  company_addressFROM `kafka_json_source_table`;

新版 Flink 1.13 集群無需用戶選擇內置 Connector,平台自動匹配獲取

 

總結

本文詳細介紹了如何通過 SQL 作業定義和獲取 MAP、ARRAY、ROW 類型數據。更多內置運算符和函數請參考流計算 Oceanus 官方文檔 [9]。

參考鏈接

[1]流計算 Oceanus 控制台://console.cloud.tencent.com/oceanus/overview 

[2] 創建獨享集群://cloud.tencent.com/document/product/849/48298 

[3] CKafka 控制台://console.cloud.tencent.com/ckafka/index?rid=1  

[4] CKafka 創建實例://cloud.tencent.com/document/product/597/54839  

[5] Ckafka 創建 Topic://cloud.tencent.com/document/product/597/54854  

[6] 運行 Kafka 客戶端://cloud.tencent.com/document/product/597/56840  

[7] MySQL 控制台://console.cloud.tencent.com/cdb  

[8] 創建 MySQL 實例://cloud.tencent.com/document/product/236/46433  

[9] 內置運算符和函數://cloud.tencent.com/document/product/849/18083

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

 

在這裡插入圖片描述

 

關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站Get~