Flink 實踐教程-入門(8): 簡單 ETL 作業

作者:騰訊雲流計算 Oceanus 團隊

流計算 Oceanus 簡介

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。

本示例使用流計算 Oceanus 平台的 ETL 功能,將 PostgreSQL 數據取出,經過時間轉換函數處理後存入 PostgreSQL 中。用戶無需編寫 SQL 程式碼,只用在介面上進行簡單的點擊操作即可創建流計算 Oceanus ETL 作業。      

 

操作影片

前置準備

創建流計算 Oceanus 集群

進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔 創建獨享集群 [2]。

創建 PostgreSQL 實例

進入 PostgreSQL 控制台 [3],點擊左上角【新建】創建實例,具體參考 創建 PostgreSQL 實例 [4]。進入實例資料庫,創建表。

-- 用於 Source
CREATE TABLE public.oceanus8_output (
id         INT,
time_one   TIMESTAMP,
PRIMARY    KEY(id)
);

-- 手動插入數據
INSERT INTO public.oceanus8_output VALUES (1,'2020-10-01 18:00:00');
INSERT INTO public.oceanus8_output VALUES (2,'2021-10-01 18:30:24');

-- 用於 Sink
CREATE TABLE public.oceanus8_input (
id           INT,
transf_one   VARCHAR(50),
transf_two   TIMESTAMP,
const_four   INT,
PRIMARY      KEY(id)
);

  

筆者這裡使用 DBeaver 進行外網連接,更多連接方式參考官網文檔 連接 PostgreSQL 實例 [5]

 

流計算 Oceanus 作業

進入流計算 Oceanus 控制台 [1],點擊左上角【新建】創建 ETL 作業,點擊【開發調試】進入作業編輯頁面。ETL 作業源端可以是 MySQL、PostgreSQL,目的端可以是 MySQL、PostgreSQL、ClickHouse 和 Elasticsearch。

1. 創建 Source

單擊【數據源表】右側【添加】按鈕,選擇 PostgreSQL ,選擇並填寫資料庫表相關的資訊。

2. 創建 Sink

單擊【數據目的表】右側【添加】按鈕,選擇 PostgreSQL ,選擇並填寫資料庫表相關的資訊。

3. 映射欄位

編寫需要創建映射欄位的業務邏輯。這裡使用 DATA_FORMAT 函數將 time_one 欄位類型由 TIMESTAMP 映射為 STRING,使用 TIMESTAMPADD 函數將 time_one 欄位增加一周,並將常量 1000 存入 const_str 欄位。

 

ETL 作業開發詳見流計算 Oceanus 官方文檔 ETL 開發指南 [6]。

 

添加數據源表和目的表後,可配置欄位映射。欄位映射分為原欄位映射和新增欄位映射兩個部分。

原欄位映射

在左側的數據源表中可以勾選本次 ETL 作業需要從數據源表抽取的數據欄位,並在右側選擇要載入進目的表的對應的映射欄位名稱。這樣在數據源表中的數據就會複製載入到目的表中。

新增欄位映射

欄位生成方式有計算欄位和常量欄位兩種。

  • 計算欄位可以對從數據源表抽取出來的欄位數據進行 內置函數 數值轉換或者計算。

  • 常量欄位可以輸入一個自定義常量欄位到目的源表相應的欄位中。

欄位取值
  • 計算欄位:欄位取值可以輸入欄位值或者表達式,對每個滿足的輸入源數據進行表達式計算。將計算結果返回到數據目的表所選的映射欄位中。

  • 常量欄位:欄位取值可以輸入字元串或者數字(輸入類型與目的表類型要一致),這個常量欄位取值將會載入到每一條數據目的表所選的映射欄位中。

 

總結

流計算 Oceanus ETL 作業最簡化了用戶操作,開發人員甚至無需了解程式語言,只需要選擇數據源表和目的表,並根據業務邏輯完成欄位映射的配置,花費幾分鐘即可輕鬆啟動 ETL 作業。 

計算欄位:可以對從數據源表抽取出來的欄位數據進行 內置函數 [7] 數值轉換或者計算。  

常量欄位:可以輸入一個自定義常量欄位到目的源表相應的欄位中。 

 

參考鏈接

[1] 流計算 Oceanus 控制台://console.cloud.tencent.com/oceanus/overview  

[2] 創建獨享集群://cloud.tencent.com/document/product/849/48298  

[3] PostgreSQL 控制台://console.cloud.tencent.com/postgres/index  

[4] 創建 PostgreSQL 實例://cloud.tencent.com/document/product/409/56961  

[5] 連接 PostgreSQL 實例://cloud.tencent.com/document/product/409/40429  

[6] ETL 開發指南://cloud.tencent.com/document/product/849/59839  

[7] 內置函數://cloud.tencent.com/document/product/849/18083  

 

 

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