Flink 實踐教程:入門(1):零基礎用戶實現簡單 Flink 任務
- 2021 年 10 月 31 日
- 筆記
- 流計算 Oceanus
作者:騰訊雲流計算 Oceanus 團隊
流計算 Oceanus 簡介
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。
流計算 Oceanus 提供了便捷的控制台環境,方便用戶編寫 SQL 分析語句、ETL 作業或者上傳運行自定義 JAR 包,支援作業運維管理。
本文將為您詳細介紹如何使用 datagen 和 blackhole 連接器隨機產生和存儲數據。
前置準備
創建流計算 Oceanus 集群
進入流計算 Oceanus 控制台,點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔創建獨享集群。
流計算 Oceanus 作業
1. 創建 Source
-- Datagen Connector 可以隨機生成數據。Datagen Connector 適合用做測試數據源。
-- 參見 //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/dev/table/connectors/datagen.html
CREATE TABLE random_source (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1', -- 每秒產生的數據條數
'fields.user_id.kind' = 'sequence', -- 有界序列(結束後自動停止輸出)
'fields.user_id.start' = '1', -- 序列的起始值
'fields.user_id.end' = '10000', -- 序列的終止值
'fields.item_id.kind' = 'random', -- 無界的隨機數
'fields.item_id.min' = '1', -- 隨機數的最小值
'fields.item_id.max' = '1000', -- 隨機數的最大值
'fields.category_id.kind' = 'random', -- 無界的隨機數
'fields.category_id.min' = '1', -- 隨機數的最小值
'fields.category_id.max' = '1000', -- 隨機數的最大值
'fields.behavior.length' = '5' -- 隨機字元串的長度
);
2. 創建 Sink
-- 輸入到 Blackhole Sink 的數據, 會被全部丟棄。Blackhole Sink 適合做性能測試。
-- 參見 //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/blackhole/
CREATE TABLE blackhole_sink (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH ('connector' = 'blackhole');
3. 編寫業務 SQL
INSERT INTO blackhole_sink
(
SELECT user_id,
item_id,
category_id,
behavior
FROM random_source
);
4. 發布運行
點擊工具欄【語法檢查】進行 SQL 語法檢查,檢查無誤後點擊【保存】>【發布草稿】運行作業。
總結
Datagen Connector
連接器是一款用於生成隨機數據的 Connector,一般作為測試使用。 Sink 到 Blackhole
的數據會被丟棄,用戶無法查詢到其中的數據,此連接器一般用于于性能測試。
關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站Get~