Flume 詳解&實戰
Flume
1. 概述
Flume是一個高可用,高可靠,分散式的海量日誌採集、聚合和傳輸的系統。Flume基於流式架構,靈活簡單。
Flume的作用
- Flume最主要的作用就是,實時讀取伺服器本地磁碟的數據,將數據寫入到HDFS
Flume的特性
- 有一個簡單、靈活的基於流的數據流結構
- 具有負載均衡機制和故障轉移機制
- 一個簡單可擴展的數據模型
三大核心組件
Agent 是一個 JVM 進程,它以事件的形式將數據從源頭送至目的
Agent 主要有 3 個部分組成,Source、Channel、Sink
-
source 數據源
從外界採集各種類型數據,將數據傳遞給channel
類型很多:文件、目錄、埠、Kafka等
Exec Source
:實現文件監控,注意tail -F
和tail -f
的區別,前者根據文件名後者根據文件描述進行跟蹤NetCat TCP/UDP Source
:採集指定埠(tcp、udp)的數據Spooling Directory Source
:採集文件夾里新增的文件Kafka Source
:從Kafka消息隊列中採集數據
-
channel 臨時存儲數據的管道
接受Source發出的數據,臨時存儲,Channel 是位於 Source 和 Sink 之間的緩衝區。因此,Channel 允許 Source 和 Sink 運作在不同的速率上。Channel 是執行緒安全的,可以同時處理幾個 Source 的寫入操作和幾個Sink 的讀取操作。
類型很多:記憶體、文件、記憶體+文件、JDBC等
Memory Channel
:使用記憶體作為數據的存儲- 速度快,有丟失風險
File Channel
:使用文件來作為數據的存儲- 效率不高,沒有丟失風險
Spillable Memory Channel
:使用記憶體和文件作為數據存儲即先存到記憶體中,如果記憶體中數據達到閾值再flush到文件中
-
sink 採集數據的傳送目的
從channel中讀取數據並存儲到指定目的地
Sink的表現形式:控制台、HDFS、Kafka等
- Channel中的數據直到進入目的地才會被刪除,當Sink寫入失敗後,可以自動重寫,不會造成數據丟失
Logger Sink
:將數據作為日誌處理HDFS Sink
:將數據傳輸到HDFS中Kafka Sink
:將數據發送到Kafka消息隊列中
-
Event
傳輸單元,Flume 數據傳輸的基本單元,以 Event 的形式將數據從源頭送至目的地。Event 由 Header 和 Body 兩部分組成,Header 用來存放該 event 的一些屬性,為 K-V 結構,Body 用來存放該條數據,形式為位元組數組。
2. 入門案例
2.1 監控埠數據官方案例
案例需求:使用 Flume 監聽一個埠,收集該埠數據,並列印到控制台。
需求分析
實現步驟
-
安裝 netcat 工具
sudo yum install -y nc
-
判斷 44444 埠是否被佔用
sudo netstat -nlp | grep 44444
-
在 flume 目錄下創建 job 文件夾並進入 job 文件夾
mkdir job cd job
-
創建 Flume Agent 配置文件 flume-netcat-logger.conf,添加如下內容
vim flume-netcat-logger.conf
# Name the components on this agent # a1是當前agent的名字,名字在單台flume里要保持唯一 # 可以配置多個sources sinks channels a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel # sources可以對應多個channel # 一個sink只能綁定一個channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
先開啟 flume 監聽埠
# 第一種寫法 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console # 第二種寫法 bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
參數
--conf/-c
:表示配置文件存儲在 conf/目錄--name/-n
:表示給 agent 起名為 a1--conf-file/-f
:flume 本次啟動讀取的配置文件是在 job 文件夾下的 flume-telnet.conf文件-Dflume.root.logger=INFO,console
:-D
表示 flume 運行時動態修改 flume.root.logger 參數屬性值,並將控制台日誌列印級別設置為 INFO 級別。日誌級別包括:log、info、warn、error
-
使用 netcat 工具向本機的 44444 埠發送內容
nc localhost 44444
-
在 Flume 監聽頁面觀察接收數據情況
2.2 實時監控單個追加文件
案例需求:實時監控 Hive 日誌,並上傳到 HDFS 中
需求分析
實現步驟
-
Flume 要想將數據輸出到 HDFS,依賴 Hadoop 相關 jar 包
-
job包內創建 flume-file-hdfs.conf 文件
vim flume-file-hdfs.conf
註:要想讀取 Linux 系統中的文件,就得按照 Linux 命令的規則執行命令。由於 Hive日誌在 Linux 系統中所以讀取文件的類型選擇:exec 即 execute 執行的意思。表示執行Linux 命令來讀取文件。
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source # 此路徑是運行flume機器上的本地路徑,因此hive運行機器要與flume保持一致 a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/hive-3.1.3/logs/hive.log # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H #上傳文件的前綴 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照時間滾動文件夾 a2.sinks.k2.hdfs.round = true #多少時間單位創建一個新的文件夾 a2.sinks.k2.hdfs.roundValue = 1 #重新定義時間單位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地時間戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #積攢多少個 Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize = 100 #設置文件類型,可支援壓縮 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一個新的文件 a2.sinks.k2.hdfs.rollInterval = 60 #設置每個文件的滾動大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滾動與 Event 數量無關 a2.sinks.k2.hdfs.rollCount = 0 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
注意:對於所有與時間相關的轉義序列,Event Header 中必須存在以 「timestamp」的key(除非 hdfs.useLocalTimeStamp 設置為 true,此方法會使用 TimestampInterceptor 自動添加 timestamp)
-
運行flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
-
開啟 Hadoop 和 Hive 並操作 Hive 產生日誌
sbin/start-dfs.sh sbin/start-yarn.sh bin/hive
2.3 實時監控目錄下多個新文件
案例需求:使用 Flume 監聽整個目錄的文件,並上傳至 HDFS
需求分析:
實現步驟
-
創建配置文件 flume-dir-hdfs.conf
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /opt/flume-1.9.0/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp 結尾的文件,不上傳 a3.sources.r3.ignorePattern = ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H #上傳文件的前綴 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照時間滾動文件夾 a3.sinks.k3.hdfs.round = true #多少時間單位創建一個新的文件夾 a3.sinks.k3.hdfs.roundValue = 1 #重新定義時間單位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地時間戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #積攢多少個 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize = 100 #設置文件類型,可支援壓縮 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一個新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #設置每個文件的滾動大小大概是 128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滾動與 Event 數量無關 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
-
啟動監控文件夾命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
說明:在使用 Spooling Directory Source 時,不要在監控目錄中創建並持續修改文件;上傳完成的文件會以.COMPLETED 結尾;被監控文件夾每 500 毫秒掃描一次文件變動
-
向 upload 文件夾中添加文件
在/opt/module/flume 目錄下創建 upload 文件夾
向 upload 文件夾中添加文件
-
查看 HDFS 上的數據
2.4 實時監控目錄下的多個追加文件
Exec source 適用於監控一個實時追加的文件,不能實現斷點續傳;Spooldir Source適合用於同步新文件,但不適合對實時追加日誌的文件進行監聽並同步;而 Taildir Source適合用於監聽多個實時追加的文件,並且能夠實現斷點續傳。
案例需求:使用 Flume 監聽整個目錄的實時追加文件,並上傳至 HDFS
需求分析:
實現步驟
-
創建配置文件 flume-taildir-hdfs.conf
vim flume-taildir-hdfs.conf
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = TAILDIR a3.sources.r3.positionFile = /opt/flume-1.9.0/tail_dir.json a3.sources.r3.filegroups = f1 f2 a3.sources.r3.filegroups.f1 = /opt/flume-1.9.0/files/.*file.* a3.sources.r3.filegroups.f2 = /opt/flume-1.9.0/files2/.*log.* # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload2/%Y%m%d/%H #上傳文件的前綴 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照時間滾動文件夾 a3.sinks.k3.hdfs.round = true #多少時間單位創建一個新的文件夾 a3.sinks.k3.hdfs.roundValue = 1 #重新定義時間單位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地時間戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #積攢多少個 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize = 100 #設置文件類型,可支援壓縮 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一個新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #設置每個文件的滾動大小大概是 128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滾動與 Event 數量無關 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
-
啟動監控文件夾命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
-
向 files 文件夾中追加內容
在/opt/module/flume 目錄下創建 files 文件夾
向 upload 文件夾中添加文件
-
查看 HDFS 上的數據
Taildir 說明
Taildir Source 維護了一個 json 格式的 position File,其會定期的往 position File中更新每個文件讀取到的最新的位置,因此能夠實現斷點續傳。 Position File 的格式如下:
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
註:Linux 中儲存文件元數據的區域就叫做 inode,每個 inode 都有一個號碼,作業系統用 inode 號碼來識別不同的文件,Unix/Linux 系統內部不使用文件名,而使用 inode 號碼來識別文件。
3. Flume進階
3.1 Flume事務
3.2 Agent內部原理
重要組件
1)ChannelSelector:Source發往多個Channel的策略設置
-
ChannelSelector 的作用就是選出 Event 將要被發往哪個 Channel。其共有兩種類型,分別是 Replicating(複製)和 Multiplexing(多路復用)。
-
ReplicatingSelector 會將同一個 Event 發往所有的 Channel,Multiplexing 會根據相應的原則,將不同的 Event 發往不同的 Channel。
2)SinkProcessor:Sink發送數據的策略設置
- SinkProcessor共 有 三 種 類 型 , 分 別 是DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor
- DefaultSinkProcessor對應的是單個的Sink
- LoadBalancingSinkProcessor 和FailoverSinkProcessor 對應的是 Sink Group
- LoadBalancingSinkProcessor 可以實現負載均衡的功能,FailoverSinkProcessor 可以錯誤恢復的功能。
3.3 Flume拓撲結構
-
簡單串聯
這種模式是將多個 flume 順序連接起來了,從最初的 source 開始到最終 sink 傳送的目的存儲系統。此模式不建議橋接過多的 flume 數量, flume 數量過多不僅會影響傳輸速率,而且一旦傳輸過程中某個節點 flume 宕機,會影響整個傳輸系統。
-
複製和多路復用
Flume 支援將事件流向一個或者多個目的地。這種模式可以將相同數據複製到多個channel 中,或者將不同數據分發到不同的 channel 中,sink 可以選擇傳送到不同的目的地。
-
負載均衡和故障轉移
Flume 支援使用將多個 sink 邏輯上分到一個 sink 組,sink 組配合不同的 SinkProcessor可以實現負載均衡和錯誤恢復的功能。
-
聚合
這種模式是我們最常見的,也非常實用,日常 web 應用通常分布在上百個伺服器,大者甚至上千個、上萬個伺服器。產生的日誌,處理起來也非常麻煩。用 flume 的這種組合方式能很好的解決這一問題,每台伺服器部署一個 flume 採集日誌,傳送到一個集中收集日誌的flume,再由此 flume 上傳到 hdfs、hive、hbase 等,進行日誌分析。
4. Flume企業開發案例
註:使用 jps -ml 查看 Flume 進程。
4.1 複製和多路復用
案例需求
- 使用 Flume-1 監控文件變動,Flume-1 將變動內容傳遞給 Flume-2,Flume-2 負責存儲到 HDFS。同時 Flume-1 將變動內容傳遞給 Flume-3,Flume-3 負責輸出到 LocalFileSystem。
需求分析
實現步驟
-
準備工作
在job內創建group1文件夾,用於存放配置文件
-
創建flume-file-flume.conf
配置一個接收日誌文件的source和兩個channel、兩個sink,分別輸送給hdfs和本地文件
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 將數據流複製給所有 channel # 默認選擇器就是複製,可以不需要 a1.sources.r1.selector.type = replicating # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/hive-3.1.3/logs/hive.log a1.sources.r1.shell = /bin/bash -c # Describe the sink # sink 端的 avro 是一個數據發送者 a1.sinks.k1.type = avro a1.sinks.k1.hostname = node1 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = node1 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
-
創建 flume-flume-hdfs.conf
配置上級 Flume 輸出的 Source,輸出是到 HDFS 的 Sink
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source # source 端的 avro 是一個數據接收服務 a2.sources.r1.type = avro a2.sources.r1.bind = node1 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://node1:8020/flume2/%Y%m%d/%H #上傳文件的前綴 a2.sinks.k1.hdfs.filePrefix = flume2- #是否按照時間滾動文件夾 a2.sinks.k1.hdfs.round = true #多少時間單位創建一個新的文件夾 a2.sinks.k1.hdfs.roundValue = 1 #重新定義時間單位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地時間戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #積攢多少個 Event 才 flush 到 HDFS 一次 a2.sinks.k1.hdfs.batchSize = 100 #設置文件類型,可支援壓縮 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一個新的文件 a2.sinks.k1.hdfs.rollInterval = 30 #設置每個文件的滾動大小大概是 128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滾動與 Event 數量無關 a2.sinks.k1.hdfs.rollCount = 0 # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
-
創建 flume-flume-dir.conf
配置上級 Flume 輸出的 Source,輸出是到本地目錄的 Sink
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = node1 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/data/flume3 # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
提示:輸出的本地目錄必須是已經存在的目錄,如果該目錄不存在,並不會創建新的目錄。
-
執行配置文件
要先啟動後兩個配置文件,因為後兩個是第一個配置文件的sink的服務端
bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
-
啟動Hadoop和hive
4.2 負載均衡和故障轉移
案例需求
- 使用 Flume1 監控一個埠,其 sink 組中的 sink 分別對接 Flume2 和 Flume3,采FailoverSinkProcessor,實現故障轉移的功能。
需求分析
實現步驟
-
job 目錄下創建 group2 文件夾,用於存放配置文件
-
創建 flume-netcat-flume.conf
配置 1 個 netcat source 和 1 個 channel、1 個 sink group(2 個 sink),分別輸送給flume-flume-console1 和 flume-flume-console2
# Name the components on this agent a1.sources = r1 a1.channels = c1 a1.sinkgroups = g1 a1.sinks = k1 k2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 故障轉移 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000 # 負載均衡(開啟這段就要注釋故障轉移) # a1.sinkgroups.g1.processor.type = failover # sink拉取數據失敗後,下次退避一段時間,默認最大退避時間30s # a1.sinkgroups.g1.processor.backoff = true # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = node1 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = node1 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
-
創建 flume-flume-console1.conf
配置上級 Flume 輸出的 Source,輸出是到本地控制台
# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = avro a2.sources.r1.bind = node1 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = logger # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
-
創建 flume-flume-console2.conf
配置上級 Flume 輸出的 Source,輸出是到本地控制台
# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = node1 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = logger # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
-
執行配置文件
分別開啟對應配置文件: flume-flume-console2, flume-flume-console1, flume-netcat-flume
bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console bin/flume-ng agent -c conf -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
-
使用 netcat 工具向本機的 44444 埠發送內容
nc localhost 44444
-
查看 Flume2 及 Flume3 的控制台列印日誌
-
將 Flume2 kill,觀察 Flume3 的控制台列印情況
4.3 聚合
案例需求
-
node1 上的 Flume-1 監控文件/opt/module/group.log
-
node2 上的 Flume-2 監控某一個埠的數據流
-
Flume-1 與 Flume-2 將數據發送給 node3 上的 Flume-3,Flume-3 將最終數據列印到控制台
需求分析
實現步驟
-
準備工作
分發 Flume到node2,node3在 node1、node2 以及 node3 的/XXX/job 目錄下創建一個group3 文件夾
-
創建 flume1-logger-flume.conf
配置 Source 用於監控 group.log 文件,配置 Sink 輸出數據到下一級 Flume
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/data/group.log a1.sources.r1.shell = /bin/bash -c # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = node3 a1.sinks.k1.port = 4141 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
創建 flume2-netcat-flume.conf
配置 Source 監控埠 44444 數據流,配置 Sink 數據到下一級 Flume# Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = netcat a2.sources.r1.bind = node2 a2.sources.r1.port = 44444 # Describe the sink a2.sinks.k1.type = avro a2.sinks.k1.hostname = node3 a2.sinks.k1.port = 4141 # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1
-
創建 flume3-flume-logger.conf
配置 source 用於接收 flume1 與 flume2 發送過來的數據流,最終合併後 sink 到控制台# Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = node3 a3.sources.r1.port = 4141 # Describe the sink a3.sinks.k1.type = logger # Describe the channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1
-
執行配置文件
分別開啟對應配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.confbin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
-
在 node1 上向/opt/dara 目錄下的 group.log 追加內容
echo 'hello' > group.log
-
在node2 上向 44444 埠發送數據
nc node2 44444
5. 自定義組件
5.1 自定義Interceptor
案例需求
- 使用 Flume 採集伺服器本地日誌,需要按照日誌類型的不同,將不同種類的日誌發往不同的分析系統。
需求分析
- 在實際的開發中,一台伺服器產生的日誌類型可能有很多種,不同類型的日誌可能需要發送到不同的分析系統。此時會用到 Flume 拓撲結構中的 Multiplexing 結構, Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 發送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同類型的 event 的 Header 中的 key 賦予不同的值。
實現步驟
-
準備工作
創建一個 maven 項目,並引入以下依賴。
<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>
定義 CustomInterceptor 類並實現 Interceptor 介面
import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.ArrayList; import java.util.List; import java.util.Map; public class multiLoads implements Interceptor { //聲明一個存放事件的集合 private List<Event> addHeaderEvents; @Override public void initialize() { //初始化存放事件的集合 addHeaderEvents = new ArrayList<>(); } //單個事件攔截 @Override public Event intercept(Event event) { //1.獲取事件中的頭資訊 Map<String, String> headers = event.getHeaders(); //2.獲取事件中的 body 資訊 String body = new String(event.getBody()); //3.根據 body 中是否有"atguigu"來決定添加怎樣的頭資訊 if (body.contains("atguigu")) { //4.添加頭資訊 headers.put("type", "atguigu"); } else { //4.添加頭資訊 headers.put("type", "other"); } return event; } //批量事件攔截 @Override public List<Event> intercept(List<Event> events) { //1.清空集合 addHeaderEvents.clear(); //2.遍歷 events for (Event event : events) { //3.給每一個事件添加頭資訊 addHeaderEvents.add(intercept(event)); } //4.返回結果 return addHeaderEvents; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new multiLoads(); } @Override public void configure(Context context) { } } }
-
編輯配置文件
flume1.conf 放到 node1的group4文件中
配置 1 個 netcat source,1 個 sink group( 2 個 avro sink),並配置相應的 ChannelSelector 和 interceptor
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.flume.multiLoads$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = type a1.sources.r1.selector.mapping.atguigu = c1 a1.sources.r1.selector.mapping.other = c2 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = node2 a1.sinks.k1.port = 4141 a1.sinks.k2.type=avro a1.sinks.k2.hostname = node3 a1.sinks.k2.port = 4242 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Use a channel which buffers events in memory a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
flume2.conf 放到node2的group4文件夾中
配置一個 avro source 和一個 logger sink
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = node2 a1.sources.r1.port = 4141 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
flume3.conf 放到node3的group4文件夾中
配置一個 avro source 和一個 logger sink
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = node3 a1.sources.r1.port = 4242 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
-
分別在 node1,node2,node3上啟動 flume 進程,注意先後順序,先node3/node2 再node1
bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume3.conf -Dflume.root.logger=INFO,console bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume2.conf -Dflume.root.logger=INFO,console bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume1.conf
-
在 node1 使用 netcat 向 localhost:44444 發送字母和數字
nc localhost 44444
5.2 自定義source
介紹
-
Source 是負責接收數據到 Flume Agent 的組件。Source 組件可以處理各種類型、各種格式的日誌數據,包括 avro、thrift、exec、jms、spooling directory、netcat、sequencegenerator、syslog、http、legacy。官方提供的 source 類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 source。
-
官方也提供了自定義 source 的介面://flume.apache.org/FlumeDeveloperGuide.html#source 根據官方說明自定義MySource 需要繼承 AbstractSource 類並實現 Configurable 和 PollableSource 介面。
-
實現相應方法:
-
getBackOffSleepIncrement() //backoff 步長
-
getMaxBackOffSleepInterval() //backoff 最長時間
-
configure(Context context)//初始化 context(讀取配置文件內容)
-
process()//獲取數據封裝成 event 並寫入 channel,這個方法將被循環調用。
-
-
使用場景:讀取 MySQL 數據或者其他文件系統
需求
使用 flume 接收數據,並給每條數據添加前綴,輸出到控制台。前綴可從 flume 配置文件中配置。
自定義Source需求
編碼
-
構建maven項目,導入依賴
<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>
-
編寫程式碼
import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { //定義配置文件將來要讀取的欄位 private Long delay; private String field; //初始化配置資訊 @Override public void configure(Context context) { delay = context.getLong("delay"); // 默認值是hello field = context.getString("field", "Hello!"); } @Override public Status process() throws EventDeliveryException { try { //創建事件頭資訊 HashMap<String, String> hearderMap = new HashMap<>(); //創建事件 SimpleEvent event = new SimpleEvent(); //循環封裝事件 for (int i = 0; i < 5; i++) { //給事件設置頭資訊 event.setHeaders(hearderMap); //給事件設置內容 event.setBody((field + i).getBytes()); //將事件寫入 channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (Exception e) { e.printStackTrace(); return Status.BACKOFF; } return Status.READY; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }
-
配置文件
打包——將寫好的程式碼打包,並放到 flume 的 lib 目錄下
創建配置文件mysource.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source # 改寫全限定名 a1.sources.r1.type = XXX.XXX.MySource a1.sources.r1.delay = 1000 # 自定義後綴是什麼 # a1.sources.r1.field = world # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
開啟任務
bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
5.3 自定義sink
介紹
-
Sink 不斷地輪詢 Channel 中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者被發送到另一個 Flume Agent。
-
Sink 是完全事務性的。在從 Channel 批量刪除數據之前,每個 Sink 用 Channel 啟動一個事務。批量事件一旦成功寫出到存儲系統或下一個 Flume Agent,Sink 就利用 Channel 提交事務。事務一旦被提交,該 Channel 從自己的內部緩衝區刪除事件。
-
Sink 組件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。官方提供的 Sink 類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 Sink。
-
官方也提供了自定義 sink 的介面://flume.apache.org/FlumeDeveloperGuide.html#sink 根 據 官 方 說 明自定義MySink 需要繼承 AbstractSink 類並實現 Configurable 介面。
-
實現相應方法:
- configure(Context context) //初始化 context(讀取配置文件內容)
- process() //從 Channel 讀取獲取數據(event),這個方法將被循環調用。
-
使用場景:讀取 Channel 數據寫入 MySQL 或者其他文件系統。
需求
使用 flume 接收數據,並在 Sink 端給每條數據添加前綴和後綴,輸出到控制台。前後綴可在 flume 任務配置文件中配置
編寫
-
編碼
import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySink extends AbstractSink implements Configurable { //創建 Logger 對象 private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class); private String prefix; private String suffix; @Override public Status process() throws EventDeliveryException { //聲明返回值狀態資訊 Status status; //獲取當前 Sink 綁定的 Channel Channel ch = getChannel(); //獲取事務 Transaction txn = ch.getTransaction(); //聲明事件 Event event; //開啟事務 txn.begin(); //讀取 Channel 中的事件,直到讀取到事件結束循環 while (true) { event = ch.take(); if (event != null) { break; } } try { //處理事件(列印) LOG.info(prefix + new String(event.getBody()) + suffix); //事務提交 txn.commit(); status = Status.READY; } catch (Exception e) { //遇到異常,事務回滾 txn.rollback(); status = Status.BACKOFF; } finally { //關閉事務 txn.close(); } return status; } @Override public void configure(Context context) { //讀取配置文件內容,有默認值 prefix = context.getString("prefix", "hello:"); //讀取配置文件內容,無默認值 suffix = context.getString("suffix"); } }
-
打包和配置文件
將寫好的程式碼打包,並放到 flume 的 lib 目錄
創建配置文件
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = XXX.XXX.MySink a1.sinks.k1.prefix = atguigu: a1.sinks.k1.suffix = :atguigu # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
開啟任務
bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
-
測試
nc localhost 44444
-
關閉進程,ctrl+c進程無法退出
# 先獲取進程的pid jps -l # 15445 org.apache.flume.node.Application # 殺死進程 kill -9 15445
6. 事務源碼
**Transaction **
public interface Transaction {
enum TransactionState { Started, Committed, RolledBack, Closed }
/**
* <p>Starts a transaction boundary for the current channel operation. If a
* transaction is already in progress, this method will join that transaction
* using reference counting.</p>
* <p><strong>Note</strong>: For every invocation of this method there must
* be a corresponding invocation of {@linkplain #close()} method. Failure
* to ensure this can lead to dangling transactions and unpredictable results.
* </p>
*/
void begin();
/**
* Indicates that the transaction can be successfully committed. It is
* required that a transaction be in progress when this method is invoked.
*/
void commit();
/**
* Indicates that the transaction can must be aborted. It is
* required that a transaction be in progress when this method is invoked.
*/
void rollback();
/**
* <p>Ends a transaction boundary for the current channel operation. If a
* transaction is already in progress, this method will join that transaction
* using reference counting. The transaction is completed only if there
* are no more references left for this transaction.</p>
* <p><strong>Note</strong>: For every invocation of this method there must
* be a corresponding invocation of {@linkplain #begin()} method. Failure
* to ensure this can lead to dangling transactions and unpredictable results.
* </p>
*/
void close();
}
rollback方法的實踐方法
protected void doRollback() {
int takes = takeList.size();
synchronized (queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
"Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
while (!takeList.isEmpty()) {
// take操作有回滾不會丟失
// 寫回原隊列
queue.addFirst(takeList.removeLast());
}
// put操作直接清除了,沒有回滾,可能會導致丟失
// 但是TAILDIR模式下的put,不會丟失,因為只有成功doCommit才會使得文件的記錄資訊改變
// netcat會直接丟失
putList.clear();
}
putByteCounter = 0;
takeByteCounter = 0;
queueStored.release(takes);
channelCounter.setChannelSize(queue.size());
}
Commit方法的實踐方法
protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Byte capacity " +
"allocated to store event body " + byteCapacity * byteCapacitySlotSize +
"reached. Please increase heap space/byte capacity allocated to " +
"the channel as the sinks may not be keeping up with the sources");
}
if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized (queueLock) {
if (puts > 0) {
while (!putList.isEmpty()) {
if (!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
bytesRemaining.release(takeByteCounter);
takeByteCounter = 0;
putByteCounter = 0;
queueStored.release(puts);
if (remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}
7. Flume優化
7.1 記憶體調整
調整Flume進程的記憶體大小,建議設置1G-2G,太小的話會導致頻繁GC
jstat -gcutil 15445 1000
# 顯示15445java進程,1s的記憶體變化
S0區 S1區 Eden區 Old區 元空間 YoungGC執行次數 執行時間 FullGC次數 時間 總的GC時間
S0 S1 E O M CCS YGC YGCT FGC FGCT GCT
0.00 0.00 0.00 65.35 95.16 89.45 309440 205.624 3 0.040 205.664
6.25 0.00 0.00 65.35 95.16 89.45 309556 205.690 3 0.040 205.729
6.25 0.00 0.00 65.68 95.16 89.45 309674 205.749 3 0.040 205.788
0.00 6.25 0.00 65.68 95.16 89.45 309790 205.816 3 0.040 205.856
6.25 0.00 0.00 65.76 95.16 89.45 309907 205.881 3 0.040 205.921
0.00 6.25 0.00 66.01 95.16 89.45 310023 205.944 3 0.040 205.984
0.00 6.25 0.00 66.01 95.16 89.45 310139 206.008 3 0.040 206.048
6.25 0.00 0.00 66.01 95.16 89.45 310254 206.074 3 0.040 206.114
0.00 6.25 0.00 66.34 95.16 89.45 310371 206.134 3 0.040 206.174
0.00 6.25 0.00 66.34 95.16 89.45 310488 206.192 3 0.040 206.232
如果YGC執行次數增加的很快,可以適當增加運行記憶體
修改conf文件夾內的flume-env.sh文件
# Xms是起始記憶體 Xmx是最大記憶體
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
7.2 日誌配置
啟動多個Flume進程是,建議修改配置區分日誌文件
修改配置多個conf文件夾,並修改的log4.properties文件
運行flume進程時,指定某一個conf文件
# 修改日誌記錄的級別
flume.root.logger=INFO,LOGFILE
# 修改日誌放置的文件,不用改
flume.log.dir=./logs
# 修改日誌文件的名稱
flume.log.file=flume.log
7.3 Flume進程監控
Flume是一個單進程程式,會存在單點故障,所以需要有一個監控機制,發現Flume進行Down掉之後,需要重啟
- 通過Shell腳本實現Flume進程監控以及自動重啟
配置文件
molist.conf
# 這裡的example名稱要保證能用ps -ef | grep example定位到進程
example=startExample.sh
啟動腳本
startExample.sh
#!/bin/bash
flume_path=/XXX/XXX
nohup ${flume_path}/bin/flume-ng agent -c ${flume_path}/conf -n a1 -f ${flume_path}/XXX/XXX.conf &
監控腳本
monList.sh
#!/bin/bash
monlist=`cat molist.conf`
echo "===start check==="
for item in ${monlist}
do
# 設置欄位分隔符
OLD_IFS=$IFS
IFS="="
# 把一行內容轉成多列[數組]
arr=($item)
# 獲取等號左邊的內容
name=${arr[0]}
# 獲取等號右邊的內容
script=${arr[1]}
echo "time is:"`date +"%Y-%m-%d %H:%M:%S"`"check"$name
if [ `jps -m|grep $name | wc -l` -eq 0 ]
then
echo `date +"%Y-%m-%d %H:%M:%S"`$name "is none"
sh -x ./${script}
fi
done
可以設置crontab定時調度monlist.sh腳本