第05講:Flink SQL & Table 編程和案例

Flink系列文章

  1. 第01講:Flink 的應用場景和架構模型
  2. 第02講:Flink 入門程式 WordCount 和 SQL 實現
  3. 第03講:Flink 的編程模型與其他框架比較
  4. 第04講:Flink 常用的 DataSet 和 DataStream API
  5. 第05講:Flink SQL & Table 編程和案例
  6. 第06講:Flink 集群安裝部署和 HA 配置
  7. 第07講:Flink 常見核心概念分析
  8. 第08講:Flink 窗口、時間和水印
  9. 第09講:Flink 狀態與容錯

我們在第 02 課時中使用 Flink Table & SQL 的 API 實現了最簡單的 WordCount 程式。在這一課時中,將分別從 Flink Table & SQL 的背景和編程模型、常見的 API、運算元和內置函數等對 Flink Table & SQL 做一個詳細的講解和概括,最後模擬了一個實際業務場景使用 Flink Table & SQL 開發。

背景

我們在前面的課時中講過 Flink 的分層模型,Flink 自身提供了不同級別的抽象來支援我們開發流式或者批量處理程式,下圖描述了 Flink 支援的 4 種不同級別的抽象。

image.png

Table APISQL 處於最頂端,是 Flink 提供的高級 API 操作。Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。

我們在第 04 課時中提到過,Flink 在編程模型上提供了 DataStream 和 DataSet 兩套 API,並沒有做到事實上的批流統一,因為用戶和開發者還是開發了兩套程式碼。正是因為 Flink Table & SQL 的加入,可以說 Flink 在某種程度上做到了事實上的批流一體。

原理

你之前可能都了解過 Hive,在離線計算場景下 Hive 幾乎扛起了離線數據處理的半壁江山。它的底層對 SQL 的解析用到了 Apache Calcite,Flink 同樣把 SQL 的解析、優化和執行交給了 Calcite。

下圖是一張經典的 Flink Table & SQL 實現原理圖,可以看到 Calcite 在整個架構中處於絕對核心地位。

image (1).png
從圖中可以看到無論是批查詢 SQL 還是流式查詢 SQL,都會經過對應的轉換器 Parser 轉換成為節點樹 SQLNode tree,然後生成邏輯執行計劃 Logical Plan,邏輯執行計劃在經過優化後生成真正可以執行的物理執行計劃,交給 DataSet 或者 DataStream 的 API 去執行。

在這裡我們不對 Calcite 的原理過度展開,有興趣的可以直接在官網上學習。一個完整的 Flink Table & SQL Job 也是由 Source、Transformation、Sink 構成:

image (2).png

  • Source 部分來源於外部數據源,我們經常用的有 Kafka、MySQL 等;
  • Transformation 部分則是 Flink Table & SQL 支援的常用 SQL 運算元,比如簡單的 Select、Groupby 等,當然在這裡也有更為複雜的多流 Join、流與維表的 Join 等;
  • Sink 部分是指的結果存儲比如 MySQL、HBase 或 Kakfa 等。

動態表

與傳統的表 SQL 查詢相比,Flink Table & SQL 在處理流數據時會時時刻刻處於動態的數據變化中,所以便有了一個動態表的概念。動態表的查詢與靜態表一樣,但是,在查詢動態表的時候,SQL 會做連續查詢,不會終止。

我們舉個簡單的例子,Flink 程式接受一個 Kafka 流作為輸入,Kafka 中為用戶的購買記錄:

image (3).png

首先,Kafka 的消息會被源源不斷的解析成一張不斷增長的動態表,我們在動態表上執行的 SQL 會不斷生成新的動態表作為結果表。

我們在講解 Flink Table & SQL 所支援的常用運算元前,需要說明一點,Flink 自從 0.9 版本開始支援 Table & SQL 功能一直處於完善開發中,且在不斷進行迭代。我們在官網中也可以看到這樣的提示:

Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.

Flink Table & SQL 的開發一直在進行中,並沒有支援所有場景下的計算邏輯。從我個人實踐角度來講,在使用原生的 Flink Table & SQL 時,務必查詢官網當前版本對 Table & SQL 的支援程度,盡量選擇場景明確,邏輯不是極其複雜的場景。

常用運算元

目前 Flink SQL 支援的語法主要如下:

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'
...

可以看到 Flink SQL 和傳統的 SQL 一樣,支援了包含查詢、連接、聚合等場景,另外還支援了包括窗口、排序等場景。下面我就以最常用的運算元來做詳細的講解。

SELECT/AS/WHERE

SELECT、WHERE 和傳統 SQL 用法一樣,用於篩選和過濾數據,同時適用於 DataStream 和 DataSet。

SELECT * FROM Table;
SELECT name,age FROM Table;

當然我們也可以在 WHERE 條件中使用 =、<、>、<>、>=、<=,以及 AND、OR 等表達式的組合:

SELECT name,age FROM Table where name LIKE '%小明%';
SELECT * FROM Table WHERE age = 20;
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)

GROUP BY / DISTINCT/HAVING

GROUP BY 用於進行分組操作,DISTINCT 用於結果去重。HAVING 和傳統 SQL 一樣,可以用來在聚合函數之後進行篩選。

SELECT DISTINCT name FROM Table;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name HAVING
SUM(score) > 300;

JOIN

JOIN 可以用於把來自兩個表的數據聯合起來形成結果表,目前 Flink 的 Join 只支援等值連接。Flink 支援的 JOIN 類型包括:

JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN

例如,用用戶表和商品表進行關聯:

SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer

SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer

SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer

LEFT JOIN、RIGHT JOIN 、FULL JOIN 相與我們傳統 SQL 中含義一樣。

WINDOW

根據窗口數據劃分的不同,目前 Apache Flink 有如下 3 種:

  • 滾動窗口,窗口數據有固定的大小,窗口中的數據不會疊加;
  • 滑動窗口,窗口數據有固定大小,並且有生成間隔;
  • 會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行劃分,窗口數據無疊加;

滾動窗口

滾動窗口的特點是:有固定大小、窗口中的數據不會重疊,如下圖所示:
image (4).png

滾動窗口的語法:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)

舉例說明,我們需要計算每個用戶每天的訂單數量:

SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;

其中,TUMBLE_START 和 TUMBLE_END 代表窗口的開始時間和窗口的結束時間,TUMBLE (timeLine, INTERVAL ‘1’ DAY) 中的 timeLine 代表時間欄位所在的列,INTERVAL ‘1’ DAY 表示時間間隔為一天。

滑動窗口

滑動窗口有固定的大小,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的創建頻率。需要注意的是,多個滑動窗口可能會發生數據重疊,具體語義如下:

image (5).png

滑動窗口的語法與滾動窗口相比,只多了一個 slide 參數:

複製程式碼

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)

例如,我們要每間隔一小時計算一次過去 24 小時內每個商品的銷量:

複製程式碼

SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

上述案例中的 INTERVAL ‘1’ HOUR 代表滑動窗口生成的時間間隔。

會話窗口

會話窗口定義了一個非活動時間,假如在指定的時間間隔內沒有出現事件或消息,則會話窗口關閉。

image (6).png
會話窗口的語法如下:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)

舉例,我們需要計算每個用戶過去 1 小時內的訂單量:

SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user

內置函數

Flink 中還有大量的內置函數,我們可以直接使用,將內置函數分類如下:

  • 比較函數
  • 邏輯函數
  • 算術函數
  • 字元串處理函數
  • 時間函數

比較函數
比較函數.png

邏輯函數
邏輯函數.png

算術函數
算術函數.png

字元串處理函數
字元串處理函數.png

時間函數
時間函數.png

上面分別介紹了 Flink Table & SQL 的原理和支援的運算元,我們模擬一個實時的數據流,然後講解 SQL JOIN 的用法。

在上一課時中,我們利用 Flink 提供的自定義 Source 功能來實現一個自定義的實時數據源,具體實現如下:

複製程式碼

public  class MyStreamingSource implements SourceFunction<Item> {
    private boolean isRunning = true;
    /**
     * 重寫run方法產生一個源源不斷的數據發送源
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            Item item = generateItem();
            ctx.collect(item);

            //每秒產生一條數據
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    //隨機產生一條商品數據
    private Item generateItem() {
        int i = new Random().nextInt(100);
        ArrayList<String> list = new ArrayList();
        list.add("HAT");
        list.add("TIE");
        list.add("SHOE");
        Item item = new Item();
        item.setName(list.get(new Random().nextInt(3)));
        item.setId(i);
        return item;
    }
}

我們把實時的商品數據流進行分流,分成 even 和 odd 兩個流進行 JOIN,條件是名稱相同,最後,把兩個流的 JOIN 結果輸出。

public class JoinDemo extends StreamJavaJob {
    public static void main(String[] args) throws Exception {
        initStreamJob(null, true);

        SingleOutputStreamOperator<Item> source = env.addSource(new MyStreamingSource()).map(new MapFunction<Item, Item>() {
            @Override
            public Item map(Item item) throws Exception {
                return item;
            }
        });

        DataStream<Item> evenSelect = source.split(new OutputSelector<Item>() {
            @Override
            public Iterable<String> select(Item value) {
                List<String> output = new ArrayList<>();
                if (value.getId() % 2 == 0) {
                    output.add("even");
                } else {
                    output.add("odd");
                }
                return output;
            }
        }).select("even");

        DataStream<Item> oddSelect = source.split(new OutputSelector<Item>() {
            @Override
            public Iterable<String> select(Item value) {
                List<String> output = new ArrayList<>();
                if (value.getId() % 2 == 0) {
                    output.add("even");
                } else {
                    output.add("odd");
                }
                return output;
            }
        }).select("odd");


        stEnv.createTemporaryView("evenTable", evenSelect, "name,id");
        stEnv.createTemporaryView("oddTable", oddSelect, "name,id");

        Table queryTable = stEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");

        queryTable.printSchema();

        stEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){})).print();
        startStreaming();
    }
}

直接右鍵運行,在控制台可以看到輸出:

image (7).png

總結

我們在這一課時中講解了 Flink Table & SQL 的背景和原理,並且講解了動態表的概念;同時對 Flink 支援的常用 SQL 和內置函數進行了講解;最後用一個案例,講解了整個 Flink Table & SQL 的使用。

關注公眾號:大數據技術派,回復資料,領取1024G資料。

Tags: