Flume 詳解&實戰

Flume

1. 概述

Flume是一個高可用,高可靠,分散式的海量日誌採集、聚合和傳輸的系統。Flume基於流式架構,靈活簡單。

Flume的作用

  • Flume最主要的作用就是,實時讀取伺服器本地磁碟的數據,將數據寫入到HDFS

image-20220426162059164

Flume的特性

  1. 有一個簡單、靈活的基於流的數據流結構
  2. 具有負載均衡機制和故障轉移機制
  3. 一個簡單可擴展的數據模型

Agent component diagram

三大核心組件

Agent 是一個 JVM 進程,它以事件的形式將數據從源頭送至目的

Agent 主要有 3 個部分組成,Source、Channel、Sink

  1. source 數據源

    從外界採集各種類型數據,將數據傳遞給channel

    類型很多:文件、目錄、埠、Kafka等

    • Exec Source :實現文件監控,注意 tail -Ftail -f的區別,前者根據文件名後者根據文件描述進行跟蹤
    • NetCat TCP/UDP Source:採集指定埠(tcp、udp)的數據
    • Spooling Directory Source:採集文件夾里新增的文件
    • Kafka Source:從Kafka消息隊列中採集數據
  2. channel 臨時存儲數據的管道

    接受Source發出的數據,臨時存儲,Channel 是位於 Source 和 Sink 之間的緩衝區。因此,Channel 允許 Source 和 Sink 運作在不同的速率上。Channel 是執行緒安全的,可以同時處理幾個 Source 的寫入操作和幾個Sink 的讀取操作。

    類型很多:記憶體、文件、記憶體+文件、JDBC等

    • Memory Channel:使用記憶體作為數據的存儲
      • 速度快,有丟失風險
    • File Channel:使用文件來作為數據的存儲
      • 效率不高,沒有丟失風險
    • Spillable Memory Channel:使用記憶體和文件作為數據存儲即先存到記憶體中,如果記憶體中數據達到閾值再flush到文件中
  3. sink 採集數據的傳送目的

    從channel中讀取數據並存儲到指定目的地

    Sink的表現形式:控制台、HDFS、Kafka等

    • Channel中的數據直到進入目的地才會被刪除,當Sink寫入失敗後,可以自動重寫,不會造成數據丟失
    • Logger Sink:將數據作為日誌處理
    • HDFS Sink:將數據傳輸到HDFS中
    • Kafka Sink:將數據發送到Kafka消息隊列中
  4. Event

    ​ 傳輸單元,Flume 數據傳輸的基本單元,以 Event 的形式將數據從源頭送至目的地。Event 由 Header 和 Body 兩部分組成,Header 用來存放該 event 的一些屬性,為 K-V 結構,Body 用來存放該條數據,形式為位元組數組。

    image-20220426162757718

2. 入門案例

2.1 監控埠數據官方案例

案例需求:使用 Flume 監聽一個埠,收集該埠數據,並列印到控制台。

需求分析

image-20220426164349045

實現步驟

  1. 安裝 netcat 工具

    sudo yum install -y nc
    
  2. 判斷 44444 埠是否被佔用

    sudo netstat -nlp | grep 44444
    
  3. 在 flume 目錄下創建 job 文件夾並進入 job 文件夾

    mkdir job
    cd job
    
  4. 創建 Flume Agent 配置文件 flume-netcat-logger.conf,添加如下內容

    vim flume-netcat-logger.conf
    
    # Name the components on this agent
    # a1是當前agent的名字,名字在單台flume里要保持唯一
    # 可以配置多個sources sinks channels
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # sources可以對應多個channel
    # 一個sink只能綁定一個channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    image-20220426170059413

  5. 先開啟 flume 監聽埠

    # 第一種寫法
    bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
    
    # 第二種寫法
    bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
    

    參數

    • --conf/-c:表示配置文件存儲在 conf/目錄
    • --name/-n:表示給 agent 起名為 a1
    • --conf-file/-f:flume 本次啟動讀取的配置文件是在 job 文件夾下的 flume-telnet.conf文件
    • -Dflume.root.logger=INFO,console-D 表示 flume 運行時動態修改 flume.root.logger 參數屬性值,並將控制台日誌列印級別設置為 INFO 級別。日誌級別包括:log、info、warn、error
  6. 使用 netcat 工具向本機的 44444 埠發送內容

    nc localhost 44444
    
  7. 在 Flume 監聽頁面觀察接收數據情況

    image-20220426171542334

