­

Nifi:nifi的基本使用

Nifi的安裝使用

Nifi安裝

首先說一下Nifi的安裝,這裡Nifi可以支援Windows版和Linux,只需要去官網://nifi.apache.org/

根據自己需要的版本,選擇下載,然後安裝解壓就行

各目錄及主要文件

解壓安裝以後的Nifi目錄如下:

基本的,bin目錄下放置了 整個系統的控制腳本,lib目錄下放置的Nifi自帶的一個個nar程式包(其實就是Nifi內置的一個個組件)和它本身的程式所需要的載入編譯等等的底層包,state是運行期間的一些數據,docs和work 是Nifi的一些官方文檔和學習樣例

conf目錄下放置的是Nifi的配置文件,這裡詳細說一下:

作為我們基本的使用,這裡只需要注意兩個文件就好,關於其他的配置,有興趣的可以去Nifi官網查看,首先是 nifi-properties 文件,這個文件基本就是整個Nifi的配置中心,裡面包含很多的基本配置,例如啟動埠啊、記憶體分配啊等等,第二個就是 flow.xml.gz,這個文件主要是你整個nifi使用的全記錄,解釋的通俗點,如果你遇到了這麼一個問題 「 我在一台機器上部署了一個Nifi,並且進行了一段時間的使用,建立了很多流程和功能,這時候,需要換到別的機器的Nifi上進行開發」,你建立那些肯定不能挨個再在新環境上來一遍啊,這時候只需要把這個flow.xml.gz替換到新機器的Nifi環境里,重啟新環境的Nifi就可以了。

logs目錄里放的是Nifi運行後的主要的日誌

這裡運行後會有三個日誌, 分別是:

nifi-app.log   整個應用的運行日誌

nifi-bootstrap.log  底層類載入一系列的日誌

nifi-user.log 就簡單理解為用戶的訪問操作日誌吧

Nifi的頁面使用

Nifi默認啟動埠是8080,使用   windows下就bin目錄下雙擊 run-nifi.bat   ,Linux下就 在/bin目錄下,執行 ./nifi.sh start

主頁面介紹

  

進入主頁面以後,它整體就是一個畫布的形式,最上方是個公共導航欄,左側那個Navigate沒啥用,不用在意,就是一個全局視角,下面的Operate是組件控制面板,可以進行單個組件的控制,也可以選中一片組件進行統一的啟動,停止等等。

面板介紹

  首先:

剛剛已經把Nifi的整個頁面理解為一個工作台,最上方就是個導航欄了,從最上面開始,這裡的導航欄分為兩部分,上半部分是提供給我們工作的,下半部分是對整個Nifi環境下的一個監控資訊。這裡簡單介紹一下

  導航欄中的這個菜單,我們可以理解為處理器(Processor)商城,用滑鼠單擊拖出到畫布上,便會出現處理器(Processor)菜單

  導航欄中這個菜單,我叫它為組,什麼叫組呢,當你拉了很多處理器(Processor),形成了一個完整的流程的時候,我們可以單獨把這塊劃分成一個整體了,這時候就要用組把它包裹起來。

   有了組以後,組和組之間可能也需要聯通、通訊,這時候就可以用入口和出口,把它們放在組內

  這個組件需要配合 Operate 中的 上傳使用,主要是用來遷移模板的,這塊後續會專門抽章節講一下

  這一組件,是集群Nifi進行數據通訊的時候用的

  這一組件,就是個便簽,用來寫個備註呀啥的

 這一組件就是個漏斗,主要作用就是把四散的數據可以彙集在一起。

 

Nifi的工作方式

這裡側重點是Nifi中的處理器應用,關於集群、組配合等等方式不在此篇記錄的重點中。

基本方式

首先回顧我們上篇內容說的,Nifi其實就是一個數據接入、處理、清洗、分發的系統,它的工作方式就是將數據看作水管中的水,它是順著某個流程管道流動,在這中間,可以在任意節點處堵截這個「水流」,並對它進行改造,然後放回管道繼續向下流去。

這裡的節點,其實就是Nifi的Processor,你叫它處理器也可以,叫他組件也好,它就是一個黑盒小模組,不同的模組有不同的功能

然後,節點和節點直接的通道,在Nifi里叫Relationship,我把它稱之為管道,就像水管一樣,它本身的意義就是充當水管,把上節點處理完的水傳下去。

在nifi中,都是一個個的流程(處理器+管道),形成一個數據的處理通路。

像這個例子,GetFile組件負責從一個文件里讀取數據,然後把讀到的數據通過管道傳到ExecuteScript組件(這個組件支援用腳本程式碼處理數據),經過ExecuteScript之後,流向PutFile組件(將數據寫入到指定文件中)。

基本流程就是  :   選則一個處理器——>配置該組件至可運行狀態——>關聯下一組件建立管道

 

