具体实现
1、实现Spout, 因为Storm源数据是从kafka中获取, 所以使用storm提供的KafkaSpout
// The Spout pulls data from Kafka, so we use the KafkaSpout that Storm provides.
// Configure the KafkaSpout.
// Kafka topic this topology consumes from.
String topic = "testFlume";
ZkHosts zkHosts = new ZkHosts("192.168.10.224:3384,192.168.10.225:3384,192.168.10.226:3384");
// Spout configuration: ZooKeeper hosts, topic, offset root path, consumer id.
SpoutConfig spoutConf = new SpoutConfig(zkHosts,
topic, //kafka topic
"/test", // ZooKeeper root path under which consumed offsets are stored
"test"); // unique id for this consumer's offset node
// Further tuning is done through fields on SpoutConfig.
spoutConf.forceFromStart = false; // false = resume from last stored offset (true would force re-reading the topic from the beginning)
spoutConf.socketTimeoutMs = 60*1000;// socket/connection timeout in milliseconds
// The scheme controls deserialization: emit each Kafka message as a plain String.
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
// KafkaSpout takes the single SpoutConfig argument.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
为topology提供Spout
// Create the TopologyBuilder that wires spouts and bolts together.
TopologyBuilder builder = new TopologyBuilder();
// Register the Kafka spout under component id "KafkaSpout" with parallelism 3.
builder.setSpout("KafkaSpout", kafkaSpout, 3);
2、设置第一个Bolt
为了过滤Spout中发送过来的日志中的ERROR信息 创建一个Bolt提供此功能:
package com.chb.flume_kafka_storm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
* 第一个Bolt为了过滤Spout发送过来的数据中的ERROR信息
*/
public class FilterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple arg0, BasicOutputCollector arg1) {
String line = arg0.getString(0);
if (line.contains("ERROR")) {
System.err.println(line);
arg1.emit(new Values(line));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// 这个地方写error是给后面FieldNameBasedTupleToKafkaMapper来用
arg0.declare(new Fields("error"));
}
}
为topology设置Bolt
//设置Bolt, 用于过滤error信息
// Register the ERROR-filtering bolt (parallelism 8), fed by the Kafka spout.
builder.setBolt("filterBolt", new FilterBolt(),8)
.shuffleGrouping("KafkaSpout");// tuples are distributed randomly from the "KafkaSpout" component
三、第二个Bolt
为了将过滤后的日志再写到kafka中, 将使用storm提供的KafkaBolt:
// KafkaBolt writes the filtered lines back to the Kafka topic "test".
// NOTE(review): the no-arg FieldNameBasedTupleToKafkaMapper reads tuple fields
// named "key"/"message", while FilterBolt declares the field "error" — verify
// the mapper is constructed with the matching message-field name.
KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("test"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
// BUG FIX: the upstream bolt was registered as "filterBolt", not "filter";
// grouping on a non-existent component id fails topology validation at submit time.
builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filterBolt");
四、设置集群, 运行
boolean isLocal = false; // toggle between in-process LocalCluster and a real cluster
Config config = new Config();
// Kafka producer properties consumed by KafkaBolt (old 0.8-style producer config).
Properties properties = new Properties();
properties.put("metadata.broker.list", "192.168.10.224:9062,192.168.10.225:9062,192.168.10.226:9062");
// BUG FIX: property key was misspelled "requst.required.acks" and would be silently ignored.
properties.put("request.required.acks", "1"); // 1 = leader ack; -1 = all in-sync replicas; 0 = no ack
// BUG FIX: class name was misspelled "StringEnscoder"; the producer would fail to load the encoder.
properties.put("serializer.class", "kafka.serializer.StringEncoder");
config.put("kafka.broker.properties", properties);
if (isLocal) { // run inside an in-process local cluster (for testing)
    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("logFilter", config, builder.createTopology());
} else { // submit to the remote Storm cluster
    config.put(Config.NIMBUS_HOST, "192.168.10.224");
    config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"master","slave1","slave2"}));
    // Jar file that StormSubmitter uploads to Nimbus.
    System.setProperty("storm.jar","C:\\Users\\12285\\Desktop\\Storm.jar");
    config.setNumWorkers(4);
    try {
        StormSubmitter.submitTopologyWithProgressBar("log", config,
                builder.createTopology());
    } catch (AlreadyAliveException | InvalidTopologyException e) {
        e.printStackTrace();
    }
}
五、启动集群
启动时若出现storm集群无法连接zookeeper集群的问题, 可参考下面文档:
5.1、flume+kafka+storm整合02—问题
5.2、启动storm集群:
cd /opt/apache-storm-0.9.4
./bin/storm nimbus >> logs/nimbus.out 2>&1 &
./bin/storm supervisor >> logs/supervisor.out 2>&1 &
./bin/storm ui >> logs/ui.out 2>&1 &
5.3、启动kafka
启动集群, 在master, slave1,slave2上执行
cd /opt/kafka
./bin/kafka-server-start.sh config/server.properties
5.4、启动一个监听testFlume的consumer
用于监听flume的数据,
cd /opt/kafka
./bin/kafka-console-consumer.sh --zookeeper master:3384,slave1:3384,slave2:3384 --from-beginning --topic testFlume
5.5、启动flume
cd /opt/apache-flume-1.6.0-bin
bin/flume-ng agent --conf /opt/apache-flume-1.6.0-bin/conf/ --conf-file ./testconf/exec.properties --name a1 -Dflume.root.logger=INFO,console
六、storm测试
未完待续。。。。