2.2 實時監控單個追加文件

案例需求:實時監控 Hive 日誌,並上傳到 HDFS 中

需求分析

image-20220426171718142

實現步驟

  1. Flume 要想將數據輸出到 HDFS,依賴 Hadoop 相關 jar 包

  2. job包內創建 flume-file-hdfs.conf 文件

    vim flume-file-hdfs.conf
    

    註:要想讀取 Linux 系統中的文件,就得按照 Linux 命令的規則執行命令。由於 Hive日誌在 Linux 系統中所以讀取文件的類型選擇:exec 即 execute 執行的意思。表示執行Linux 命令來讀取文件。

    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    # 此路徑是運行flume機器上的本地路徑,因此hive運行機器要與flume保持一致
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/hive-3.1.3/logs/hive.log
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H
    
    #上傳文件的前綴
    a2.sinks.k2.hdfs.filePrefix = logs-
    
    #是否按照時間滾動文件夾
    a2.sinks.k2.hdfs.round = true
    #多少時間單位創建一個新的文件夾
    a2.sinks.k2.hdfs.roundValue = 1
    #重新定義時間單位
    a2.sinks.k2.hdfs.roundUnit = hour
    
    #是否使用本地時間戳
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    
    #積攢多少個 Event 才 flush 到 HDFS 一次
    a2.sinks.k2.hdfs.batchSize = 100
    
    #設置文件類型,可支援壓縮
    a2.sinks.k2.hdfs.fileType = DataStream
    
    #多久生成一個新的文件
    a2.sinks.k2.hdfs.rollInterval = 60
    #設置每個文件的滾動大小
    a2.sinks.k2.hdfs.rollSize = 134217700
    #文件的滾動與 Event 數量無關
    a2.sinks.k2.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    

    注意:對於所有與時間相關的轉義序列,Event Header 中必須存在以 「timestamp」的key(除非 hdfs.useLocalTimeStamp 設置為 true,此方法會使用 TimestampInterceptor 自動添加 timestamp)

    image-20220426190619260

  3. 運行flume

    bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
    
  4. 開啟 Hadoop 和 Hive 並操作 Hive 產生日誌

    sbin/start-dfs.sh
    sbin/start-yarn.sh
    bin/hive
    

2.3 實時監控目錄下多個新文件

案例需求:使用 Flume 監聽整個目錄的文件,並上傳至 HDFS

需求分析

image-20220426201000651

實現步驟

  1. 創建配置文件 flume-dir-hdfs.conf

    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/flume-1.9.0/upload
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    
    #忽略所有以.tmp 結尾的文件,不上傳
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
    
    #上傳文件的前綴
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照時間滾動文件夾
    a3.sinks.k3.hdfs.round = true
    #多少時間單位創建一個新的文件夾
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定義時間單位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地時間戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #積攢多少個 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #設置文件類型,可支援壓縮
    a3.sinks.k3.hdfs.fileType = DataStream
    
    #多久生成一個新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    #設置每個文件的滾動大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滾動與 Event 數量無關
    a3.sinks.k3.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    

    image-20220426202124640

  2. 啟動監控文件夾命令

    bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
    

    說明:在使用 Spooling Directory Source 時,不要在監控目錄中創建並持續修改文件;上傳完成的文件會以.COMPLETED 結尾;被監控文件夾每 500 毫秒掃描一次文件變動

  3. 向 upload 文件夾中添加文件

    在/opt/module/flume 目錄下創建 upload 文件夾

    向 upload 文件夾中添加文件

  4. 查看 HDFS 上的數據

2.4 實時監控目錄下的多個追加文件

Exec source 適用於監控一個實時追加的文件,不能實現斷點續傳;Spooldir Source適合用於同步新文件,但不適合對實時追加日誌的文件進行監聽並同步;而 Taildir Source適合用於監聽多個實時追加的文件,並且能夠實現斷點續傳。

案例需求:使用 Flume 監聽整個目錄的實時追加文件,並上傳至 HDFS

需求分析:

image-20220426202436463

