第05講:Flink SQL & Table 編程和案例
Flink系列文章
- 第01講:Flink 的應用場景和架構模型
- 第02講:Flink 入門程式 WordCount 和 SQL 實現
- 第03講:Flink 的編程模型與其他框架比較
- 第04講:Flink 常用的 DataSet 和 DataStream API
- 第05講:Flink SQL & Table 編程和案例
- 第06講:Flink 集群安裝部署和 HA 配置
- 第07講:Flink 常見核心概念分析
- 第08講:Flink 窗口、時間和水印
- 第09講:Flink 狀態與容錯
我們在第 02 課時中使用 Flink Table & SQL 的 API 實現了最簡單的 WordCount 程式。在這一課時中,將分別從 Flink Table & SQL 的背景和編程模型、常見的 API、運算元和內置函數等對 Flink Table & SQL 做一個詳細的講解和概括,最後模擬了一個實際業務場景使用 Flink Table & SQL 開發。
Flink Table & SQL 概述
背景
我們在前面的課時中講過 Flink 的分層模型,Flink 自身提供了不同級別的抽象來支援我們開發流式或者批量處理程式,下圖描述了 Flink 支援的 4 種不同級別的抽象。
Table API 和 SQL 處於最頂端,是 Flink 提供的高級 API 操作。Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。
我們在第 04 課時中提到過,Flink 在編程模型上提供了 DataStream 和 DataSet 兩套 API,並沒有做到事實上的批流統一,因為用戶和開發者還是開發了兩套程式碼。正是因為 Flink Table & SQL 的加入,可以說 Flink 在某種程度上做到了事實上的批流一體。
原理
你之前可能都了解過 Hive,在離線計算場景下 Hive 幾乎扛起了離線數據處理的半壁江山。它的底層對 SQL 的解析用到了 Apache Calcite,Flink 同樣把 SQL 的解析、優化和執行交給了 Calcite。
下圖是一張經典的 Flink Table & SQL 實現原理圖,可以看到 Calcite 在整個架構中處於絕對核心地位。
從圖中可以看到無論是批查詢 SQL 還是流式查詢 SQL,都會經過對應的轉換器 Parser 轉換成為節點樹 SQLNode tree,然後生成邏輯執行計劃 Logical Plan,邏輯執行計劃在經過優化後生成真正可以執行的物理執行計劃,交給 DataSet 或者 DataStream 的 API 去執行。
在這裡我們不對 Calcite 的原理過度展開,有興趣的可以直接在官網上學習。一個完整的 Flink Table & SQL Job 也是由 Source、Transformation、Sink 構成:
- Source 部分來源於外部數據源,我們經常用的有 Kafka、MySQL 等;
- Transformation 部分則是 Flink Table & SQL 支援的常用 SQL 運算元,比如簡單的 Select、Groupby 等,當然在這裡也有更為複雜的多流 Join、流與維表的 Join 等;
- Sink 部分是指的結果存儲比如 MySQL、HBase 或 Kakfa 等。
動態表
與傳統的表 SQL 查詢相比,Flink Table & SQL 在處理流數據時會時時刻刻處於動態的數據變化中,所以便有了一個動態表的概念。動態表的查詢與靜態表一樣,但是,在查詢動態表的時候,SQL 會做連續查詢,不會終止。
我們舉個簡單的例子,Flink 程式接受一個 Kafka 流作為輸入,Kafka 中為用戶的購買記錄:
首先,Kafka 的消息會被源源不斷的解析成一張不斷增長的動態表,我們在動態表上執行的 SQL 會不斷生成新的動態表作為結果表。
Flink Table & SQL 運算元和內置函數
我們在講解 Flink Table & SQL 所支援的常用運算元前,需要說明一點,Flink 自從 0.9 版本開始支援 Table & SQL 功能一直處於完善開發中,且在不斷進行迭代。我們在官網中也可以看到這樣的提示:
Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.
Flink Table & SQL 的開發一直在進行中,並沒有支援所有場景下的計算邏輯。從我個人實踐角度來講,在使用原生的 Flink Table & SQL 時,務必查詢官網當前版本對 Table & SQL 的支援程度,盡量選擇場景明確,邏輯不是極其複雜的場景。
常用運算元
目前 Flink SQL 支援的語法主要如下:
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
...
可以看到 Flink SQL 和傳統的 SQL 一樣,支援了包含查詢、連接、聚合等場景,另外還支援了包括窗口、排序等場景。下面我就以最常用的運算元來做詳細的講解。
SELECT/AS/WHERE
SELECT、WHERE 和傳統 SQL 用法一樣,用於篩選和過濾數據,同時適用於 DataStream 和 DataSet。
SELECT * FROM Table;
SELECT name,age FROM Table;
當然我們也可以在 WHERE 條件中使用 =、<、>、<>、>=、<=,以及 AND、OR 等表達式的組合:
SELECT name,age FROM Table where name LIKE '%小明%';
SELECT * FROM Table WHERE age = 20;
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)
GROUP BY / DISTINCT/HAVING
GROUP BY 用於進行分組操作,DISTINCT 用於結果去重。HAVING 和傳統 SQL 一樣,可以用來在聚合函數之後進行篩選。
SELECT DISTINCT name FROM Table;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name HAVING
SUM(score) > 300;
JOIN
JOIN 可以用於把來自兩個表的數據聯合起來形成結果表,目前 Flink 的 Join 只支援等值連接。Flink 支援的 JOIN 類型包括:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
例如,用用戶表和商品表進行關聯:
SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer
SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer
SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer
LEFT JOIN、RIGHT JOIN 、FULL JOIN 相與我們傳統 SQL 中含義一樣。
WINDOW
根據窗口數據劃分的不同,目前 Apache Flink 有如下 3 種:
- 滾動窗口,窗口數據有固定的大小,窗口中的數據不會疊加;
- 滑動窗口,窗口數據有固定大小,並且有生成間隔;
- 會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行劃分,窗口數據無疊加;
滾動窗口
滾動窗口的特點是:有固定大小、窗口中的數據不會重疊,如下圖所示:
滾動窗口的語法:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
舉例說明,我們需要計算每個用戶每天的訂單數量:
SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;
其中,TUMBLE_START 和 TUMBLE_END 代表窗口的開始時間和窗口的結束時間,TUMBLE (timeLine, INTERVAL ‘1’ DAY) 中的 timeLine 代表時間欄位所在的列,INTERVAL ‘1’ DAY 表示時間間隔為一天。
滑動窗口
滑動窗口有固定的大小,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的創建頻率。需要注意的是,多個滑動窗口可能會發生數據重疊,具體語義如下:
滑動窗口的語法與滾動窗口相比,只多了一個 slide 參數:
複製程式碼
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
例如,我們要每間隔一小時計算一次過去 24 小時內每個商品的銷量:
複製程式碼
SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
上述案例中的 INTERVAL ‘1’ HOUR 代表滑動窗口生成的時間間隔。
會話窗口
會話窗口定義了一個非活動時間,假如在指定的時間間隔內沒有出現事件或消息,則會話窗口關閉。
會話窗口的語法如下:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
舉例,我們需要計算每個用戶過去 1 小時內的訂單量:
SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user
內置函數
Flink 中還有大量的內置函數,我們可以直接使用,將內置函數分類如下:
- 比較函數
- 邏輯函數
- 算術函數
- 字元串處理函數
- 時間函數
比較函數
邏輯函數
算術函數
字元串處理函數
時間函數
Flink Table & SQL 案例
上面分別介紹了 Flink Table & SQL 的原理和支援的運算元,我們模擬一個實時的數據流,然後講解 SQL JOIN 的用法。
在上一課時中,我們利用 Flink 提供的自定義 Source 功能來實現一個自定義的實時數據源,具體實現如下:
複製程式碼
public class MyStreamingSource implements SourceFunction<Item> {
private boolean isRunning = true;
/**
* 重寫run方法產生一個源源不斷的數據發送源
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Item> ctx) throws Exception {
while (isRunning) {
Item item = generateItem();
ctx.collect(item);
//每秒產生一條數據
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
//隨機產生一條商品數據
private Item generateItem() {
int i = new Random().nextInt(100);
ArrayList<String> list = new ArrayList();
list.add("HAT");
list.add("TIE");
list.add("SHOE");
Item item = new Item();
item.setName(list.get(new Random().nextInt(3)));
item.setId(i);
return item;
}
}
我們把實時的商品數據流進行分流,分成 even 和 odd 兩個流進行 JOIN,條件是名稱相同,最後,把兩個流的 JOIN 結果輸出。
public class JoinDemo extends StreamJavaJob {
public static void main(String[] args) throws Exception {
initStreamJob(null, true);
SingleOutputStreamOperator<Item> source = env.addSource(new MyStreamingSource()).map(new MapFunction<Item, Item>() {
@Override
public Item map(Item item) throws Exception {
return item;
}
});
DataStream<Item> evenSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("even");
DataStream<Item> oddSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("odd");
stEnv.createTemporaryView("evenTable", evenSelect, "name,id");
stEnv.createTemporaryView("oddTable", oddSelect, "name,id");
Table queryTable = stEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
queryTable.printSchema();
stEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){})).print();
startStreaming();
}
}
直接右鍵運行,在控制台可以看到輸出:
總結
我們在這一課時中講解了 Flink Table & SQL 的背景和原理,並且講解了動態表的概念;同時對 Flink 支援的常用 SQL 和內置函數進行了講解;最後用一個案例,講解了整個 Flink Table & SQL 的使用。
關注公眾號:
大數據技術派
,回復資料
,領取1024G
資料。