您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Strom学习01--例子WordCountTopology

宝哥大数据 发布时间:2017-03-12 18:54:12 ,浏览量:0

WordCountTopology Spout

spout为数据的源头,通过TopologyBuilder创建一个Spout ,用于模拟数据的源头,

    builder.setSpout("spout", new RandomSentenceSpout(), 5);

我们看看Spout的业务处理流程:

RandomSentenceSpout

具体关于ISpout的几个方法 提供了一个open()对Spout进行初始化:

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }
在nextTuple() 中不断的向外输出数据:

当nextTuple被调用, Strom被请求, Spout会发射Tuple到output collecter, 如果没有Tuple可发射, 该方法就会返回, nextTuple, ack , fail 三个方法在Spout的任务中, 必须是在一个线程中,一个紧密的循环。 如果没有tuple可发射该方法会休眠短暂的时间。

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    //模拟的数据源
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //随机的数据
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //通过
    _collector.emit(new Values(sentence));
  }
在declareOutputFields 设置了流的filed:

Storm学习00–IComponent , 了解OutputFieldsDeclareder

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
二、Bolt 2.1Bolt中的几个方法

2.1.1.prepare()   该方法和Spout中的open方法类似, 为bolt提供OutputCollector,用于从Bolt中发送Tuple, 在execute方法执行之:

    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

2.1.2declareOutputFields方法 用于声明当前Bolt发送的tuple中包含的字段

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
2.1.3execute方法

  这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,有两种情况,一种是emit方法中有两个参数,另一个种是数。

  • (1)emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。

  • (2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性


    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }
2.1.4、getComponentConfiguration方法

此方法用于声明针对当前组件的特殊的Configuration配置

    @Override
    public Map getComponentConfiguration() {
      return null;
    }
2.2、 拆分句子成单词SplitSentence

对Spout发送来的tuple(为一条条句子),进行处理, 拆分成单词, 处理通过python脚本处理的,

  public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
      super("python", "splitsentence.py");
    }
    //声明当前Bolt的tuple发送流, Stream流的定义通过declarer.declare(new Fields("word"));
完成的, 启动参数是发送的域fileds.

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
    //用于声明针对当前组件的特殊的Configuration配置, 没有返回null
    @Override
    public Map getComponentConfiguration() {
      return null;
    }
  }

splitsentence.py 是如何产生数据,通过对spout送来的数据按照空格拆分, 然后调用storm.emit发送出去。

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        //一个句子为一个tuple获取tuple的第一个元素,进行拆分
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])//发送数据。

SplitSentenceBolt().run()
2.3、统计单词的数量WordCount

  public static class WordCount extends BaseBasicBolt {
    //用于统计每个单词的个数
    Map counts = new HashMap();
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      //获取tuple中的数据
      String word = tuple.getString(0);
      Integer count = counts.get(word);
     //最开始map中没有该单词, 设置count为0
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      //发送数据
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      //设置流的fileds以供下一个流程使用该bolt的数据
      declarer.declare(new Fields("word", "count"));

    }
  }
三、主函数
  public static void main(String[] args) throws Exception {

    //创建拓扑图对象
    TopologyBuilder builder = new TopologyBuilde();
    //设置Spout, 5是该spout执行5个线程
    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    //设置拆分句子的Bolt, 
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
   //设置统计单词次数的Bolt
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);


    //集群模式
    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
//本地模式
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(10000);

      cluster.shutdown();
    }
  }
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0392s