實現步驟

  1. 創建配置文件 flume-taildir-hdfs.conf

    vim flume-taildir-hdfs.conf
    
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = TAILDIR
    a3.sources.r3.positionFile = /opt/flume-1.9.0/tail_dir.json
    a3.sources.r3.filegroups = f1 f2
    a3.sources.r3.filegroups.f1 = /opt/flume-1.9.0/files/.*file.*
    a3.sources.r3.filegroups.f2 = /opt/flume-1.9.0/files2/.*log.*
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload2/%Y%m%d/%H
    
    #上傳文件的前綴
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照時間滾動文件夾
    a3.sinks.k3.hdfs.round = true
    
    #多少時間單位創建一個新的文件夾
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定義時間單位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地時間戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    
    #積攢多少個 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #設置文件類型,可支援壓縮
    a3.sinks.k3.hdfs.fileType = DataStream
    
    #多久生成一個新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    #設置每個文件的滾動大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滾動與 Event 數量無關
    a3.sinks.k3.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    

    image-20220426202650933

  2. 啟動監控文件夾命令

    bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
    
  3. 向 files 文件夾中追加內容

    在/opt/module/flume 目錄下創建 files 文件夾

    向 upload 文件夾中添加文件

  4. 查看 HDFS 上的數據

Taildir 說明

Taildir Source 維護了一個 json 格式的 position File,其會定期的往 position File中更新每個文件讀取到的最新的位置,因此能夠實現斷點續傳。 Position File 的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

註:Linux 中儲存文件元數據的區域就叫做 inode,每個 inode 都有一個號碼,作業系統用 inode 號碼來識別不同的文件,Unix/Linux 系統內部不使用文件名,而使用 inode 號碼來識別文件。

3. Flume進階

3.1 Flume事務

image-20220426211113587

3.2 Agent內部原理

image-20220426211206998

重要組件

1)ChannelSelector:Source發往多個Channel的策略設置

  • ChannelSelector 的作用就是選出 Event 將要被發往哪個 Channel。其共有兩種類型,分別是 Replicating(複製)和 Multiplexing(多路復用)。

  • ReplicatingSelector 會將同一個 Event 發往所有的 Channel,Multiplexing 會根據相應的原則,將不同的 Event 發往不同的 Channel。

2)SinkProcessor:Sink發送數據的策略設置

  • SinkProcessor共 有 三 種 類 型 , 分 別 是DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor
  • DefaultSinkProcessor對應的是單個的Sink
  • LoadBalancingSinkProcessor 和FailoverSinkProcessor 對應的是 Sink Group
  • LoadBalancingSinkProcessor 可以實現負載均衡的功能,FailoverSinkProcessor 可以錯誤恢復的功能。

3.3 Flume拓撲結構

  1. 簡單串聯

    這種模式是將多個 flume 順序連接起來了,從最初的 source 開始到最終 sink 傳送的目的存儲系統。此模式不建議橋接過多的 flume 數量, flume 數量過多不僅會影響傳輸速率,而且一旦傳輸過程中某個節點 flume 宕機,會影響整個傳輸系統。

image-20220426212627227

  1. 複製和多路復用

    Flume 支援將事件流向一個或者多個目的地。這種模式可以將相同數據複製到多個channel 中,或者將不同數據分發到不同的 channel 中,sink 可以選擇傳送到不同的目的地。

image-20220426212535250

  1. 負載均衡和故障轉移

    Flume 支援使用將多個 sink 邏輯上分到一個 sink 組,sink 組配合不同的 SinkProcessor可以實現負載均衡和錯誤恢復的功能。

    image-20220426212953437

  2. 聚合

    這種模式是我們最常見的,也非常實用,日常 web 應用通常分布在上百個伺服器,大者甚至上千個、上萬個伺服器。產生的日誌,處理起來也非常麻煩。用 flume 的這種組合方式能很好的解決這一問題,每台伺服器部署一個 flume 採集日誌,傳送到一個集中收集日誌的flume,再由此 flume 上傳到 hdfs、hive、hbase 等,進行日誌分析。

    image-20220426212554334

4. Flume企業開發案例

註:使用 jps -ml 查看 Flume 進程。

4.1 複製和多路復用

案例需求

  • 使用 Flume-1 監控文件變動,Flume-1 將變動內容傳遞給 Flume-2,Flume-2 負責存儲到 HDFS。同時 Flume-1 將變動內容傳遞給 Flume-3,Flume-3 負責輸出到 LocalFileSystem。

