nifi從入門到實戰(保姆級教程)——flow
- 2022 年 7 月 3 日
- 筆記
- Apache Nifi, nifi
本文章首發於博客園,轉載請標明出處
經過前兩篇文章(環境篇,身份驗證),我們已經有了nifi可以運行的基礎,今天就來實現一個案例吧。
假設我們要從ftp上獲取一個zip包,裏面有兩個csv文件,一個是manufacture.csv,一個是brand.csv.然後要把這兩個文件導入到sqlserver數據庫中。其中brand是manufacture的下一級,但是brand里沒有manufacture的主鍵,必須要通過一些關鍵字段的匹配來找出它們。
在實現這個場景之前,我們需要認識一下nifi中的幾個重要組件。
Processor : 主要用來處理flowfile,也就是我們的數據。nifi提供了上百個不同功能的processor,一般的需求都能滿足。當然它也支持自定義processor,需要用java自行開發。
Processor Group :簡單地理解就是把processor的流程組合成一個整體。只有Processor Group有version,所以它對於後續流程的遷移很重要。
Input Port,Output Port : 這兩個主要是用於聯接group.
有這些了解後就開始吧!
先看看流程的整體吧
- 首先拖拽一個group在畫布中,並為這個group命名為Import,如下圖
雙擊group進入。再建一個group,命名為getfiles.這個group主要負責從ftp上獲取文件,並解壓。
GetFTP:主要填以下幾個屬性。
Delete Origianl默認為true,會刪除ftp上的文件,所以最好設置為false.類似的Processor還有getfile,使用時一定要注意。
因為我們獲取的是一個zip包,所以需要解壓。這個比較簡單,默認就行了。如果壓縮文件有密碼,設置一下password屬性就好了。
接下來就有點複雜了。因為我們的manufacture和brand是要進不同的表,所以就要路由了。這裡就要用到route的processor,我用的是RouteText,也可以用RouteOnAttribute,只是一些設置不同。後面我也會用到。
添加了兩個路由屬性:fabricantes,modelos.這個名字你可以隨便取。如果filename包含manufacture就走fabricantes分支,包含brand就走modelos分支。
後面我做了一個延時,大家可以根據實際情況自由選擇。這裡我也介紹一下。
先用UpdateAttribute添加一個屬性delay,值為當前時間加20s.
再用RouteOnAttribute來在規定時間內死循環,直到當前時間大於規定時間。
最後用兩個output port結束當前group. - 將brand的數據存儲到SQL SERVER的一張臨時表裡。
建立一個group,名為tmp_barnd.這個group一開始必須是input port,用於接收上一個group傳出的數據。
SplitRecord:
這裡用到兩個controller service: CSVReader,JsonRecordSetWriter.
根據實際情況修改一下相應屬性。我覺得比較重要的是Value Separator(默認是”,”但是很自定義的csv可能是”;””),Character Set(默認是UTF-8,比如我的文檔里有特殊符號,用的是ISO-8859-1)。
因為是進數據庫,所以為了防止SQL注入,需要先做一些準備工作。
經過上一步,數據已經被拆分成一條條的json,現在就用EvaluateJsonPath提取相應的字段
再用UpateAttribute組裝成Sql語句需要的參數。關於sql.args.[*].type的值,請參考java.sql.Types
最後就是執行SQL語句了。這裡有很多選擇,可以用PutSQL,ExcuteSQL等。
SQL Statement是這樣的:
點擊查看代碼
if not EXISTS (SELECT 1 FROM tmp_modelos
WHERE MODELO=? AND FABRICANTE=? AND DESCRIPCION=? AND TIPO_VEHICULO=?)
INSERT INTO tmp_modelos (MODELO, FABRICANTE, DESCRIPCION, DESCRIPCION_ADDICIONAL, TIPO_VEHICULO) VALUES (?, ?, ?, ?, ?);
?代表參數,有多少?,sql.args屬性就相對有幾個,否則執行時會報參數不匹配。
DBCPConnectionPool的設置如下:
整個流程上要用的processor和controller service差不多就是上面這些,剩下的就是大家按需求組合了。
我剩下兩個group里的流程是這樣的。
還有一個很重要的,就是nifi所用的表達式,大家可以參考一下官方文檔
好了,至此,我們的流程就已經畫完了。接下來就是運行調試了。下篇再見!