您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

storm--topN

宝哥大数据 发布时间:2018-03-09 10:05:18 ,浏览量:1

strom topN — RollingCountBolt 1、Spout 数据源很简单, 随机发送一个字符串数组中的字符串。

这里写图片描述

2、第一个Bolt— RollingCountBolt, 2.1、在execute中,只有等到TupleHelpers.isTickTuple(tuple),才可以发送
    @Override
    public void execute(Tuple tuple) {
        //emit
        if (TupleHelpers.isTickTuple(tuple)) {//接收到了tick tuple, 可以发送
            LOG.debug("Received tick tuple, triggering emit of current window counts");
            emitCurrentWindowCounts();
        } else {
            countObjAndAck(tuple);
        }
    }
2.1.1 、先看下 tick tuple的条件, 查看TupleHelpers 可以看到在isTickTuple的条件是 tuple的ComponentId等于SystemComponentId, 且tuple的StreamId等于SYSTEM_TICK_STREAM_ID 那么在什么时候id相等, 什么时候tuple的SourceStreamId= “__tick”?
package storm.starter.util;

import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

public final class TupleHelpers {

    private TupleHelpers() {
    }
    /**
     * 根据tuple的
     *  产生这个tuple的组件的id(Gets the id of the component that created this tuple.)
     *    以及emit这个tuple的stream id (Gets the id of the stream that this tuple was emitted to.)
     *来判断是否是tick tuple    
     * @param tuple
     * @return
     */
    public static boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && 
                tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }
}
怎么确定tick tuple, 在getComponentConfiguration中设置tick tuple的发送频率
    @Override
    public Map getComponentConfiguration() {
        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }
2.1.2、不是tick tuple ,执行countObjAndAck(tuple); //只是做统计, 并对tuple ack
    /**
     * 对tuple进行计数
     * @param tuple
     */
    private void countObjAndAck(Tuple tuple) {
        //获取tuple的单词
        Object obj = tuple.getValue(0);
        counter.incrementCount(obj);//计数, 实际是由SlotBaseCounter计数, 新的数据插入第一个slot, //objCounter.incrementCount(obj, headSlot);
        collector.ack(tuple);//ack
    }

2.1.2、捕捉到tick tuple 执行emitCurrentWindowCounts() 2.2 滑动窗口计数器: SlidingWindowCounter counter 2.2.1、SlidingWindowCounter中包含四个元素
  /** 
   * 真正的计数器 是SlotBaseCounter, 
   */
  private SlotBasedCounter objCounter;    //见2.3、SlotBasedCounter
  /** 
   * 第一个slot的序号
   */
  private int headSlot;
  /** 
   * 最后一个slot的序号
   */
  private int tailSlot;
  /**  
   * windowLength 按照slot计数 有多少个slot 
   */
  private int windowLengthInSlots;
2.2.2、在构造函数中初始化了headSlot, 和tailSlot 的关系
    this.headSlot = 0;
    this.tailSlot = slotAfter(headSlot);

    //head 
  /**
   * 后一个slot, 是slot+1 取 numSlots的模
   * @param slot  当前slot的位置
   * @return      后面一个slot的位置
   */
  private int slotAfter(int slot) {
    //  1%3  
    return (slot + 1) % windowLengthInSlots;
  }
2.3、SlotBasedCounter是真正进行计数业务 2.3.1、它包含两个元素, objToCounts 是一个Map
  /**
   * 基于slot的count 
   * T tuple的内容  word
   * long[] 在所在的slot中的计数
   */
  private final Map objToCounts = new HashMap();
  /**
   * slot的数量
   */
  private final int numSlots;
2.3.2、增加一个单词
  /**
   * 对每个slot的单词进行计数
   * @param obj     被计数的对象
   * @param slot    obj所在的solt的计数
   */
  public void incrementCount(T obj, int slot) {
    long[] counts = objToCounts.get(obj);//在counter中指定slot中obj的计数
    if (counts == null) {
      counts = new long[this.numSlots];
      objToCounts.put(obj, counts);
    }
    counts[slot]++;//计数加一
  }
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0406s