需求分析

image-20220426213603385

實現步驟

  1. 準備工作

    在job內創建group1文件夾,用於存放配置文件

  2. 創建flume-file-flume.conf

    配置一個接收日誌文件的source和兩個channel、兩個sink,分別輸送給hdfs和本地文件

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # 將數據流複製給所有 channel
    # 默認選擇器就是複製,可以不需要
    a1.sources.r1.selector.type = replicating
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/hive-3.1.3/logs/hive.log
    a1.sources.r1.shell = /bin/bash -c
    
    # Describe the sink
    # sink 端的 avro 是一個數據發送者
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node1
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = node1
    a1.sinks.k2.port = 4142
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    
  3. 創建 flume-flume-hdfs.conf

    配置上級 Flume 輸出的 Source,輸出是到 HDFS 的 Sink

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    # source 端的 avro 是一個數據接收服務
    a2.sources.r1.type = avro
    a2.sources.r1.bind = node1
    a2.sources.r1.port = 4141
    
    # Describe the sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://node1:8020/flume2/%Y%m%d/%H
    #上傳文件的前綴
    a2.sinks.k1.hdfs.filePrefix = flume2-
    #是否按照時間滾動文件夾
    a2.sinks.k1.hdfs.round = true
    #多少時間單位創建一個新的文件夾
    a2.sinks.k1.hdfs.roundValue = 1
    #重新定義時間單位
    a2.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地時間戳
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    #積攢多少個 Event 才 flush 到 HDFS 一次
    a2.sinks.k1.hdfs.batchSize = 100
    #設置文件類型,可支援壓縮
    a2.sinks.k1.hdfs.fileType = DataStream
    #多久生成一個新的文件
    a2.sinks.k1.hdfs.rollInterval = 30
    #設置每個文件的滾動大小大概是 128M
    a2.sinks.k1.hdfs.rollSize = 134217700
    #文件的滾動與 Event 數量無關
    a2.sinks.k1.hdfs.rollCount = 0
    
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
  4. 創建 flume-flume-dir.conf

    配置上級 Flume 輸出的 Source,輸出是到本地目錄的 Sink

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = node1
    a3.sources.r1.port = 4142
    
    # Describe the sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /opt/data/flume3
    
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    

    提示:輸出的本地目錄必須是已經存在的目錄,如果該目錄不存在,並不會創建新的目錄。

  5. 執行配置文件

    要先啟動後兩個配置文件,因為後兩個是第一個配置文件的sink的服務端

    bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
    bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
    bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
    
  6. 啟動Hadoop和hive

4.2 負載均衡和故障轉移

案例需求

  • 使用 Flume1 監控一個埠,其 sink 組中的 sink 分別對接 Flume2 和 Flume3,采FailoverSinkProcessor,實現故障轉移的功能。

需求分析

image-20220427150151273

實現步驟

  1. job 目錄下創建 group2 文件夾,用於存放配置文件

  2. 創建 flume-netcat-flume.conf

    配置 1 個 netcat source 和 1 個 channel、1 個 sink group(2 個 sink),分別輸送給flume-flume-console1 和 flume-flume-console2

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # 故障轉移
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    # 負載均衡(開啟這段就要注釋故障轉移)
    # a1.sinkgroups.g1.processor.type = failover
    # sink拉取數據失敗後,下次退避一段時間,默認最大退避時間30s
    # a1.sinkgroups.g1.processor.backoff = true
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node1
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = node1
    a1.sinks.k2.port = 4142
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
  3. 創建 flume-flume-console1.conf

    配置上級 Flume 輸出的 Source,輸出是到本地控制台

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    a2.sources.r1.type = avro
    a2.sources.r1.bind = node1
    a2.sources.r1.port = 4141
    
    # Describe the sink
    a2.sinks.k1.type = logger
    
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
  4. 創建 flume-flume-console2.conf

    配置上級 Flume 輸出的 Source,輸出是到本地控制台

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = node1
    a3.sources.r1.port = 4142
    
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
  5. 執行配置文件

    分別開啟對應配置文件: flume-flume-console2, flume-flume-console1, flume-netcat-flume

    bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent -c conf -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
    
  6. 使用 netcat 工具向本機的 44444 埠發送內容

    nc localhost 44444
    
  7. 查看 Flume2 及 Flume3 的控制台列印日誌

  8. 將 Flume2 kill,觀察 Flume3 的控制台列印情況

