Flink 實踐教程-入門(6):讀取 PG 數據寫入 ClickHouse
- 2021 年 11 月 14 日
- 筆記
- Flink, 流計算 Oceanus
作者:騰訊雲流計算 Oceanus 團隊
流計算 Oceanus 簡介
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。
本文將向您詳細介紹如何獲取 PostgreSQL 表數據,並使用字元串函數進行轉換,最後將數據輸出到 ClickHouse 中。
操作影片
前置準備
創建流計算 Oceanus 集群
進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔 創建獨享集群 [2]。
創建 PostgreSQL 實例
進入 PostgreSQL 控制台 [3],點擊左上角【新建】創建實例,具體參考 創建 PostgreSQL 實例 [4]。
數據準備:
進入實例資料庫,創建 test1 表,並手動插入數據。
-- 建表語句
create table public.test1 (
id INT,
str_one VARCHAR(50),
str_two VARCHAR(50),
str_thr VARCHAR(50),
PRIMARY key(id)
);
-- 插入語句
INSERT INTO public.test1 VALUES (1, 'hello world', 'b', 'Oceanus-1');
INSERT INTO public.test1 VALUES (2, 'good job', 'c', 'Oceanus-2');
INSERT INTO public.test1 VALUES (3, 'hello oceanus', 'd', 'Oceanus-3');
筆者這裡使用 DBeaver 進行外網連接,更多連接方式參考官網文檔 連接 PostgreSQL 實例 [5]
創建 ClickHouse 集群
進入 ClickHouse 控制台 [6],點擊左上角【新建集群】,完成 ClickHouse 集群創建,具體可參考 ClickHouse 快速入門 [7]。創建 ClickHouse 表: 登陸 ClickHouse 集群(登入方式參考 ClickHouse 快速入門 [7]),並建表。
CREATE TABLE default.pg_to_ck on cluster default_cluster (
id Int8,
str_one String,
str_two String,
str_thr String,
Sign Int8 )
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/pg_to_ck', '{replica}',Sign)
ORDER BY (id);
註:流計算 Oceanus 集群、PostgreSQL 實例、ClickHouse 集群需在同一 VPC 下。
流計算 Oceanus 作業
1. 創建 Source
-- PostgreSQL CDC Source。
CREATE TABLE PostgreSourceTable (
id INT,
str_one VARCHAR,
str_two VARCHAR,
str_thr VARCHAR,
PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的資料庫表定義了主鍵, 則這裡也需要定義
) WITH (
'connector' = 'postgres-cdc', -- 必須為 'postgres-cdc'
'hostname' = '10.0.0.236', -- 資料庫的 IP
'port' = '5432', -- 資料庫的訪問埠
'username' = 'root', -- 資料庫訪問使用的用戶名(需要提供 REPLICATION 許可權, 日誌級別必須大於等於 logical, 且設置後需要重啟實例)
'password' = 'xxxxxxxxxxx', -- 資料庫訪問使用的密碼
'database-name' = 'postgres', -- 需要同步的資料庫名
'schema-name' = 'public', -- 需要同步的資料庫模式 (Schema)
'table-name' = 'test1' -- 需要同步的數據表名
);
2. 創建 Sink
-- ClickHouse Sink (不完全支援upsert,詳見說明文檔)。配合 flink-connector-clickhouse 使用。
CREATE TABLE clickhouse_sink (
id INT,
str_one VARCHAR,
str_two VARCHAR,
str_thr VARCHAR,
PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的資料庫表定義了主鍵, 則這裡也需要定義
) WITH (
'connector' = 'clickhouse', -- connector 類型為 clickhouse
'url' = 'clickhouse://10.0.0.178:8123', -- 指定資料庫鏈接 url
'database-name' = 'default', -- 需要寫入的 clickhouse 庫名
'table-name' = 'pg_to_ck', -- 需要寫入的 clickhouse 表名
'table.collapsing.field' = 'Sign' -- 採用 CollapsingMergeTree 引擎的 clickhouse 表,Collapsing 列欄位的名稱
);
3. 編寫業務 SQL
INSERT INTO clickhouse_sink
SELECT
id,
--INITCAP:將 str_one 中的單詞轉為大寫開頭,例如 INITCAP('i have a dream') 返回 'I Have A Dream'。
INITCAP(str_one) AS str_one,
--TO_BASE64:將 string 表示的字元串編碼為 Base64 字元串。
TO_BASE64(str_two) AS str_two,
--REPLACE:將 string1 字元串中所有的 string2 替換為 string3。例如 REPLACE('banana', 'a', 'A') 返回 'bAnAnA'。
REPLACE(str_thr,'Oceanus','Hello Oceanus') AS str_thr
FROM PostgreSourceTable;
這裡我們使用 Flink 1.13 集群,舊版 Flink 集群需選擇相應的內置 Connector
總結
-
使用 Postgres-CDC 連接器:
-
用於同步的 Postgres 用戶至少需要開啟 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 許可權。可以進入 PostgreSQL 資料庫進行授權操作。
CREATE ROLE debezium_user REPLICATION LOGIN;
GRANT USAGE ON DATABASE database_name TO debezium_user;
GRANT USAGE ON SCHEMA schema_name TO debezium_user;
GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;
-
日誌級別必須大於等於 logical, 且設置後需要重啟實例。進入資料庫實例,單擊【參數設置】,單擊【WAL】,修改【wal_level】的【參數運行值】為 “logical”。修改成功後點擊右上角【重啟】。
-
更多字元串操作函數請參考流計算 Oceanus 官方文檔 字元串函數[8]。
參考鏈接
[1] 流計算 Oceanus 控制台://console.cloud.tencent.com/oceanus/overview
[2] 創建獨享集群://cloud.tencent.com/document/product/849/48298
[3] PostgreSQL 控制台://console.cloud.tencent.com/postgres/index
[4] 創建 PostgreSQL 實例://cloud.tencent.com/document/product/409/56961
[5] 連接 PostgreSQL 實例://cloud.tencent.com/document/product/409/40429
[6] ClickHouse 控制台://console.cloud.tencent.com/cdwch?region=ap-guangzhou
[7] ClickHouse 快速入門://cloud.tencent.com/document/product/1299/49824
[8] 流計算 Oceanus 字元串函數://cloud.tencent.com/document/product/849/18073
關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站 Get~