Flink 實踐教程:入門(1):零基礎用戶實現簡單 Flink 任務

作者:騰訊雲流計算 Oceanus 團隊

流計算 Oceanus 簡介

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。

流計算 Oceanus 提供了便捷的控制台環境,方便用戶編寫 SQL 分析語句、ETL 作業或者上傳運行自定義 JAR 包,支援作業運維管理。

本文將為您詳細介紹如何使用 datagen 和 blackhole 連接器隨機產生和存儲數據。

 

前置準備

創建流計算 Oceanus 集群

進入流計算 Oceanus 控制台,點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔創建獨享集群

 

流計算 Oceanus 作業

1. 創建 Source

-- Datagen Connector 可以隨機生成數據。Datagen Connector 適合用做測試數據源。
-- 參見 //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/dev/table/connectors/datagen.html

CREATE TABLE random_source (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '1',              -- 每秒產生的數據條數
 'fields.user_id.kind' = 'sequence',   -- 有界序列(結束後自動停止輸出)
 'fields.user_id.start' = '1',         -- 序列的起始值
 'fields.user_id.end' = '10000',       -- 序列的終止值
 'fields.item_id.kind' = 'random',     -- 無界的隨機數
 'fields.item_id.min' = '1',           -- 隨機數的最小值
 'fields.item_id.max' = '1000',        -- 隨機數的最大值
 'fields.category_id.kind' = 'random', -- 無界的隨機數
 'fields.category_id.min' = '1',       -- 隨機數的最小值
 'fields.category_id.max' = '1000',    -- 隨機數的最大值
 'fields.behavior.length' = '5'        -- 隨機字元串的長度
);

2. 創建 Sink

-- 輸入到 Blackhole Sink 的數據, 會被全部丟棄。Blackhole Sink 適合做性能測試。
-- 參見 //ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/blackhole/

CREATE TABLE blackhole_sink (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH ('connector' = 'blackhole');

3. 編寫業務 SQL

INSERT INTO blackhole_sink
(
   SELECT user_id,
  item_id,
  category_id,
  behavior
   FROM random_source
);

4. 發布運行

點擊工具欄【語法檢查】進行 SQL 語法檢查,檢查無誤後點擊【保存】>【發布草稿】運行作業。

 

總結

Datagen Connector 連接器是一款用於生成隨機數據的 Connector,一般作為測試使用。  Sink 到 Blackhole 的數據會被丟棄,用戶無法查詢到其中的數據,此連接器一般用于于性能測試。 

 

 

關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站Get~