Flink SQL 核心概念剖析與編程案例實戰
本次,我們從 0 開始逐步剖析 Flink SQL 的來龍去脈以及核心概念,並附帶完整的示常式序,希望對大家有幫助!
本文大綱
一、快速體驗 Flink SQL
為了快速搭建環境體驗 Flink SQL,我們使用 Docker 來安裝一些基礎組件,包括 zk 和 kafka,如果你有這個環境,可以略過了。
在 Centos 7 上安裝 Docker 環境,具體見這個鏈接,此處就不細說了:
//blog.csdn.net/qq_24434251/article/details/105712044
1、拉取安裝並執行 zookeeper 鏡像
docker pull debezium/zookeeper
docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
2、拉取安裝並執行 kafka 鏡像
docker pull debezium/kafka
docker run -d -it --rm --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --link zookeeper:zookeeper debezium/kafka
3、進入 kafka 容器內的命令行
docker exec -it kafka /bin/bash
4、創建一個 topic
/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.10:2181 --topic user_log --partitions 1 --replication-factor 1
5、在 IDEA 中啟動程式
這裡不貼程式碼太長了,具體程式可以參見我的 github:
//github.com/nicekk/Flink-Practice
6、寫入數據
/kafka/bin/kafka-console-producer.sh --broker-list 192.168.56.10:9092 --topic user_log
數據樣例:
{"user_id":123,"item_id":345,"ts":"2021-01-05 23:04:00"}
{"user_id":345,"item_id":345,"ts":"2021-01-05 23:04:00"}
7、結果輸出:
二、數據類型系統
繼續說明 Flink SQL 使用之前,我們還需要談一談 Flink 的數據類型系統。
Flink 作為一款高性能的計算框架,必然繞不開分散式計算、數據傳輸和持久化這些問題。
在數據傳輸過程中,要對數據進行序列化和反序列化:序列化就是將一個記憶體對象轉換成二進位串,形成網路傳輸或者持久化的數據流;反序列化將二進位串轉換為記憶體對象,這樣就可以直接在程式語言中讀寫這個對象。
Flink 是運行在 JVM 上的,計算過程中會有大量的數據存儲在記憶體中,這就會面臨一些問題,如 Java 對象存儲密度較低等。
針對這些問題,最常用的方案就是自己實現一個顯示的記憶體管理,用自定義的記憶體池來進行記憶體的分配回收,接著將序列化後的對象存儲到記憶體塊中。
所以,Flink 對數據類型推斷越準確,越能更早的完成數據類型檢查,幫助 Flink 更好的規劃記憶體,節省存儲空間。
比如下面這個類,Tuple3 <Integer,Double,Person> ,包含三種數據類型。
其中 Person 包含兩個欄位,分別是 id 和 name。
如圖,int 佔四個位元組,通過 IntSerializer 序列化操作之後,給它分配 4 個位元組就行了。對象之間可以緊湊的在一起存儲,不像 Java 的序列化會有更多的存儲損耗。
(數據類型系統,是 Flink 一個非常大的領域,我們會單開一篇文章來詳細說明,此處只想說明一下數據類型的重要作用)
三、在無界數據流上怎麼執行 SQL
在有界的數據集上執行 SQL ,相信大家每天都深有體會,每天都會做。有界的數據集是靜止的,離線模式下,SQL 可以訪問完整的數據集,查詢產生結果後就終止了。
而數據流是無限的,意味著程式需要一直運行,等待數據進入並進行處理,這樣的一種模式如何和 SQL 關聯起來呢?
這裡我們要引入兩個概念:動態表(Dynamic Table)和持續查詢(Continuous Queries )。
(1)動態表
如果想用 SQL 去分析一個數據流,那第一件事就是要把流轉換成表。
如下圖,左邊是一個點擊的事件流,有姓名,事件時間,點擊的 url 資訊。右邊是一張表,也有這三個欄位。
從左邊的流到右邊的表,是一個邏輯上的映射過程,並沒有將數據持久化。
隨著左邊流事件源源不斷的到來,右邊的表的記錄也會一直追加更新。
這樣一直變化的表,就稱為「動態表」。
(2)連續查詢
對於動態表的查詢就被稱為是連續查詢。
如下圖,將下面的 SQL 作用在動態表上,就產生了一個持續查詢,因為這個查詢一直不會終止掉,並且每個事件到來時,都會產生一次查詢。
查詢的結果,會生成一個新的動態表。
select
user,
count(url) as cnt
from clicks
group by user;
Mary,./home)這條數據到來,產生查詢的結果:【Mary,1】
(Bob,./cart) 這條數據到來,會在動態表上追加一條 Bob 的記錄,最終的結果為:【Mary,1】【Bob,1】
(Mary,./prod?id=1) 這條數據到來,會更新動態表的 Mary 的記錄,最終結果為:【Mary,2】【Bob,1】
(Liz,./home) 這條數據到來,會在動態表上追加一個記錄,最終結果為:【Mary,2】【Bob,1】【Liz,1】
這樣的話,我們就可以使用 SQL 在動態表上連續查詢,產生新的動態表。(實際上,在上一篇中,我們已經知道,SQL 最終是會變成程式執行的)。
(3)查詢限制
由於流是無限的,我們不得不思考一個問題,那就是所有的查詢語句都能在流上執行嗎?
答案是否定的,主要是兩點原因,一是維護的狀態比較大,二是計算更新的成本高。
由於連續查詢會一直運行,為了更新之前產生的結果,需要維護所有的輸出行,這樣的話,記憶體中存儲的數據會越來越大。
然後有時候,即使只來了一條記錄,也需要重新計算和更新之前大部分的結果行,這樣的查詢也不適合作為連續查詢。
比如下面的 SQL,求排名,每次來數據之後,都需要計算大量數據的排名:
SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user,
MAX(cTime) AS lastAction
FROM clicks GROUP BY user
);
(4)結果輸出
最後一個問題,Flink 是一個計算引擎,自身不存儲數據,那麼它是如何表示更新數據並更新到外部存儲?
這裡我們舉兩個例子來說明
1、目標表是控制台
我們可以回到上面的那個例子,例子中,由於目標是控制台,可以任意列印結果。
-- 源表,連接 kafka,從最新的地方開始消費
CREATE TABLE user_log (
user_id bigint,
item_id bigint,
ts TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'user_log',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '192.168.56.10:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
)
-- 目標表是控制台,直接列印
CREATE TABLE user_log_result(
user_id bigint,
cnt bigint
) WITH (
'connector' = 'print'
)
-- 查詢的 SQL,一個簡單的 group by ,統計源表的 user_id 數量,寫到目標表
insert into user_log_result select user_id,count(1) cnt from user_log group by user_id
當我們第一次輸入一條數據時: {“user_id”:123,”item_id”:345,”ts”:”2021-01-05 23:04:00″}
控制台上列印:
3> +I(123,1)
當我們再次輸入一條數據時:{“user_id”:123,”item_id”:123,”ts”:”2021-01-05 23:04:00″}
控制台上列印了兩條數據:
3> -U(123,1)
3> +U(123,2)
+I,-U,+U 表示一行數據的 changelog,+I 表示是新增的數據,-U 表示之前的記錄已經被更新,之前的記錄要回撤,+U 表示本次更新的數據。
可以看到,輸出結果是以對於每行產生 changelog 的形式來表示的。
如果 sink 階段要使用 DataStream Api,可以把動態表變成流,繼續 sink 到下游節點。如果使用 SQL,則直接可以發送到下游。
具體程式見:
2、目標表是 Kafka 的時候
-- 源表,連接 kafka,從最新的地方開始消費
CREATE TABLE user_log (
user_id bigint,
item_id bigint,
ts TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'user_log',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '192.168.56.10:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
)
-- 目標表是 Kafka
CREATE TABLE user_log_result (
user_id bigint,
cnt bigint
) WITH (
'connector' = 'kafka',
'topic' = 'user_log_result',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '192.168.56.10:9092',
'format' = 'json'
)
-- 查詢的 SQL,一個簡單的 group by ,統計源表的 user_id 數量,寫到目標表
insert into user_log_result select user_id,count(1) cnt from user_log group by user_id
此時再運行,直接就報錯了,提示資訊如下:
Exception in thread "main" org.apache.flink.table.api.TableException:
Table sink 'default_catalog.default_database.user_log_result' doesn't support consuming update changes
which is produced by node GroupAggregate(groupBy=[user_id], select=[user_id, COUNT(*) AS cnt])
大意是:這是一個 Group 的聚合,而目標表 user_log_result (kafka)不支援更新的數據。kafka 只能支援一直新增的數據。
如果我們換成下面的 SQL,數據只有新增不會更新,就可以運行了。當然也可以把目標表換成其他可以更新的介質,如 mysql ,hbase 等等。
insert into user_log_result select user_id,count(1) cnt from user_log group by user_id
具體程式見:
四、時間、INTERVAL 與 窗口計算
窗口計算永遠是流計算的核心,窗口將無限流切分為有限大小的數據集,可以對這個有限數據集進行計算。
在談到窗口的時候,總是會情不自禁冒出 N 多的概念,比如:事件時間,處理時間,窗口開始時間,窗口結束時間,滑動窗口,滾動窗口,窗口大小,水印 …….
在最新的 Flink SQL 中,已經可以在 DDL 中定義所有的這一切了,讓我們各個擊破他們。
1. INTERVAL
Interval 這個東西,並不是 Flink SQL 中特有的,在 ANSI SQL 中就有,下面我們以 Oracle 舉例來說明。
首先得有 Oracle 環境,這裡我們使用 Docker 來搭建,具體教程見這個鏈接:
//blog.csdn.net/qq_24434251/article/details/112341197
INTERVAL 表示一段時間差,直接建表體驗一下
create table INTERVAL_TAB
(
DURATION INTERVAL DAY (2) TO SECOND (5)
)
表示建一個表,欄位 duration 表示 天 到 秒,括弧的數字表示精度。
insert into interval_tab (duration) values (interval '3 12:32' day(3) to minute );
插入的這條數據表示一段時間:3天12小時32分鐘
可能感覺這個沒啥用,比如我問你在公司入職幾年了,你可以輕鬆說出來,但是如果我問你在公司入職多少天了,這就很複雜了,中間的閏年,2 月都要考慮,有了這樣的表示方法就很方便了。
比如可以很輕易的算出今天之前100天,是哪一天:
select sysdate,sysdate - interval '100' day(3) as "當前時間-100天" from dual;
有了 INTERVAL ,我們就可以輕鬆表示窗口的時間長短了。
2. 窗口計算
滾動窗口 – 使用ProcessingTime
-- 源表,user_name 用戶名,data 數據
CREATE TABLE user_actions (
user_name string,
data string,
user_action_time as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'user_log',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '192.168.56.10:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
)
-- 結果表
CREATE TABLE user_action_result(
window_start TIMESTAMP(3),
cnt bigint
) WITH (
'connector' = 'print'
)
-- 窗口計算
INSERT INTO user_action_result
select * from (
SELECT TUMBLE_START(user_action_time, INTERVAL '10' SECOND) window_start, COUNT(DISTINCT user_name) cnt
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' SECOND)
)
-- 測試數據
{"user_name":"zhangsan","data":"browse"}
{"user_name":"lisi","data":"browse"}
首先源表上,我們使用了 processing time,載入了欄位 user_action_time 上,這並不是我們數據中的欄位,而是程式自動給我們加上的,是一個虛擬欄位作為時間屬性。
然後是查詢 SQL, group by 後面的 TUMBLE(user_action_time, INTERVAL ’10’ SECOND),表示這是一個滾動窗口,使用 user_action_time 作為時間欄位,並且窗口大小為 INTERVAL ’10’ SECOND ,表示 10 s,就是剛剛講到的 INTERVAL 的語法。
select 中的 TUMBLE_START(user_action_time, INTERVAL ’10’ SECOND) 是窗口的開始時間,COUNT(DISTINCT user_name) 表示統計每個窗口中的 user_name 去重值。
具體程式見:
滾動窗口 – 使用 EventTime
首先仍然需要在執行環境中聲明使用 EventTime:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
修改一下源表的定義
CREATE TABLE user_actions (
user_name string,
data string,
user_action_time TIMESTAMP(3),
WATERMARK FOR user_action_time as user_action_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_log',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '192.168.56.10:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false'
)
可以看到,有一個時間欄位是 user_action_time,然後 使用 WATERMARK FOR user_action_time as user_action_time – INTERVAL ‘5’ SECOND ,來表示把 user_action_time 作為時間欄位,並且聲明一個 5s 延遲的 watermark。只用一句 SQL 就定義好了 event_time 和 水位。
具體程式可以去我的 github 上下載:
//github.com/nicekk/Flink-Practice
五、總結
假設你之前沒有接觸過 Flink SQL,看完本文相信你已經對 Flink SQL 有了初步的認識,再打開 IDEA,親自動手操作一遍就會有更加深刻的認識,這也就達到了本文的目的了。
如果覺得有收穫,可以關注我的公眾號:「KK架構師」