4.3 聚合

案例需求

  • node1 上的 Flume-1 監控文件/opt/module/group.log

  • node2 上的 Flume-2 監控某一個埠的數據流

  • Flume-1 與 Flume-2 將數據發送給 node3 上的 Flume-3,Flume-3 將最終數據列印到控制台

需求分析

image-20220427160137812

實現步驟

  1. 準備工作
    分發 Flume到node2,node3

    在 node1、node2 以及 node3 的/XXX/job 目錄下創建一個group3 文件夾

  2. 創建 flume1-logger-flume.conf

    配置 Source 用於監控 group.log 文件,配置 Sink 輸出數據到下一級 Flume

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/data/group.log
    a1.sources.r1.shell = /bin/bash -c
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node3
    a1.sinks.k1.port = 4141
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  3. 創建 flume2-netcat-flume.conf
    配置 Source 監控埠 44444 數據流,配置 Sink 數據到下一級 Flume

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = node2
    a2.sources.r1.port = 44444
    
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = node3
    a2.sinks.k1.port = 4141
    
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
  4. 創建 flume3-flume-logger.conf
    配置 source 用於接收 flume1 與 flume2 發送過來的數據流,最終合併後 sink 到控制台

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = node3
    a3.sources.r1.port = 4141
    
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
  5. 執行配置文件
    分別開啟對應配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf

    bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
    
    bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
    
  6. 在 node1 上向/opt/dara 目錄下的 group.log 追加內容

    echo 'hello' > group.log
    
  7. 在node2 上向 44444 埠發送數據

    nc node2 44444
    

5. 自定義組件

5.1 自定義Interceptor

案例需求

  • 使用 Flume 採集伺服器本地日誌,需要按照日誌類型的不同,將不同種類的日誌發往不同的分析系統。

需求分析

  • 在實際的開發中,一台伺服器產生的日誌類型可能有很多種,不同類型的日誌可能需要發送到不同的分析系統。此時會用到 Flume 拓撲結構中的 Multiplexing 結構, Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 發送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同類型的 event 的 Header 中的 key 賦予不同的值。

image-20220427170637762

實現步驟

  1. 準備工作

    創建一個 maven 項目,並引入以下依賴。

    <dependency>
    	<groupId>org.apache.flume</groupId>
    	<artifactId>flume-ng-core</artifactId>
    	<version>1.9.0</version>
    </dependency>
    

    定義 CustomInterceptor 類並實現 Interceptor 介面

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class multiLoads implements Interceptor {
        //聲明一個存放事件的集合
        private List<Event> addHeaderEvents;
    
        @Override
        public void initialize() {
            //初始化存放事件的集合
            addHeaderEvents = new ArrayList<>();
        }
        //單個事件攔截
        @Override
        public Event intercept(Event event) {
            //1.獲取事件中的頭資訊
            Map<String, String> headers = event.getHeaders();
            //2.獲取事件中的 body 資訊
            String body = new String(event.getBody());
            //3.根據 body 中是否有"atguigu"來決定添加怎樣的頭資訊
            if (body.contains("atguigu")) {
                //4.添加頭資訊
                headers.put("type", "atguigu");
            } else {
                //4.添加頭資訊
                headers.put("type", "other");
            }
            return event;
        }
        //批量事件攔截
        @Override
        public List<Event> intercept(List<Event> events) {
            //1.清空集合
            addHeaderEvents.clear();
            //2.遍歷 events
            for (Event event : events) {
                //3.給每一個事件添加頭資訊
                addHeaderEvents.add(intercept(event));
            }
            //4.返回結果
            return addHeaderEvents;
        }
        @Override
        public void close() {
        }
        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new multiLoads();
            }
            @Override
            public void configure(Context context) {
            }
        }
    }
    
  2. 編輯配置文件

    flume1.conf 放到 node1的group4文件中

    配置 1 個 netcat source,1 個 sink group( 2 個 avro sink),並配置相應的 ChannelSelector 和 interceptor

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.flume.multiLoads$Builder
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.atguigu = c1
    a1.sources.r1.selector.mapping.other = c2
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node2
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname = node3
    a1.sinks.k2.port = 4242
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    

    flume2.conf 放到node2的group4文件夾中

    配置一個 avro source 和一個 logger sink

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 4141
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1
    

    flume3.conf 放到node3的group4文件夾中

    配置一個 avro source 和一個 logger sink

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node3
    a1.sources.r1.port = 4242
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1
    
  3. 分別在 node1,node2,node3上啟動 flume 進程,注意先後順序,先node3/node2 再node1

    bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume3.conf -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume2.conf -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume1.conf
    
  4. 在 node1 使用 netcat 向 localhost:44444 發送字母和數字

    nc localhost 44444
    

