在 Kylin 中實現異常值檢測 UD(A)F
- 2019 年 12 月 25 日
- 筆記
本文講解了時間序列數據異常值檢測的基本概念和在 Kylin 中開發使用異常值檢測 UDF 的方法,可以作為其他 UDF 開發的參考。
通過在 Kylin 中移植 Hivemall 的 UDF,我們可以充分利用 Kylin 的優勢,減少直接使用 Hivemall 過程中的數據加工、存儲等繁雜步驟的工作量,提升用戶的查詢體驗。本文使用的驗證環境為 Kylin 2.6.3。
時間序列數據的異常值檢測
時間序列數據是聚合數據中的一種重要類別,數據分析人員經常需要使用各種方法從不同角度對聚合得到的時間序列數據進行挖掘,異常值檢測(Anomaly Detection)就是其中的一種常見方法。異常值檢測的主要目標是從時間序列數據中區分出與預期的正常值不符的值[1],如離群值(outlier)和突變點(change-point)等,這些值往往具有比較高的關注價值,是分析人員在進行業務分析時需要重點關注的對象。
時間序列數據的異常值檢測具有廣泛的應用場景,例如:應用在一般的商業領域中,有助於定位生產銷售中的異常波動;應用在運維中,有助於迅速發現故障;應用在醫學上,有助於醫生做出診斷,等等。
Hivemall 的異常值檢測函數
使用傳統方法在大數據集上進行異常值檢測存在效率低、不夠靈活等問題,因此就有人嘗試引入 Hive,通過對 Hive 進行擴展來解決這些問題。例如,Apache 孵化項目 Hivemall (http://hivemall.incubator.apache.org)為 Hive 提供了大量的數據分析 UDF(User-defined Function,用戶定義函數)作為擴展,其中就包括一個用於進行異常值檢測的函數 changefinder。changefinder 函數實現了 ChangeFinder 算法[2],提供一維和二維數據浮點數據的離群值和突變點檢測功能。
Hivemall 在官網中提供了一個使用示例數據(https://github.com/apache/incubator-hivemall/blob/master/core/src/test/resources/hivemall/anomaly/twitter.csv.gz?raw=true,這是一組來自 Twitter 的時間序列數據)進行異常值檢測的例子,例子中的查詢語句和查詢結果片段如下:
SELECT num, changefinder(value, "-outlier_threshold 0.03 -changepoint_threshold 0.0035") AS result FROM timeseries ORDER BY num ASC ;
num |
result |
---|---|
… |
… |
16 |
{"outlier_score":0.051287243859365894,"changepoint_score":0.003292139657059704,"is_outlier":true,"is_changepoint":false} |
17 |
{"outlier_score":0.03994335565212781,"changepoint_score":0.003484242549446824,"is_outlier":true,"is_changepoint":false} |
18 |
{"outlier_score":0.9153515196592132,"changepoint_score":0.0036439645550477373,"is_outlier":true,"is_changepoint":true} |
19 |
{"outlier_score":0.03940593403992665,"changepoint_score":0.0035825157392152134,"is_outlier":true,"is_changepoint":true} |
20 |
{"outlier_score":0.27172093630215555,"changepoint_score":0.003542822324886785,"is_outlier":true,"is_changepoint":true} |
21 |
{"outlier_score":0.006784031454620809,"changepoint_score":0.0035029441620275975,"is_outlier":false,"is_changepoint":true} |
22 |
{"outlier_score":0.011838969816513334,"changepoint_score":0.003519599336202336,"is_outlier":false,"is_changepoint":true} |
23 |
{"outlier_score":0.09609857927656007,"changepoint_score":0.003478729798944702,"is_outlier":true,"is_changepoint":false} |
24 |
{"outlier_score":0.23927000145081978,"changepoint_score":0.0034338476757061237,"is_outlier":true,"is_changepoint":false} |
25 |
{"outlier_score":0.04645945042821564,"changepoint_score":0.0034052091926036914,"is_outlier":true,"is_changepoint":false} |
… |
… |
在 Kylin 中使用 UDF
Apache Kylin 是一個基於 Hadoop 平台的 OLAP 引擎,它採用預計算的理念,預先對查詢中可能使用到的表和維度進行關聯、聚合,使得後續的查詢可以直接使用預計算的結果。預計算結果被存儲下來 ,通過消耗存儲空間換取更快速的查詢響應,即所謂的空間換時間。Hivemall 基於 Hive 大幅提升了在大數據集上進行異常值檢測的效率和靈活性,但在分析聚合數據的場景下,以其查詢效率仍然無法實現實時交互式分析,因此可以考慮通過引入 Kylin 來解決這個問題。
與 Hive 等類似,Kylin 也支持 UDF,允許用戶對查詢中需要使用的函數進行一定的擴展,下面我們就嘗試通過創建 UDF 的方式,在 Kylin 中引入 Hivemall 的一維 changefinder 函數。
創建 Kylin 的 UDF 主要有兩個步驟:
- 創建一個包含 UDF 相關方法的類,具體需要哪些方法將在後文詳細介紹。
- 修改 Kylin 目錄下的 conf/kylin.properties 配置文件,添加一行 kylin.query.udf.{UDF名稱}={UDF類名},保存並重啟 Kylin 後就可以使用定義好的 UDF 了。
Hivamall 中的 ChangeFinder 算法
在動手之前,我們先簡要考察一下 ChangeFinder 算法的原理和它在 Hivemall 中的實現。ChangeFinder 是一種基於自回歸模型的算法,包括分別檢測離群值和突變點的兩大階段,每個階段又可以細分為更新自回歸模型和計算異常分值兩個步驟。這其中兩個階段的計算過程是幾乎相同的,輸出分別是離群分值和突變分值,主要區別在於輸入不同,突變點檢測階段以離群值檢測階段的輸出作為輸入。
ChangeFinder 算法定義了時間序列上的自回歸模型,並設計了用於估計模型參數的算法,稱為 Sequentially Discounting AR Model Learning 算法,簡稱 SDAR 算法。SDAR 算法有兩個參數,分別是模型的階數和衰減係數,序列數據保存在長度為的 RingBuffer 中,經 SDAR 算法計算就可以得到該序列對應自回歸模型的參數,這部分在一維數據上的實現在 hivemall.anomaly.SDAR1D 類中。
有了這些參數,就可以使用模型的概率密度函數進行分值的計算,Hivemall 中提供了海林格距離和對數兩種計算方式,其中默認使用的是海林格距離。此外,ChangeFinder 算法在突變點檢測階段的開始和結束時會分別對輸入的離群分值和輸出的突變分值進行窗口平滑,這就需要額外的兩個輸入參數,即窗口大小T1和 T2。
static double evalScoreY(DoubleRingBuffer xRing, SDAR1D sdar1, DoubleRingBuffer yRing, SDAR1D sdar2, DoubleRingBuffer outlierScores, DoubleRingBuffer changepointScores, double x, int k) { //計算離群分值 double scoreX = evalScoreX(xRing, sdar1, x, k); //第一次平滑 double y = smoothing(outlierScores.add(scoreX)); //計算突變分值 double[] ySeries = new double[k + 1]; yRing.add(y).toArray(ySeries, false); int k2 = yRing.size() - 1; double y_hat = sdar2.update(ySeries, k2); double lossY = (k2 == 0D) ? 0D : hellingerLoss(sdar2); //第二次平滑 double scoreY = smoothing(changepointScores.add(lossY)); return scoreY; }
UDF 開發過程中的主要問題
了解了上面這些信息,就可以開始動手移植了,動手過程中需要解決幾個問題:
首先是 UDF 的類型問題。Kylin 的 SQL 支持來自另一個 Apache 開源項目 Calcite(http://calcite.apache.org),Calcite支持的 UDF 除最基本的單行映射的函數外,還有聚合函數和窗口聚合函數兩種,即 UDAF 和 Window UDAF。
不同類型的 UDF,其類定義和在 SQL 中的使用方法也不盡相同,聚合函數將整列數據聚合成一個,如 count、sum 等,窗口聚合函數則是與 over 子句一起使用的聚合函數。從上面考察的 changefinder 算法的原理來看,這裡我們應該選擇窗口聚合函數,這樣就需要在類中定義 5 個方法:init、add、merge、remove result (http://calcite.apache.org/docs/adapter.html,Extensibility 小節),並定義一個類,用於儲存異常分值計算的中間結果。
其次,要解決函數拆分的問題。Hivemall 中的 changefinder 函數將離群值和突變點的檢測放在了一起,以 json 的形式輸出計算結果,我們可以將它拆分成兩個 UDF,即定義兩個類 OutlierWindowUDF 和 ChangePointWindowUDF,分別用於計算離群分值和突變分值,以方便進一步的分析。以 changepoint 函數為例,拆分後的主要計算過程如下:
static double evalScoreY(DoubleRingBuffer xRing, SDAR1D sdar1, DoubleRingBuffer yRing, SDAR1D sdar2, DoubleRingBuffer outlierScores, DoubleRingBuffer changepointScores, double x, int k) { //計算離群分值 double scoreX = evalScoreX(xRing, sdar1, x, k); //第一次平滑 double y = smoothing(outlierScores.add(scoreX)); //計算突變分值 double[] ySeries = new double[k + 1]; yRing.add(y).toArray(ySeries, false); int k2 = yRing.size() - 1; double y_hat = sdar2.update(ySeries, k2); double lossY = (k2 == 0D) ? 0D : hellingerLoss(sdar2); //第二次平滑 double scoreY = smoothing(changepointScores.add(lossY)); return scoreY;
接下來要解決數據結構初始化的問題。Hive 允許 UDF 在進行計算之前使用傳入的參數進行初始化,但 Calcite 的 UDF 並不支持帶參數的初始化,因此需要在計算第一行時使用、等參數進行初始化,OutlierWindowUDF 需要初始化 1 個 DoubleRingBuffer 變量和 1 個 SDAR1D 變量,而 ChangePointWindowUDF 需要初始化 4 個 DoubleRingBuffer 變量和 2 個 SDAR1D 變量。Calcite 聚合函數類型的 UDF 需要使用一個用戶定義的類作為 Accumulator,因此我們可以定義一個內部類,用於存儲計算過程中所需的各種變量,以 outlier 函數為例:
class OutlierWindowStatus { DoubleRingBuffer xRing; SDAR1D sdar1; double result; boolean initialized; OutlierWindowStatus() { xRing = null; sdar1 = null; result = 0D; initialized = false; } void initialize(int k, double r) { xRing = new DoubleRingBuffer(k + 1); sdar1 = new SDAR1D(r, k); initialized = true; } void setResult(double result) { this.result = result; } } public OutlierWindowStatus init() { return new OutlierWindowStatus(); } public OutlierWindowStatus add(OutlierWindowStatus ows, BigDecimal x, int k, BigDecimal r1) { if (!ows.initialized) { ows.initialize(k, r1.doubleValue()); } ows.setResult(evalScoreX(ows.xRing, ows.sdar1, x.doubleValue(), k)); return ows; } public OutlierWindowStatus merge(OutlierWindowStatus ows1, OutlierWindowStatus ows2) { return ows1; } public OutlierWindowStatus remove(OutlierWindowStatus ows, double x) { return ows; } public double result(OutlierWindowStatus ows) { return ows.result; }
還要注意的是,由於 Calcite 會把窗口聚合函數輸入的非整數作為 BigDecimal 類型來處理,這裡也應該把相應的參數定義為 BigDecimal 類型。
最後,使用 maven 的 shade 插件進行打包,並排除其中的 SF、DSA、RSA 等簽名文件,就得到了可以放入 Kylin 的 lib 目錄的 jar 文件,在 kylin.properties 文件中加入以下兩行並重啟 Kylin,就可以使用 UDF 了:
kylin.query.udf.outlier_window=org.apache.kylin.query.udf.OutlierWindowUDF kylin.query.udf.changepoint_window=org.apache.kylin.query.udf.ChangePointWindowUDF
我們可以將 Hivemall 提供的示例數據導入到 Hive 中,使用 Kylin 建立 Cube 並進行簡單的查詢驗證,以 outlier 函數為例:

如上圖,我們使用一條簡單的 SQL 語句驗證 outlier 函數,返回的結果就是在示例數據上計算得到的離群分值。為了使結果更加直觀一些,可以將查詢結果導出,從中截取一個 100 行的片段繪圖,在圖中加入一條表示閾值的虛線:

圖中的橫軸表示數據的行號,縱軸表示數據的離群分值,取閾值為 0.03,可以看到,函數成功檢測出了這 100 條數據中的 3 個比較明顯的離群值。
作者介紹:鄭嶸,Kyligence 研發工程師,主要負責 Kyligence MDX 研發。
參考文獻
[1] Chandola, Varun, Arindam Banerjee, and Vipin Kumar. Anomaly detection: A survey. ACM computing surveys (CSUR) 41.3 (2009): 15.
[2] K. Yamanishi and J. Takeuchi. A Unifying Framework for Detecting Outliers and Change Points from Non-Stationary Time Series Data. KDD 2002.