[Flume] Common Flume Usage Examples and Configurations

For environment setup, see: [Flume] Big Data Open-Source Environment Setup (Cluster): 30. Flume

Environment used in this article: RedHat 6.5, JDK 1.7.0_79, Flume 1.7.0

Outline:

♦ Example 1: tail -F fileName.log and write to a local directory
♦ Example 2: tail -F fileName.log and write to HDFS
♦ Example 3: tail -F fileName.log and write to Kafka

Details below:

Example 1: tail -F fileName.log and write to a local directory

Step 1: Create the agent configuration file, e.g. vi /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2File.conf, with the following content:

a1.sources = tailF2File
a1.channels = file_channel
a1.sinks = k1

# sources config: the exec source runs tail -F and emits each new line as an event
a1.sources.tailF2File.type = exec
a1.sources.tailF2File.command = tail -F /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log

# channels config: a memory channel (despite the name "file_channel")
a1.channels.file_channel.type = memory
a1.channels.file_channel.capacity = 1000000
a1.channels.file_channel.transactionCapacity = 1000

# sinks config: file_roll writes events to files under sink.directory;
# note the property prefix is "sink." — sink.rollInterval = 0 disables
# time-based rolling, so all events go to a single file per run
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/BigData/flume-1.7.0-bin/test/Sink01/
a1.sinks.k1.sink.rollInterval = 0

# bind source and sink to the channel
a1.sources.tailF2File.channels = file_channel
a1.sinks.k1.channel = file_channel
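
The file_roll sink does not create its output directory for you, so create both the log directory and the sink directory before starting the agent (paths taken from the config above):

$ mkdir -p /home/hadoop/BigData/flume-1.7.0-bin/test
$ mkdir -p /home/hadoop/BigData/flume-1.7.0-bin/test/Sink01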

 

Step 2: Start Flume

$ /home/hadoop/BigData/flume-1.7.0-bin/bin/flume-ng agent -n a1 -c conf -f /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2File.conf
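
By default flume-ng logs to a file; while testing it is convenient to log to the console instead. The extra -D flag below is the standard log4j override, everything else is unchanged:

$ /home/hadoop/BigData/flume-1.7.0-bin/bin/flume-ng agent -n a1 -c conf \
    -f /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2File.conf \
    -Dflume.root.logger=INFO,console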

 

Step 3: Test by creating fileName.log and appending lines to it

$ echo "ABC" >> /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log
$ echo "CDF" >> /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log

You should now see the corresponding output files under /home/hadoop/BigData/flume-1.7.0-bin/test/Sink01/.
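
A quick way to verify (the file_roll sink names its files with a timestamp-based prefix, so the exact filename will differ from run to run):

$ ls /home/hadoop/BigData/flume-1.7.0-bin/test/Sink01/
$ cat /home/hadoop/BigData/flume-1.7.0-bin/test/Sink01/*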

 

Example 2: tail -F fileName.log and write to HDFS

Step 1: Create the agent configuration file, e.g. vi /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2HDFS.conf, with the following content:

a3.sources = r1
a3.channels = c1
a3.sinks = k1

# sources config
a3.sources.r1.type = exec
a3.sources.r1.command = tail -F /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log

# channels config
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000000
a3.channels.c1.transactionCapacity = 1000

# sinks config: fileType = DataStream writes plain text (no SequenceFile
# wrapping); rollInterval = 4 rolls to a new HDFS file every 4 seconds
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://HMaster:9000/test_FLume
a3.sinks.k1.hdfs.fileType = DataStream
a3.sinks.k1.hdfs.writeFormat = Text
a3.sinks.k1.hdfs.rollInterval = 4

# bind source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
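
Starting and testing this agent follows the same pattern as Example 1, with the agent name a3. A minimal sketch (the verification commands assume the hadoop client is on the PATH and the NameNode at hdfs://HMaster:9000 is reachable):

$ /home/hadoop/BigData/flume-1.7.0-bin/bin/flume-ng agent -n a3 -c conf \
    -f /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2HDFS.conf
$ echo "ABC" >> /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log

# the HDFS sink names its files FlumeData.<timestamp> by default
$ hdfs dfs -ls /test_FLume
$ hdfs dfs -cat /test_FLume/FlumeData.*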

 

Example 3: tail -F fileName.log and write to Kafka

Step 1: Create the agent configuration file, e.g. vi /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2Kafka.conf, with the following content:

a2.sources = r1
a2.channels = c1
a2.sinks = k1

# sources config
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log

# channels config
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000000
a2.channels.c1.transactionCapacity = 1000

# sinks config
a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.kafka.topic = topic-one
a2.sinks.k1.kafka.bootstrap.servers = 192.168.111.140:9092,192.168.111.141:9092,192.168.111.142:9092
a2.sinks.k1.flumeBatchSize = 1000000
# the Flume 1.7 Kafka sink uses the new Kafka producer, which ignores the old
# producer.type setting; acks = 0 means fire-and-forget (equivalent to the
# deprecated requiredAcks = 0)
a2.sinks.k1.kafka.producer.acks = 0

# bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Notes on the properties above:

type is the Kafka sink class that ships with Flume.

kafka.topic is the Kafka topic you created.

kafka.bootstrap.servers is the list of broker listener addresses (i.e. the listeners=PLAINTEXT://192.168.111.140:9092 entry in each broker's server.properties).

The other properties need no further explanation.
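
If the topic does not exist yet, create it before starting the agent. A sketch for a Kafka version of this era (the partition and replication counts here are illustrative, not from the original setup; run from Kafka's bin directory):

$ ./kafka-topics.sh --create --zookeeper 192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181 \
    --topic topic-one --partitions 3 --replication-factor 2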

 

Step 2: Start Flume

$ /home/hadoop/BigData/flume-1.7.0-bin/bin/flume-ng agent -n a2 -c conf -f /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2Kafka.conf

 

Step 3: Test by creating fileName.log and appending lines to it

$ echo "EBD" >> /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log
$ echo "ASDE" >> /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log

Now open a Kafka consumer and you should see the corresponding data; a reference command for starting the consumer:

./kafka-console-consumer.sh --zookeeper 192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181 --topic topic-one --from-beginning
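
The --zookeeper flag matches the Kafka versions of this era; newer Kafka releases removed it, so on those pass the broker addresses instead:

$ ./kafka-console-consumer.sh --bootstrap-server 192.168.111.140:9092,192.168.111.141:9092,192.168.111.142:9092 \
    --topic topic-one --from-beginning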

