Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

  • 2019 年 10 月 5 日
  • 筆記

來源:ververica.cn

作者:伍翀(雲邪)

Apache Flink Committer,阿里巴巴 SQL 引擎技術專家

北京理工大學碩士畢業,2015 年加入阿里巴巴,參與阿里巴巴實時計算引擎 JStorm 的開發與設計。2016 年開始從事阿里新一代實時計算引擎 Blink SQL 的開發與優化,並活躍於 Flink 社區,於2017年2月成為ApacheFlink Committer,是中國早期 Flink Committer 之一。目前主要專註於分散式處理和實時計算,熱愛開源,熱愛分享。

本文來自雲邪的部落格

本文衍生於伍翀(雲邪)在2019年8月31日 「Apache Kafka × Apache Flink Meetup 深圳站」上的分享《Flink SQL 1.9.0 技術內幕和最佳實踐》。會後許多小夥伴對最後演示環節的 Demo 程式碼非常感興趣,迫不及待地想嘗試下,所以寫了這篇文章分享下這份程式碼。希望對於 Flink SQL 的初學者能有所幫助。

演示程式碼已經開源到了 GitHub 上:https://github.com/wuchong/flink-sql-submit

這份程式碼主要由兩部分組成:

1) 能用來提交 SQL 文件的 SqlSubmit 實現。

2) 用於演示的 SQL 示例、Kafka 啟動停止腳本、 一份測試數據集、Kafka 數據源生成器。

通過本實戰,你將學到:

  1. 如何使用 Blink Planner
  2. 一個簡單的 SqlSubmit 是如何實現的
  3. 如何用 DDL 創建一個 Kafka 源表和 MySQL 結果表
  4. 運行一個從 Kafka 讀取數據,計算 PVUV,並寫入 MySQL 的作業
  5. 設置調優參數,觀察對作業的影響

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支援處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。後來想想,也挺好的,可以讓聽眾同時了解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...)。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心程式碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()          .useBlinkPlanner()          .inStreamingMode()          .build();  // 創建一個使用 Blink Planner 的 TableEnvironment, 並工作在流模式  TableEnvironment tEnv = TableEnvironment.create(settings);  // 讀取 SQL 文件  List<String> sql = Files.readAllLines(path);  // 通過正則表達式匹配前綴,來區分不同的 SQL 語句  List<SqlCommandCall> calls = SqlCommandParser.parse(sql);  // 根據不同的 SQL 語句,調用 TableEnvironment 執行  for (SqlCommandCall call : calls) {    switch (call.command) {      case SET:        String key = call.operands[0];        String value = call.operands[1];        // 設置參數        tEnv.getConfig().getConfiguration().setString(key, value);        break;      case CREATE_TABLE:        String ddl = call.operands[0];        tEnv.sqlUpdate(ddl);        break;      case INSERT_INTO:        String dml = call.operands[0];        tEnv.sqlUpdate(dml);        break;      default:        throw new RuntimeException("Unsupported command: " + call.command);    }  }  // 提交作業  tEnv.execute("SQL Job");

使用 DDL 連接 Kafka 源表

在 flink-sql-submit 項目中,我們準備了一份測試數據集(來自阿里雲天池公開數據集,特別鳴謝),位於 src/main/resources/user_behavior.log。數據以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}  {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 數據源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了數據源後,我們就可以用 DDL 去創建並連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql)。

CREATE TABLE user_log (      user_id VARCHAR,      item_id VARCHAR,      category_id VARCHAR,      behavior VARCHAR,      ts TIMESTAMP  ) WITH (      'connector.type' = 'kafka', -- 使用 kafka connector      'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本      'connector.topic' = 'user_behavior',  -- kafka topic      'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取      'connector.properties.0.key' = 'zookeeper.connect',  -- 連接資訊      'connector.properties.0.value' = 'localhost:2181',      'connector.properties.1.key' = 'bootstrap.servers',      'connector.properties.1.value' = 'localhost:9092',      'update-mode' = 'append',      'format.type' = 'json',  -- 數據源格式為 json      'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則  )

註:可能有用戶會覺得其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。

使用 DDL 連接 MySQL 結果表

連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (      dt VARCHAR,      pv BIGINT,      uv BIGINT  ) WITH (      'connector.type' = 'jdbc', -- 使用 jdbc connector      'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url      'connector.table' = 'pvuv_sink', -- 表名      'connector.username' = 'root', -- 用戶名      'connector.password' = '123456', -- 密碼      'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條  )

PV UV 計算

假設我們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。很多用戶可能會想到使用滾動窗口來計算。但這裡我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink  SELECT    DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,    COUNT(*) AS pv,    COUNT(DISTINCT user_id) AS uv  FROM user_log  GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成「年月日小時」的字元串格式,並根據這個字元串進行分組,即根據每小時分組,然後通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於之前計算的值做增量計算(如+1),然後將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

註:在深圳 Meetup 中,我們有對這種查詢的性能調優做了深度的介紹。

實戰演示

環境準備

本實戰演示環節需要安裝一些必須的服務,包括:

  • Flink 本地集群:用來運行 Flink SQL 任務。
  • Kafka 本地集群:用來作為數據源。
  • MySQL 資料庫:用來作為結果表。

Flink 本地集群安裝

  1. 下載 Flink 1.9.0 安裝包並解壓:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
  2. 下載以下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現。
  3. 將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多於1個的 slot。
  4. 在 flink-1.9.0 目錄下執行 ./bin/start-cluster.sh,啟動集群。

運行成功的話,可以在 http://localhost:8081 訪問到 Flink Web UI。

另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用於後面提交 SQL 任務,如我的路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安裝

  1. 下載 Kafka 2.2.0 安裝包並解壓:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
  2. 將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如我的路徑是 KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
  3. 在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啟動 Kafka 集群。
  4. 在命令行執行 jps,如果看到 Kafka 進程和 QuorumPeerMain 進程即表明啟動成功。

MySQL 安裝

$ docker pull mysql  $ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然後在 MySQL 中創建一個 flink-test 的資料庫,並按照上文的 schema 創建 pvuv_sink 表。

提交 SQL 任務

  1. 在 flink-sql-submit 目錄下運行 ./source-generator.sh,會自動創建 user_behavior topic,並實時往裡灌入數據。
  1. 在 flink-sql-submit 目錄下運行 ./run.sh q1, 提交成功後,可以在 Web UI 中看到拓撲。

在 MySQL 客戶端,我們也可以實時地看到每個小時的 pv uv 值在不斷地變化

結尾

本文帶大家搭建基礎集群環境,並使用 SqlSubmit 提交純 SQL 任務來學習了解如何連接外部系統。flink-sql-submit/src/main/resources/q1.sql 中還有一些注釋掉的調優參數,感興趣的同學可以將參數打開,觀察對作業的影響。關於這些調優參數的原理,可以看下分享《Flink SQL 1.9.0 技術內幕和最佳實踐》。