選擇處理器

  通過「組件商城」 圖標進行處理器的選擇,  處理器是最常用的組件,因為它負責數據的流入,流出,路由和操作。有許多不同類型的處理器。實際上,這是NiFi中非常常見的擴展點,這意味著許多供應商可能會實現自己的處理器來執行其所需的任何功能。將處理器拖動到畫布上時,會向用戶顯示一個對話框:

這裡可以通過處理器的包、處理器的屬性、處理器的名稱等維度進行組件的篩選、選擇。選中後,雙擊則可拖拉至畫布中。

組件狀態

  • 狀態:顯示處理器的當前狀態。以下指標是可能的:

    •  正在運行:處理器當前正在運行。

    •  已停止:處理器有效並已啟用但未運行。

    •  無效:處理器已啟用但當前無效且無法啟動。將滑鼠懸停在此圖標上將提供工具提示,指示處理器無效的原因。一般情況下是需要我們完成必須的配置

    •  已禁用:處理器未運行,在啟用之前無法啟動。此狀態不表示處理器是否有效。

  • 名稱:這是處理器的用戶定義名稱。默認情況下組件的名稱與它的Type相同。在示例中,此值為”ExecuteGroovyScript”,是一個專門用於執行Groovy腳本的組件。

  • 任務:此處理器當前正在執行的任務數。此數字受處理器配置對話框的計劃選項卡中的並發任務設置的約束。在這裡,我們可以看到處理器當前正在執行一項任務。如果NiFi實例是集群的,則此值表示當前正在集群中的所有節點上執行的任務數。

  • 實時日誌:這裡是用於監控當前處理器狀態的,當處理器內部出現問題,一般會在此處顯示錯誤日誌
  • 數據流入流出看板:這裡主要是展示處理數據過程中數據的流入流出情況,Nifi默認是5分鐘更新一次頁面上的看板情況,當然用戶也可以在畫布空白處,滑鼠右鍵選擇刷新,以達到實時查看的效果。
    • In:處理器從其傳入處理器的隊列中提取的數據量。此值表示為count size,其中count是從隊列中提取的FlowFiles的數量,size是這些FlowFiles內容的總大小

    • Read/Write:處理器從磁碟讀取並寫入磁碟的FlowFile內容的總大小。這提供了有關此處理器所需的I/O性能的有用資訊。某些處理器可能只讀取數據而不寫入任何內容,而某些處理器不會讀取數據但只會寫入數據。其他可能既不會讀取也不會寫入數據,而某些處理器會讀取和寫入數據。

    • Out:處理器已傳輸到其出站連接的數據量。這不包括處理器自行刪除的FlowFiles,也不包括路由到自動終止的連接的FlowFiles。與上面的”In”指標一樣,此值表示為count size,其中count是已轉移到出站Connections的FlowFiles的數量,size是這些FlowFiles內容的總大小。

    • Tasks/Time:此處理器在過去5分鐘內被觸發運行的次數,以及執行這些任務所花費的時間。時間格式為hour:minute:second。請注意,所花費的時間可能超過五分鐘,因為許多任務可以並行執行。例如,如果處理器計劃運行60個並發任務,並且每個任務都需要一秒鐘才能完成,則所有60個任務可能會在一秒鐘內完成。但是,在這種情況下,我們會看到時間指標顯示它需要60秒,而不是1秒。

組件配置

Nifi的處理器,一般都有四個標籤頁,分別是SETTINGS,SCHEDULING,PROPERITIES,COMMENTS

除了PROPERITIES之外,另外三個幾乎是通用的,這裡主要說一下這三個實用的。

SETTINGS(通用配置)

 

基本的Name這裡就不說了,就是用戶自定義的名稱,Id、Type、Bundle這三個是這個處理器組件所屬的程式碼包等基本資訊,這裡也不過多介紹,Enable這個選項,就是控制組件由啟用到禁用  狀態的切換。

最右邊包含自動終止關係(Automatically Terminate Relationships)部分。此處列出了處理器定義的每個關係及其描述。為了使處理器被視為有效且能夠運行,處理器定義的每個關係必須連接到下游組件或自動終止。我們可以通過選中它,例如圖中選中Failure一樣,來表示我們棄用這個輸出,也就是不需要它指向下一個組件,這樣這個處理器就變成只有一個對外輸出數據的Relationship了。

接下來是兩個用於配置Penalty Duration和Yield Duration的對話框。在處理一條數據(FlowFile)的正常過程中,可能發生事件,該事件指示處理器此時不能處理數據但是數據可以在稍後進行處理。在發生這種情況時,處理器可以選擇Penalize FlowFile。這將阻止FlowFile在一段時間內被處理。例如,如果處理器要將數據推送到遠程服務,但遠程服務已經有一個與處理器指定的文件名同名的文件,則處理器可能會懲罰FlowFile。Penalty Duration允許DFM指定FlowFile應該受到多長時間的懲罰。默認值為30 seconds。(簡單理解為推後一段時間再處理),類似的處理器可以確定存在某種情況,處理器沒法進行處理數據。例如,如果處理器要將數據推送到遠程服務並且該服務沒有響應。這樣的話處理器應該Yield,這將阻止處理器運行一段時間。通過設置Yield Duration來指定該時間段。默認值為1 second。

