大数据-Flume采集文件到HDFS

  • 2019 年 12 月 26 日
  • 筆記

2.2. 采集案例

2.2.4. 采集文件到HDFS

需求 比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到 hdfs

分析 根据需求,首先定义以下3大要素

  • 采集源,即source——监控文件内容更新 : exec ‘tail -F file’
  • 下沉目标,即sink——HDFS文件系统 : hdfs sink
  • Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel

Step 1: 定义 Flume 配置文件

cd /export/servers/apache-flume-1.8.0-bin/conf  vim tail-file.conf
agent1.sources = source1  agent1.sinks = sink1  agent1.channels = channel1  # Describe/configure tail -F source1  agent1.sources.source1.type = exec  agent1.sources.source1.command = tail -F /export/servers/taillogs/access_log  agent1.sources.source1.channels = channel1  # Describe sink1  agent1.sinks.sink1.type = hdfs  #a1.sinks.k1.channel = c1  agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%  agent1.sinks.sink1.hdfs.filePrefix = access_log  agent1.sinks.sink1.hdfs.maxOpenFiles = 5000  agent1.sinks.sink1.hdfs.batchSize= 100  agent1.sinks.sink1.hdfs.fileType = DataStream  agent1.sinks.sink1.hdfs.writeFormat =Text  agent1.sinks.sink1.hdfs.round = true  agent1.sinks.sink1.hdfs.roundValue = 10  agent1.sinks.sink1.hdfs.roundUnit = minute  agent1.sinks.sink1.hdfs.useLocalTimeStamp = true  # Use a channel which buffers events in memory  agent1.channels.channel1.type = memory  agent1.channels.channel1.keep-alive = 120  agent1.channels.channel1.capacity = 500000  agent1.channels.channel1.transactionCapacity = 600  # Bind the source and sink to the channel  agent1.sources.source1.channels = channel1  agent1.sinks.sink1.channel = channel1

Step 2: 启动 Flume

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1

Step 3: 开发 Shell 脚本定时追加文件内容

mkdir -p /export/servers/shells/  cd /export/servers/shells/  vim tail-file.sh
#!/bin/bash  while true dodate >> /export/servers/taillogs/access_log; sleep 0.5; done

Step 4: 启动脚本

# 创建文件夹  mkdir -p /export/servers/taillogs  # 启动脚本  sh /export/servers/shells/tail-file.sh