5.2 自定義source

介紹

  • Source 是負責接收數據到 Flume Agent 的組件。Source 組件可以處理各種類型、各種格式的日誌數據,包括 avro、thrift、exec、jms、spooling directory、netcat、sequencegenerator、syslog、http、legacy。官方提供的 source 類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 source。

  • 官方也提供了自定義 source 的介面://flume.apache.org/FlumeDeveloperGuide.html#source 根據官方說明自定義MySource 需要繼承 AbstractSource 類並實現 Configurable 和 PollableSource 介面。

  • 實現相應方法:

    • getBackOffSleepIncrement() //backoff 步長

    • getMaxBackOffSleepInterval() //backoff 最長時間

    • configure(Context context)//初始化 context(讀取配置文件內容)

    • process()//獲取數據封裝成 event 並寫入 channel,這個方法將被循環調用。

  • 使用場景:讀取 MySQL 數據或者其他文件系統

需求

使用 flume 接收數據,並給每條數據添加前綴,輸出到控制台。前綴可從 flume 配置文件中配置。

自定義Source需求

image-20220427200422077
image-20220427200450458

編碼

  1. 構建maven項目,導入依賴

    <dependency>
    	<groupId>org.apache.flume</groupId>
    	<artifactId>flume-ng-core</artifactId>
    	<version>1.9.0</version>
    </dependency>
    
  2. 編寫程式碼

    import org.apache.flume.Context;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    import java.util.HashMap;
    
    public class MySource extends AbstractSource implements
            Configurable, PollableSource {
        //定義配置文件將來要讀取的欄位
        private Long delay;
        private String field;
        //初始化配置資訊
        @Override
        public void configure(Context context) {
            delay = context.getLong("delay");
     		// 默認值是hello
            field = context.getString("field", "Hello!");
        }
        @Override
        public Status process() throws EventDeliveryException {
            try {
                //創建事件頭資訊
                HashMap<String, String> hearderMap = new HashMap<>();
                //創建事件
                SimpleEvent event = new SimpleEvent();
                //循環封裝事件
                for (int i = 0; i < 5; i++) {
                    //給事件設置頭資訊
                    event.setHeaders(hearderMap);
                    //給事件設置內容
                    event.setBody((field + i).getBytes());
                    //將事件寫入 channel
                    getChannelProcessor().processEvent(event);
                    Thread.sleep(delay);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return Status.BACKOFF;
            }
            return Status.READY;
        }
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    }
    
  3. 配置文件

    打包——將寫好的程式碼打包,並放到 flume 的 lib 目錄下

    創建配置文件mysource.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # 改寫全限定名
    a1.sources.r1.type = XXX.XXX.MySource
    a1.sources.r1.delay = 1000
    # 自定義後綴是什麼
    # a1.sources.r1.field = world
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  4. 開啟任務

    bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
    

    image-20220427202944851

5.3 自定義sink

介紹

  • Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者被發送到另一個 Flume Agent。

  • Sink 是完全事務性的。在從 Channel 批量刪除數據之前,每個 Sink 用 Channel 啟動一個事務。批量事件一旦成功寫出到存儲系統或下一個 Flume Agent,Sink 就利用 Channel 提交事務。事務一旦被提交,該 Channel 從自己的內部緩衝區刪除事件。

  • Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。官方提供的 Sink 類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 Sink。

  • 官方也提供了自定義 sink 的介面://flume.apache.org/FlumeDeveloperGuide.html#sink 根 據 官 方 說 明自定義MySink 需要繼承 AbstractSink 類並實現 Configurable 介面。

  • 實現相應方法:

    • configure(Context context) //初始化 context(讀取配置文件內容)
    • process() //從 Channel 讀取獲取數據(event),這個方法將被循環調用。
  • 使用場景:讀取 Channel 數據寫入 MySQL 或者其他文件系統。

需求

使用 flume 接收數據,並在 Sink 端給每條數據添加前綴和後綴,輸出到控制台。前後綴可在 flume 任務配置文件中配置

image-20220427201625678

編寫

  1. 編碼

    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MySink extends AbstractSink implements Configurable
    {
        //創建 Logger 對象
        private static final Logger LOG =             LoggerFactory.getLogger(AbstractSink.class);
        private String prefix;
        private String suffix;
        @Override
        public Status process() throws EventDeliveryException {
            //聲明返回值狀態資訊
            Status status;
            //獲取當前 Sink 綁定的 Channel
            Channel ch = getChannel();
            //獲取事務
            Transaction txn = ch.getTransaction();
            //聲明事件
            Event event;
            //開啟事務
            txn.begin();
            //讀取 Channel 中的事件,直到讀取到事件結束循環
            while (true) {
                event = ch.take();
                if (event != null) {
                    break;
                }
            }
            try {
                //處理事件(列印)
                LOG.info(prefix + new String(event.getBody()) + suffix);
                //事務提交
                txn.commit();
                status = Status.READY;
            } catch (Exception e) {
                //遇到異常,事務回滾
                txn.rollback();
                status = Status.BACKOFF;
            } finally {
                //關閉事務
                txn.close();
            }
            return status;
        }
        @Override
        public void configure(Context context) {
            //讀取配置文件內容,有默認值
            prefix = context.getString("prefix", "hello:");
            //讀取配置文件內容,無默認值
            suffix = context.getString("suffix");
        }
    }
    
  2. 打包和配置文件

    將寫好的程式碼打包,並放到 flume 的 lib 目錄

    創建配置文件

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = XXX.XXX.MySink
    a1.sinks.k1.prefix = atguigu:
    a1.sinks.k1.suffix = :atguigu
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  3. 開啟任務

    bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
    
  4. 測試

    nc localhost 44444
    

    image-20220427203346666

  5. 關閉進程,ctrl+c進程無法退出

    # 先獲取進程的pid
    jps -l
    # 15445 org.apache.flume.node.Application
    
    # 殺死進程
    kill -9 15445
    

6. 事務源碼

image-20220426211113587

**Transaction **

public interface Transaction {

  enum TransactionState { Started, Committed, RolledBack, Closed }

  /**
   * <p>Starts a transaction boundary for the current channel operation. If a
   * transaction is already in progress, this method will join that transaction
   * using reference counting.</p>
   * <p><strong>Note</strong>: For every invocation of this method there must
   * be a corresponding invocation of {@linkplain #close()} method. Failure
   * to ensure this can lead to dangling transactions and unpredictable results.
   * </p>
   */
  void begin();

  /**
   * Indicates that the transaction can be successfully committed. It is
   * required that a transaction be in progress when this method is invoked.
   */
  void commit();

  /**
   * Indicates that the transaction can must be aborted. It is
   * required that a transaction be in progress when this method is invoked.
   */
  void rollback();

  /**
   * <p>Ends a transaction boundary for the current channel operation. If a
   * transaction is already in progress, this method will join that transaction
   * using reference counting. The transaction is completed only if there
   * are no more references left for this transaction.</p>
   * <p><strong>Note</strong>: For every invocation of this method there must
   * be a corresponding invocation of {@linkplain #begin()} method. Failure
   * to ensure this can lead to dangling transactions and unpredictable results.
   * </p>
   */
  void close();
}

rollback方法的實踐方法

protected void doRollback() {
  int takes = takeList.size();
  synchronized (queueLock) {
    Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
        "Not enough space in memory channel " +
        "queue to rollback takes. This should never happen, please report");
    while (!takeList.isEmpty()) {
      // take操作有回滾不會丟失
      // 寫回原隊列
      queue.addFirst(takeList.removeLast());
    }
    // put操作直接清除了,沒有回滾,可能會導致丟失
    // 但是TAILDIR模式下的put,不會丟失,因為只有成功doCommit才會使得文件的記錄資訊改變
    // netcat會直接丟失
    putList.clear();
  }
  putByteCounter = 0;
  takeByteCounter = 0;
  queueStored.release(takes);
  channelCounter.setChannelSize(queue.size());
}