最下方Bulletin Level可以簡單的理解為組件的日誌輸出等級的選擇,有選擇地進行日誌等級輸出

SCHEDULING(處理器調度)

這一標籤頁,代表的就是如何驅動處理器,或者說處理器的運作方式:

第一個配置選項是調度策略(Scheduling Strategy)。調度有三種可能的選項:

  • Timer driven:這是默認模式。處理器將定期運行。即多久運行一次,運行處理器的時間間隔由Run Schedule選項定義(當Run Schedule為0時,則代表瞬時執行)。
  • Event driven:選擇此模式時,將由一個事件觸發處理器運行,當FlowFiles進入連接此處理器的Connections時,將產生這個事件。此模式目前被認為是實驗性的,並非所有處理器都支援。選擇此模式時,Run Schedule選項不可配置。此外,只有此模式下Concurrent Tasks選項可以設置為0。這種情況,執行緒數僅受管理員配置的事件驅動執行緒池的大小限制。
  • CRON驅動:這是定時執行模式,即通過cron表達式,進行定時運行的控制。

下面的配置就是執行緒的分配(Concurrent Tasks):這可以控制處理器將使用的執行緒數。換句話說,它控制此處理器應同時處理多少個FlowFiles。增加此值通常會使處理器在相同的時間內處理更多數據。但是,它是通過使用其他處理器無法使用的系統資源來實現此目的。這基本上提供了處理器的相對權重 – 應該將多少系統資源分配給此處理器而不是其他處理器。該欄位適用於大多數處理器。但是,某些類型的處理器只能使用單個任務進行調度。

關於Execution,執行設置用於確定處理器將被調度執行的節點。選擇”All Nodes”將導致在集群中的每個節點上調度此處理器。選擇”Primary Node”將導致此處理器僅在主節點上進行調度。一般單節點的情況下,我們都使用Primary Node

“Run Duration”選項卡的右側包含一個用於選擇運行持續時間的滑塊。這可以控制處理器每次觸發時應安排運行的時間。在滑塊的左側,標記為”Lower latency(較低延遲)”,而右側標記為”Higher throughput(較高吞吐量)”。處理器完成運行後,必須更新存儲庫才能將FlowFiles傳輸到下一個Connection。更新存儲庫的成本很高,因此在更新存儲庫之前可以立即完成的工作量越多,處理器可以處理的工作量就越多(吞吐量越高)。這意味著在上一批數據處理更新此存儲庫之前,Processor是無法開始處理接下來的FlowFiles。結果是,延遲時間會更長(從開始到結束處理FlowFile所需的時間會更長)。因此,滑塊提供了一個頻譜,DFM可以從中選擇支援較低延遲或較高吞吐量。

COMMENTS(備註區)

  這塊把它稱之為」備註區「,即用來為用戶提供一個區域,以包含適用於此組件的任何注釋。

PROPERITIES(屬性區)

這一標籤頁差別較大,一般不同的組件所需要的配置各不相同,具體如果想了解對應組件的屬性配置可以參考官網文檔://nifi.apache.org/docs.html

隊列管道操作

對於隊列管道,它即是數據從一個處理器流向另一個處理器的中間隊列,最多的用處就是用來監控數據是否正常流通,以及在開發使用過程中,可能調試定位問題等需要查看一下管道的數據,這裡主要從管道的來源、手動清空、查看數據、設置超時清空、刪除來描述一下對於管道隊列

管道的來源

管道的建立十分簡單,兩個組件進行一下拖拉連線即可,管道建立後,就需要選擇前置處理器選用哪個Relationship輸出的數據作為管道的源頭,也就是上面配置項那裡的Relationship。

手動清空管道:

管道內的數據承載是有限的,有些時候(阻塞或者需要刪除組件)需要進行手動清空管道的數據,操作方式是:選中管道,右鍵會出現:

查看數據

查看管道中的數據可以選中管道,右鍵後的  List queue選項

同時還可對數據進行下載等操作

設置超時清空

當有些組件處理速度過慢,導致阻塞(允許數據丟失的情況下),我們不能挨個進行手動的清空,這時候可以 在管道的 右鍵  configure 選項中進入管道的配置頁面

在FlowFile Expiration進行超時自動清空的設置,默認為0是不做自動清空

刪除

一般刪除處理器之前,是需要斷開所有與其關聯的管道,即刪除管道,刪除時如果管道中有數據,需要手動制空後,選則 Delete。