[Streaming] Flume + Kafka + Storm Configuration and a Simple Example, Part 1: Printing the Stream Output

Here is an end-to-end example of a Flume + Kafka + Storm pipeline whose final output is simply printed.
Environment: three nodes, HMaster, HData01, HData02
zookeeper_server = 192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181
kafka_broker_list = 192.168.111.140:9092,192.168.111.141:9092,192.168.111.142:9092

Pipeline configuration:
Data source: /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log
Flume agent file: /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2Kafka.conf
Read method: tail -F
Kafka topic: topic-one

 

0. Generate test data. First, here is a simple test data generator that appends a string to fileName.log every 10 seconds. The script flume_datamaker.sh looks like this:

#!/bin/bash
MAX_NUM=10000
if [ ! -n "$1" ]; then
    echo "MAX_NUM default is ${MAX_NUM}"
else
    MAX_NUM=${1}
    echo "MAX_NUM is set to ${1}"
fi
for ((i=0; i<${MAX_NUM}; i++)); do
    echo "@@@@@@@@@@@@@@@@@@HELLO_@@@@@@@@@@@@@@@@@@${i}" >> /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log
    sleep 10s
done

Then start the script in the background: sh flume_datamaker.sh &
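To confirm the generator is actually writing (an optional sanity check), you can watch the target file:

$ tail -f /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log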

 

1. Configure Kafka
Step 1.1 Create a topic named topic-one:

$ ./kafka-topics.sh --create --zookeeper 192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181 --replication-factor 2 --partitions 1 --topic topic-one
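To double-check the topic, you can describe it and confirm the partition count and replicas (run from the same kafka bin directory):

$ ./kafka-topics.sh --describe --zookeeper 192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181 --topic topic-one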

 

Step 1.2 Test the topic. Start a console producer and a console consumer, as follows:

$ cd /home/hadoop/BigData/kafka_2.11/bin
$ ./kafka-console-producer.sh --broker-list 192.168.111.140:9092 --topic topic-one
abc
sdf

Open a new terminal window:
$ ./kafka-console-consumer.sh --zookeeper 192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181 --topic topic-one --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
abc
sdf
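As the deprecation warning suggests, the consumer can also be pointed at the brokers directly; assuming the new-consumer option is available in your Kafka build, the following should behave the same:

$ ./kafka-console-consumer.sh --bootstrap-server 192.168.111.140:9092 --topic topic-one --from-beginning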

 

2. Configure Flume
Step 2.1 The agent configuration /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2Kafka.conf is:

a2.sources=r1
a2.channels=c1
a2.sinks=k1

#sources Config
a2.sources.r1.type=exec
a2.sources.r1.command=tail -F /home/hadoop/BigData/flume-1.7.0-bin/test/fileName.log

#channels Config
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000000
a2.channels.c1.transactionCapacity = 1000

#sinks Config
a2.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.kafka.topic=topic-one
# a local broker is assumed on the agent host; otherwise list the full kafka_broker_list here
a2.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9092
a2.sinks.k1.flumeBatchSize=1000000
a2.sinks.k1.kafka.producer.type=sync
a2.sinks.k1.requiredAcks=0

# Bind the source and sink to the channel
a2.sources.r1.channels=c1
a2.sinks.k1.channel=c1

Step 2.2 Start Flume
$ /home/hadoop/BigData/flume-1.7.0-bin/bin/flume-ng agent -n a2 -c conf -f /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2Kafka.conf &
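If events do not seem to reach Kafka, it can help to first run the agent in the foreground with console logging so any sink errors are visible (standard Flume logging property):

$ /home/hadoop/BigData/flume-1.7.0-bin/bin/flume-ng agent -n a2 -c conf -f /home/hadoop/BigData/flume-1.7.0-bin/test_agent/agent_tailF2Kafka.conf -Dflume.root.logger=INFO,console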

 

Step 2.3 Test
Once the agent is running, the data generated by flume_datamaker.sh should appear in the topic-one consumer.
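The console consumer from Step 1.2 works for this check, e.g.:

$ ./kafka-console-consumer.sh --zookeeper 192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181 --topic topic-one --from-beginning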

 

3. Configure Storm
Step 3.1 A reference Storm topology, WordCountKafkaTopology, is shown below:

package services.basic;

import java.util.Properties;
import java.util.UUID;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;

import org.apache.storm.tuple.Tuple;
public class WordCountKafkaTopology {

    private static final String KAFKA_SPOUT = "KAFKA_SPOUT";

    /**
     * Kafka broker list, in ip:port form; adjust to match your cluster.
     */
    private static final String KAFKA_BROKER_LIST = "192.168.111.140:9092,192.168.111.141:9092,192.168.111.142:9092";

    /**
     * ZooKeeper server list, in ip:port form; adjust to match your cluster.
     */
    private static final String KAFKA_ZKSERVER_LIST = "192.168.111.140:2181,192.168.111.141:2181,192.168.111.142:2181";

    public static class PrintBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String rec = tuple.getString(0);
            System.err.println("String received: " + rec);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // do nothing: this bolt only prints, it emits no tuples
        }
    }

    public static void main(String[] args) throws Exception {
        // KafkaSpout configuration
        String inputTopicName = "topic-one";
        String zkServers = KAFKA_ZKSERVER_LIST;
        // "/kafka" is the ZooKeeper chroot the brokers register under;
        // if your brokers register at the ZK root (as the kafka-topics command above suggests), use "" here
        String kafkaRoot = "/kafka";
        String connectString = zkServers + kafkaRoot;
        String zkRoot = kafkaRoot + "/" + inputTopicName;
        String appId = UUID.randomUUID().toString();

        // Producer properties; only needed if a KafkaBolt is added later to write results back to Kafka
        String brokerlist = KAFKA_BROKER_LIST;
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerlist);
        props.put("producer.type", "async");
        props.put("request.required.acks", "0");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // Topology configuration
        Config conf = new Config();
        // conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
        conf.setDebug(true);
        conf.put("kafka.broker.properties", props);

        // Build the KafkaSpout
        BrokerHosts hosts = new ZkHosts(connectString);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // Build the topology: kafkaSpout ==> printBolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT, kafkaSpout, 1);
        builder.setBolt("print", new PrintBolt(), 1).localOrShuffleGrouping(KAFKA_SPOUT);

        // Submit the topology; args[0] is the topology name given on the command line
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }

}

 

Step 3.2 Package the topology and submit it
I did not use Maven for packaging here; I used a small packaging tool of my own (details not covered in this post).
After packaging, submit the jar to Storm:

$ storm jar stmStorm.jar services.basic.WordCountKafkaTopology kafka_spout
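To confirm the topology was accepted, list the running topologies (standard Storm CLI):

$ storm list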

 

4. Check the results
Open the Storm UI (mine is at 192.168.111.140:9099) and you should see the topology running:


Open the worker log with tail -f /home/hadoop/BigData/storm-1.0.2/logs/workers-artifacts/kafka_spout-4-1487905415/6703/worker.log (use the actual topology id shown in the UI). You should see output like this:


This shows that the streaming pipeline is working end to end.
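When you are finished, the topology can be taken down with the Storm CLI (kafka_spout is the name passed at submission):

$ storm kill kafka_spout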
