您当前的位置: 首页 >  kafka

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

flume+kafka+storm整合01

宝哥大数据 发布时间:2017-03-20 18:36:15 ,浏览量:1

具体实现 1、实现Spout, 因为Storm源数据是从kafka中获取, 所以使用storm提供的KafkaSpout
        //由于Spout是从kafka中获取数据, Storm提供了KafkaSpout 
        //配置kafkaSpout 
        //kafka的topic, 是为了方便从哪儿拿数据
        String topic = "testFlume";
        ZkHosts zkHosts = new ZkHosts("192.168.10.224:3384,192.168.10.225:3384,192.168.10.226:3384");
        //
        SpoutConfig spoutConf = new SpoutConfig(zkHosts, 
                                topic,    //kafka topic
                                "/test",  //偏移量的根目录 
                                "test");  //唯一id
        //通过SpoutConfig设置其他参数
        spoutConf.forceFromStart = false; //从头开始消费
        spoutConf.socketTimeoutMs = 60*1000;//连接超时
        //设置scheme,  控制从kafak中获取的数据输出为String
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        //KafkaSpout中只接收一个参数
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
为topology提供Spout
        //创建TopologyBilder
        TopologyBuilder builder = new TopologyBuilder();
        //为topology设置Spout
        builder.setSpout("KafkaSpout", kafkaSpout, 3);

2、设置第一个Bolt

为了过滤Spout中发送过来的日志中的ERROR信息 创建一个Bolt提供此功能:

package com.chb.flume_kafka_storm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * 第一个Bolt为了过滤Spout发送过来的数据中的ERROR信息 
 */
public class FilterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple arg0, BasicOutputCollector arg1) {
        String line = arg0.getString(0);
        if (line.contains("ERROR")) {
            System.err.println(line);
            arg1.emit(new Values(line));
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // 这个地方写error是给后面FieldNameBasedTupleToKafkaMapper来用
        arg0.declare(new Fields("error"));
    }
}
为topology设置Bolt
        //设置Bolt, 用于过滤error信息
        builder.setBolt("filterBolt", new FilterBolt(),8)
        .shuffleGrouping("KafkaSpout");//数据是从KafkaSpout中获取
三、第二个Bolt,

为了将过滤后的日志在写道kafka中, 将使用storm提供的KafkaBolt,:

KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("test"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");

四、设置集群, 运行

        boolean isLocal = false;
        Config config = new Config();
        //set producer properties , properties设置kafka的属性
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "192.168.10.224:9062,192.168.10.225:9062,192.168.10.226:9062");
        properties.put("requst.required.acks", "1");  //1 -1 0
        properties.put("serializer.class", "kafka.serializer.StringEnscoder");
        config.put("kafka.broker.properties", properties);
        if (isLocal) {//本地模式执行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("logFilter", config, builder.createTopology());
        }else {
            config.put(Config.NIMBUS_HOST, "192.168.10.224");
            config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"master","slave1","slave2"}));
            System.setProperty("storm.jar","C:\\Users\\12285\\Desktop\\Storm.jar");

            config.setNumWorkers(4);
            try {
                StormSubmitter.submitTopologyWithProgressBar("log", config,
                builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
五、启动集群 出现集群无法连接zookeeper集群   5.1、flume+kafka+storm整合02—问题 5.2、启动storm集群:
cd /opt/apache-storm-0.9.4
./bin/storm nimbus >> logs/nimbus.out  2>&1 & 
./bin/storm supervisor >> logs/supervisor.out 2>&1 &
./bin/storm  ui >> logs/ui.out 2>&1 &     
5.3、启动kafka

启动集群, 在master, slave1,slave2上执行

cd /opt/kafka
./bin/kafka-server-start.sh config/server.properties 
5.3启动一个监听testflume的consumer

用于监听flume的数据,

cd /opt/kafka
./bin/kafka-console-consumer.sh --zookeeper master:3384,slave1:3384,slave2:3384  --from-beginning -topic testflume
5.4 启动flume
cd /opt/apache-flume-1.6.0-bin
bin/flume-ng agent --conf /opt/apache-flume-1.6.0-bin/conf/ --conf-file  ./testconf/exec.properties    --name a1 -Dflume.root.logger=INFO,console
六、storm测试 未完待续。。。。
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0437s