Quick Learning: Flume Enterprise Development Cases

  • February 18, 2020
  • Notes

Chapter 3 Enterprise Development Cases

3.1 Official Case: Monitoring Port Data

  1. Case requirement: Flume monitors port 44444 on the local machine; messages are then sent to port 44444 with the telnet tool, and Flume displays the received data on the console in real time.
  2. Requirement analysis:
  3. Implementation steps:
  1. Install the telnet tool. Copy the rpm packages (xinetd-2.3.14-40.el6.x86_64.rpm, telnet-0.17-48.el6.x86_64.rpm, and telnet-server-0.17-48.el6.x86_64.rpm) into the /opt/software folder, then run the rpm install commands:
[atguigu@hadoop102 software]$ sudo rpm -ivh xinetd-2.3.14-40.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-0.17-48.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-server-0.17-48.el6.x86_64.rpm
  2. Check whether port 44444 is already in use:
[atguigu@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444
Description: netstat is a very useful tool for monitoring TCP/IP networks; it can display the routing table, the actual network connections, and status information for each network interface.
Basic syntax: netstat [options]
Options:
	-t or --tcp: show TCP connections;
	-u or --udp: show UDP connections;
	-n or --numeric: show numeric IP addresses instead of resolving host names;
	-l or --listening: show listening server sockets;
	-p or --programs: show the PID and name of the program that owns each socket;
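If a setup script should fail fast when the port is taken, a minimal guard is sketched below (an illustration only, assuming netstat from the net-tools package is installed as in the check above):

if sudo netstat -tunlp | grep -q ':44444 '; then
    # something is already listening on 44444, so abort the script
    echo "port 44444 is already in use" >&2
    exit 1
fi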
  3. Create the Flume Agent configuration file flume-telnet-logger.conf. Create a job folder under the flume directory and enter it.
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/

Create the Flume Agent configuration file flume-telnet-logger.conf in the job folder.
[atguigu@hadoop102 job]$ touch flume-telnet-logger.conf

Edit flume-telnet-logger.conf and add the following content.
[atguigu@hadoop102 job]$ vim flume-telnet-logger.conf
The content to add:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Note: the configuration file comes from the official manual: http://flume.apache.org/FlumeUserGuide.html

  4. Start Flume and begin listening on the port:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console

Parameter description:
--conf conf/ : the configuration files are stored in the conf/ directory.
--name a1 : names the agent a1.
--conf-file job/flume-telnet-logger.conf : the configuration file Flume reads for this run is flume-telnet-logger.conf in the job folder.
-Dflume.root.logger=INFO,console : -D overrides the flume.root.logger property when Flume starts and sets the console log level to INFO. Log levels include debug, info, warn, and error.

  5. Use the telnet tool to send content to port 44444 on the local machine:
[atguigu@hadoop102 ~]$ telnet localhost 44444
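If telnet is not installed, netcat works the same way for this test (an alternative sketch, assuming the nc package is available):

[atguigu@hadoop102 ~]$ nc localhost 44444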
  6. Watch the received data on the Flume listening console.

3.2 Case: Reading a Local File into HDFS in Real Time

  1. Case requirement: monitor the Hive log in real time and upload it to HDFS.
  2. Requirement analysis:
  3. Implementation steps:
    1. For Flume to write data to HDFS, it must have the relevant Hadoop jars on its classpath. Copy commons-configuration-1.6.jar, hadoop-auth-2.7.2.jar, hadoop-common-2.7.2.jar, hadoop-hdfs-2.7.2.jar, commons-io-2.4.jar, and htrace-core-3.1.0-incubating.jar into the /opt/module/flume/lib folder.
    2. Create the flume-file-hdfs.conf file.
Create the file:
[atguigu@hadoop102 job]$ touch flume-file-hdfs.conf
Note: to read a file on a Linux system, the read must follow the rules of a Linux command. Since the Hive log lives on the Linux filesystem, the source type is exec (short for execute), which means a Linux command is executed to read the file. You can try the command by hand first, as shown below.
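A quick sanity check before wiring the command into the source: run it manually and confirm new Hive log lines appear (the path matches the configuration that follows; stop with Ctrl+C):

[atguigu@hadoop102 job]$ /bin/bash -c 'tail -F /opt/module/hive/logs/hive.log'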

[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
Add the following content:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# how many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
# redefine the time unit
a2.sinks.k2.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# number of Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# how often to roll a new file (seconds)
a2.sinks.k2.hdfs.rollInterval = 600
# roll size of each file (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
# rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
# minimum number of replicas
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
  3. Start the monitoring agent:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
  4. Start Hadoop and Hive, and run operations in Hive to generate log output:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh

[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
  5. View the files on HDFS (a command-line check is sketched below).
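Besides the NameNode web UI, the result can be checked from the command line (a sketch; the date/hour directory is illustrative and should be replaced with the current %Y%m%d/%H value):

[atguigu@hadoop102 flume]$ hdfs dfs -ls -R /flume
[atguigu@hadoop102 flume]$ hdfs dfs -cat /flume/<yyyyMMdd>/<HH>/logs-*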

3.3 Case: Reading Directory Files into HDFS in Real Time

  1. Case requirement: use Flume to monitor the files of an entire directory.
  2. Requirement analysis:
  3. Implementation steps:
    1. Create the configuration file flume-dir-hdfs.conf.
Create the file:
[atguigu@hadoop102 job]$ touch flume-dir-hdfs.conf
Open the file:
[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# how many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
# redefine the time unit
a3.sinks.k3.hdfs.roundUnit = hour
# whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# number of Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# how often to roll a new file (seconds)
a3.sinks.k3.hdfs.rollInterval = 600
# roll size of each file, roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0
# minimum number of replicas
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
  2. Start the directory-monitoring agent:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Notes on using the Spooling Directory Source:
    1. Do not create and then keep modifying files inside the monitored directory (a safe delivery pattern is sketched after this list).
    2. Files that have finished uploading are renamed with the .COMPLETED suffix.
    3. The monitored folder is scanned for file changes every 500 milliseconds.
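To respect note 1, write each file somewhere else first and move it into the spooled directory in a single step (a sketch, assuming the staging directory sits on the same filesystem so that the mv is an atomic rename):

[atguigu@hadoop102 flume]$ mkdir -p staging
[atguigu@hadoop102 flume]$ echo "finished log line" > staging/part-0001.log
[atguigu@hadoop102 flume]$ mv staging/part-0001.log upload/part-0001.log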
  3. Add files to the upload folder. Create the upload folder under /opt/module/flume:
[atguigu@hadoop102 flume]$ mkdir upload
Then add files to the upload folder:
[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp
[atguigu@hadoop102 upload]$ touch atguigu.log
  4. View the data on HDFS.
  5. Wait about a second, then list the upload folder again:
[atguigu@hadoop102 upload]$ ll
total 0
-rw-rw-r--. 1 atguigu atguigu 0 May 20 22:31 atguigu.log.COMPLETED
-rw-rw-r--. 1 atguigu atguigu 0 May 20 22:31 atguigu.tmp
-rw-rw-r--. 1 atguigu atguigu 0 May 20 22:31 atguigu.txt.COMPLETED

3.4 Case: Single Data Source with Multiple Outputs (Selector)

A single Source with multiple Channels and Sinks is shown in Figure 7-2.

  1. Case requirement: Flume-1 monitors file changes and passes the changed content to Flume-2, which stores it in HDFS. At the same time, Flume-1 passes the changed content to Flume-3, which writes it to the local filesystem.
  2. Requirement analysis:
  3. Implementation steps:
    1. Preparation. Create a group1 folder under /opt/module/flume/job:
[atguigu@hadoop102 job]$ mkdir group1
[atguigu@hadoop102 job]$ cd group1/
Create a flume3 folder under /opt/module/datas/:
[atguigu@hadoop102 datas]$ mkdir flume3
    2. Create flume-file-flume.conf. Configure one source that reads the log file, two channels, and two sinks that feed flume-flume-hdfs and flume-flume-dir respectively. Create and open the configuration file:
[atguigu@hadoop102 group1]$ touch flume-file-flume.conf
[atguigu@hadoop102 group1]$ vim flume-file-flume.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data stream to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Note: Avro is a language-neutral data serialization and RPC framework created by Doug Cutting, the founder of Hadoop.
Note: RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over a network without needing to understand the underlying network technology.
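This case uses the replicating selector, which copies every event to all channels. Flume also provides a multiplexing selector that routes each event to a channel based on a header value (the header must be set upstream, for example by an interceptor); a minimal sketch, where the header name state and the mapping values are only illustrative:

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1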

  3. Create flume-flume-hdfs.conf.
Configure a source that receives the output of the upstream Flume and a sink that outputs to HDFS.
Create and open the configuration file:
[atguigu@hadoop102 group1]$ touch flume-flume-hdfs.conf
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# how many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
# redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# number of Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# how often to roll a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 600
# roll size of each file, roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0
# minimum number of replicas
a2.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  4. Create flume-flume-dir.conf.
Configure a source that receives the output of the upstream Flume and a sink that outputs to a local directory.
Create and open the configuration file:
[atguigu@hadoop102 group1]$ touch flume-flume-dir.conf
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
Note: the local output directory must already exist; if it does not, Flume will not create it.
  5. Run the configuration files.
Start the agents with the corresponding configuration files, in this order: flume-flume-dir, flume-flume-hdfs, flume-file-flume.
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
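The downstream agents (a3 and a2) are started first so that their avro sources are already listening when a1's avro sinks connect. If in doubt, the listening ports can be confirmed with the same netstat check used in 3.1 (port numbers taken from the configurations above):

[atguigu@hadoop102 flume]$ sudo netstat -tunlp | grep 4141
[atguigu@hadoop102 flume]$ sudo netstat -tunlp | grep 4142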
  6. Start Hadoop and Hive:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh

[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
  7. Check the data on HDFS.
  8. Check the data in the /opt/module/datas/flume3 directory:
[atguigu@hadoop102 flume3]$ ll
total 8
-rw-rw-r--. 1 atguigu atguigu 5942 May 22 00:09 1526918887550-3

3.5 Case: Single Data Source with Multiple Outputs (Sink Group)

A single Source and Channel with multiple Sinks (load balancing) is shown in Figure 7-3.

  1. Case requirement: Flume-1 monitors a port and distributes the received events across a sink group to Flume-2 and Flume-3 (load balancing); Flume-2 and Flume-3 each print the data to the console.
  2. Requirement analysis:
  3. Implementation steps:
    1. Preparation. Create a group2 folder under /opt/module/flume/job:
[atguigu@hadoop102 job]$ mkdir group2
[atguigu@hadoop102 job]$ cd group2/
    2. Create flume-netcat-flume.conf.
Configure one source that receives the data, one channel, and two sinks that feed flume-flume-console1 and flume-flume-console2 respectively.
Create and open the configuration file:
[atguigu@hadoop102 group2]$ touch flume-netcat-flume.conf
[atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Note: Avro is a language-neutral data serialization and RPC framework created by Doug Cutting, the founder of Hadoop.
Note: RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over a network without needing to understand the underlying network technology.
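This sink group uses the load_balance processor with round-robin selection. If the goal were hot standby rather than load balancing, the processor could instead be Flume's failover type (a sketch of the alternative, not part of this case; the priority values are illustrative):

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000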
  3. Create flume-flume-console1.conf.
Configure a source that receives the output of the upstream Flume and a sink that logs to the local console.
Create and open the configuration file:
[atguigu@hadoop102 group2]$ touch flume-flume-console1.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console1.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  4. Create flume-flume-console2.conf.
Configure a source that receives the output of the upstream Flume and a sink that logs to the local console.
Create and open the configuration file:
[atguigu@hadoop102 group2]$ touch flume-flume-console2.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console2.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
  5. Run the configuration files.
Start the agents with the corresponding configuration files, in this order: flume-flume-console2, flume-flume-console1, flume-netcat-flume.
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
  6. Use the telnet tool to send content to port 44444 on the local machine:
$ telnet localhost 44444
  7. Watch the logs printed on the Flume-2 and Flume-3 consoles.

3.6 Case: Aggregating Multiple Data Sources

Multiple Sources aggregating data into a single Flume is shown in Figure 7-4.

  1. Case requirement: Flume-1 on hadoop103 monitors the file /opt/module/group.log; Flume-2 on hadoop102 monitors the data stream on a port; Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
  2. Requirement analysis:
  3. Implementation steps:
  0. Preparation. Distribute Flume:
[atguigu@hadoop102 module]$ xsync flume
Create a group3 folder under /opt/module/flume/job on hadoop102, hadoop103, and hadoop104:
[atguigu@hadoop102 job]$ mkdir group3
[atguigu@hadoop103 job]$ mkdir group3
[atguigu@hadoop104 job]$ mkdir group3
  1. Create flume1-logger-flume.conf.
Configure a Source to monitor the group.log file and a Sink to send the data to the next-level Flume.
Create and open the configuration file on hadoop103:
[atguigu@hadoop103 group3]$ touch flume1-logger-flume.conf
[atguigu@hadoop103 group3]$ vim flume1-logger-flume.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Create flume2-netcat-flume.conf.
Configure a Source to monitor the data stream on port 44444 and a Sink to send the data to the next-level Flume.
Create and open the configuration file on hadoop102:
[atguigu@hadoop102 group3]$ touch flume2-netcat-flume.conf
[atguigu@hadoop102 group3]$ vim flume2-netcat-flume.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  3. Create flume3-flume-logger.conf.
Configure a source to receive the data streams sent by flume1 and flume2; the merged result is sunk to the console.
Create and open the configuration file on hadoop104:
[atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf
[atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
  4. Run the configuration files.
Start the agents with the corresponding configuration files, in this order: flume3-flume-logger.conf, flume2-netcat-flume.conf, flume1-logger-flume.conf.
[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

[atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf
  5. Append content to group.log in the /opt/module directory on hadoop103:
[atguigu@hadoop103 module]$ echo 'hello' >> group.log
  6. Send data to port 44444 on hadoop102:
[atguigu@hadoop102 flume]$ telnet hadoop102 44444
  7. Check the data on hadoop104.