Commit方法的實踐方法

protected void doCommit() throws InterruptedException {
      int remainingChange = takeList.size() - putList.size();
      if (remainingChange < 0) {
        if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
              "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
              "reached. Please increase heap space/byte capacity allocated to " +
              "the channel as the sinks may not be keeping up with the sources");
        }
        if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      synchronized (queueLock) {
        if (puts > 0) {
          while (!putList.isEmpty()) {
            if (!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        putList.clear();
        takeList.clear();
      }
      bytesRemaining.release(takeByteCounter);
      takeByteCounter = 0;
      putByteCounter = 0;

      queueStored.release(puts);
      if (remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }

      channelCounter.setChannelSize(queue.size());
    }

7. Flume優化

7.1 記憶體調整

調整Flume進程的記憶體大小,建議設置1G-2G,太小的話會導致頻繁GC

jstat -gcutil 15445 1000
# 顯示15445java進程,1s的記憶體變化
  S0區   S1區   Eden區  Old區  元空間   YoungGC執行次數 執行時間 FullGC次數 時間  總的GC時間
  S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT   
  0.00   0.00   0.00  65.35  95.16  89.45 309440  205.624     3    0.040  205.664
  6.25   0.00   0.00  65.35  95.16  89.45 309556  205.690     3    0.040  205.729
  6.25   0.00   0.00  65.68  95.16  89.45 309674  205.749     3    0.040  205.788
  0.00   6.25   0.00  65.68  95.16  89.45 309790  205.816     3    0.040  205.856
  6.25   0.00   0.00  65.76  95.16  89.45 309907  205.881     3    0.040  205.921
  0.00   6.25   0.00  66.01  95.16  89.45 310023  205.944     3    0.040  205.984
  0.00   6.25   0.00  66.01  95.16  89.45 310139  206.008     3    0.040  206.048
  6.25   0.00   0.00  66.01  95.16  89.45 310254  206.074     3    0.040  206.114
  0.00   6.25   0.00  66.34  95.16  89.45 310371  206.134     3    0.040  206.174
  0.00   6.25   0.00  66.34  95.16  89.45 310488  206.192     3    0.040  206.232

如果YGC執行次數增加的很快,可以適當增加運行記憶體

修改conf文件夾內的flume-env.sh文件

# Xms是起始記憶體 Xmx是最大記憶體
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

7.2 日誌配置

啟動多個Flume進程是,建議修改配置區分日誌文件

修改配置多個conf文件夾,並修改的log4.properties文件

運行flume進程時,指定某一個conf文件

# 修改日誌記錄的級別
flume.root.logger=INFO,LOGFILE
# 修改日誌放置的文件,不用改
flume.log.dir=./logs
# 修改日誌文件的名稱
flume.log.file=flume.log

7.3 Flume進程監控

Flume是一個單進程程式,會存在單點故障,所以需要有一個監控機制,發現Flume進行Down掉之後,需要重啟

  • 通過Shell腳本實現Flume進程監控以及自動重啟

配置文件

molist.conf

# 這裡的example名稱要保證能用ps -ef | grep example定位到進程
example=startExample.sh

啟動腳本

startExample.sh

#!/bin/bash
flume_path=/XXX/XXX
nohup ${flume_path}/bin/flume-ng agent -c ${flume_path}/conf -n a1 -f ${flume_path}/XXX/XXX.conf &

監控腳本

monList.sh

#!/bin/bash
monlist=`cat molist.conf`
echo "===start check==="
for item in ${monlist}
do
	# 設置欄位分隔符
	OLD_IFS=$IFS
	IFS="="
	# 把一行內容轉成多列[數組]
	arr=($item)
	# 獲取等號左邊的內容
	name=${arr[0]}
	# 獲取等號右邊的內容
	script=${arr[1]}
	
	echo "time is:"`date +"%Y-%m-%d %H:%M:%S"`"check"$name
	
	if [ `jps -m|grep $name | wc -l` -eq 0 ]
	then
		echo `date +"%Y-%m-%d %H:%M:%S"`$name "is none"
		sh -x ./${script}
	fi
done

可以設置crontab定時調度monlist.sh腳本

Tags: