Flink 實踐教程-入門(6):讀取 PG 數據寫入 ClickHouse

作者:騰訊雲流計算 Oceanus 團隊

流計算 Oceanus 簡介  

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。
本文將向您詳細介紹如何獲取 PostgreSQL 表數據,並使用字元串函數進行轉換,最後將數據輸出到 ClickHouse 中。

 


操作影片

前置準備

創建流計算 Oceanus 集群

進入流計算 Oceanus 控制台 [1],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔 創建獨享集群 [2]。

創建 PostgreSQL 實例

進入 PostgreSQL 控制台 [3],點擊左上角【新建】創建實例,具體參考 創建 PostgreSQL 實例 [4]。 

數據準備:

進入實例資料庫,創建 test1 表,並手動插入數據。

-- 建表語句create table public.test1 (    id INT,    str_one VARCHAR(50),    str_two VARCHAR(50),    str_thr VARCHAR(50),    PRIMARY key(id));-- 插入語句INSERT INTO public.test1 VALUES (1, 'hello world', 'b', 'Oceanus-1');INSERT INTO public.test1 VALUES (2, 'good job', 'c', 'Oceanus-2');INSERT INTO public.test1 VALUES (3, 'hello oceanus', 'd', 'Oceanus-3');

筆者這裡使用 DBeaver 進行外網連接,更多連接方式參考官網文檔 連接 PostgreSQL 實例 [5]

創建 ClickHouse 集群

進入 ClickHouse 控制台 [6],點擊左上角【新建集群】,完成 ClickHouse 集群創建,具體可參考 ClickHouse 快速入門 [7]。創建 ClickHouse 表:  登陸 ClickHouse 集群(登入方式參考 ClickHouse 快速入門 [7]),並建表。

CREATE TABLE default.pg_to_ck on cluster default_cluster (    id  Int8,    str_one String,    str_two String,    str_thr String,    Sign Int8 )ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/pg_to_ck', '{replica}',Sign)ORDER BY (id);

註:流計算 Oceanus 集群、PostgreSQL 實例、ClickHouse 集群需在同一 VPC 下。

 

流計算 Oceanus 作業

1. 創建 Source

-- PostgreSQL CDC Source。CREATE TABLE PostgreSourceTable (  id INT,  str_one VARCHAR,  str_two VARCHAR,  str_thr VARCHAR,  PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的資料庫表定義了主鍵, 則這裡也需要定義) WITH (  'connector' = 'postgres-cdc',  -- 必須為 'postgres-cdc'  'hostname' = '10.0.0.236',     -- 資料庫的 IP  'port' = '5432',               -- 資料庫的訪問埠  'username' = 'root',           -- 資料庫訪問使用的用戶名(需要提供 REPLICATION 許可權, 日誌級別必須大於等於 logical, 且設置後需要重啟實例)  'password' = 'xxxxxxxxxxx',    -- 資料庫訪問使用的密碼  'database-name' = 'postgres',  -- 需要同步的資料庫名  'schema-name' = 'public',      -- 需要同步的資料庫模式 (Schema)  'table-name' = 'test1'         -- 需要同步的數據表名);

2. 創建 Sink

-- ClickHouse Sink (不完全支援upsert,詳見說明文檔)。配合 flink-connector-clickhouse 使用。CREATE TABLE clickhouse_sink (  id INT,  str_one VARCHAR,  str_two VARCHAR,  str_thr VARCHAR,  PRIMARY KEY (id) NOT ENFORCED          -- 如果要同步的資料庫表定義了主鍵, 則這裡也需要定義) WITH (  'connector' = 'clickhouse',              -- connector 類型為 clickhouse  'url' = 'clickhouse://10.0.0.178:8123',  -- 指定資料庫鏈接 url  'database-name' = 'default',             -- 需要寫入的 clickhouse 庫名  'table-name' = 'pg_to_ck',               -- 需要寫入的 clickhouse 表名  'table.collapsing.field' = 'Sign'        -- 採用 CollapsingMergeTree 引擎的 clickhouse 表,Collapsing 列欄位的名稱);

3. 編寫業務 SQL

INSERT INTO clickhouse_sinkSELECT   id,--INITCAP:將 str_one 中的單詞轉為大寫開頭,例如 INITCAP('i have a dream') 返回 'I Have A Dream'。  INITCAP(str_one)    AS str_one,--TO_BASE64:將 string 表示的字元串編碼為 Base64 字元串。  TO_BASE64(str_two)  AS str_two,--REPLACE:將 string1 字元串中所有的 string2 替換為 string3。例如 REPLACE('banana', 'a', 'A') 返回 'bAnAnA'。  REPLACE(str_thr,'Oceanus','Hello Oceanus') AS str_thr FROM PostgreSourceTable;

這裡我們使用 Flink 1.13 集群,舊版 Flink 集群需選擇相應的內置 Connector

 

總結

  1. 使用 Postgres-CDC 連接器:

  • 用於同步的 Postgres 用戶至少需要開啟 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 許可權。可以進入 PostgreSQL 資料庫進行授權操作。

CREATE ROLE debezium_user REPLICATION LOGIN;GRANT USAGE ON DATABASE database_name TO debezium_user;GRANT USAGE ON SCHEMA schema_name TO debezium_user;GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;
  • 日誌級別必須大於等於 logical, 且設置後需要重啟實例。進入資料庫實例,單擊【參數設置】,單擊【WAL】,修改【wal_level】的【參數運行值】為 “logical”。修改成功後點擊右上角【重啟】。

  1. 更多字元串操作函數請參考流計算 Oceanus 官方文檔 字元串函數[8]。

     

參考鏈接

[1] 流計算 Oceanus 控制台://console.cloud.tencent.com/oceanus/overview  

[2] 創建獨享集群://cloud.tencent.com/document/product/849/48298  

[3] PostgreSQL 控制台://console.cloud.tencent.com/postgres/index  

[4] 創建 PostgreSQL 實例://cloud.tencent.com/document/product/409/56961  

[5] 連接 PostgreSQL 實例://cloud.tencent.com/document/product/409/40429  

[6] ClickHouse 控制台://console.cloud.tencent.com/cdwch?region=ap-guangzhou  

[7] ClickHouse 快速入門://cloud.tencent.com/document/product/1299/49824  

[8] 流計算 Oceanus 字元串函數://cloud.tencent.com/document/product/849/18073

 

關注「騰訊雲大數據」公眾號,技術交流、最新活動、服務專享一站 Get~

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