strom topN — RollingCountBolt
1、Spout 数据源很简单, 随机发送一个字符串数组中的字符串。
@Override
public void execute(Tuple tuple) {
//emit
if (TupleHelpers.isTickTuple(tuple)) {//接收到了tick tuple, 可以发送
LOG.debug("Received tick tuple, triggering emit of current window counts");
emitCurrentWindowCounts();
} else {
countObjAndAck(tuple);
}
}
2.1.1 、先看下 tick tuple的条件, 查看TupleHelpers
可以看到在isTickTuple的条件是 tuple的ComponentId等于SystemComponentId, 且tuple的StreamId等于SYSTEM_TICK_STREAM_ID
那么在什么时候id相等, 什么时候tuple的SourceStreamId= “__tick”?
package storm.starter.util;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
public final class TupleHelpers {
private TupleHelpers() {
}
/**
* 根据tuple的
* 产生这个tuple的组件的id(Gets the id of the component that created this tuple.)
* 以及emit这个tuple的stream id (Gets the id of the stream that this tuple was emitted to.)
*来判断是否是tick tuple
* @param tuple
* @return
*/
public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
}
怎么确定tick tuple, 在getComponentConfiguration中设置tick tuple的发送频率
@Override
public Map getComponentConfiguration() {
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;
}
2.1.2、不是tick tuple ,执行countObjAndAck(tuple); //只是做统计, 并对tuple ack
/**
* 对tuple进行计数
* @param tuple
*/
private void countObjAndAck(Tuple tuple) {
//获取tuple的单词
Object obj = tuple.getValue(0);
counter.incrementCount(obj);//计数, 实际是由SlotBaseCounter计数, 新的数据插入第一个slot, //objCounter.incrementCount(obj, headSlot);
collector.ack(tuple);//ack
}
2.1.2、捕捉到tick tuple 执行emitCurrentWindowCounts()
2.2 滑动窗口计数器: SlidingWindowCounter counter
2.2.1、SlidingWindowCounter中包含四个元素
/**
* 真正的计数器 是SlotBaseCounter,
*/
private SlotBasedCounter objCounter; //见2.3、SlotBasedCounter
/**
* 第一个slot的序号
*/
private int headSlot;
/**
* 最后一个slot的序号
*/
private int tailSlot;
/**
* windowLength 按照slot计数 有多少个slot
*/
private int windowLengthInSlots;
2.2.2、在构造函数中初始化了headSlot, 和tailSlot 的关系
this.headSlot = 0;
this.tailSlot = slotAfter(headSlot);
//head
/**
* 后一个slot, 是slot+1 取 numSlots的模
* @param slot 当前slot的位置
* @return 后面一个slot的位置
*/
private int slotAfter(int slot) {
// 1%3
return (slot + 1) % windowLengthInSlots;
}
2.3、SlotBasedCounter是真正进行计数业务
2.3.1、它包含两个元素, objToCounts 是一个Map
/**
* 基于slot的count
* T tuple的内容 word
* long[] 在所在的slot中的计数
*/
private final Map objToCounts = new HashMap();
/**
* slot的数量
*/
private final int numSlots;
2.3.2、增加一个单词
/**
* 对每个slot的单词进行计数
* @param obj 被计数的对象
* @param slot obj所在的solt的计数
*/
public void incrementCount(T obj, int slot) {
long[] counts = objToCounts.get(obj);//在counter中指定slot中obj的计数
if (counts == null) {
counts = new long[this.numSlots];
objToCounts.put(obj, counts);
}
counts[slot]++;//计数加一
}