###1.1、模拟发送源
1.1.1、nextTuple()
String[] words = new String[] {"hello", "world", "hadoop", "hive", "spark", "hbase"};
//随机发送单词
String word = words[new Random().nextInt(words.length)];
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//发送
collector.emit(new Values(word));
1.1.2、指定输出字段
/**
*指定输出字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
1.2、中间一个小的合并, 每秒emit一次
1.2.1、在prepare()中定义一个线程, 通过sleep(1000), 周期性修改发射标识emitFlag
//发射标识
private volatile boolean emitFlag = false;
@Override
public void prepare(Map stormConf, TopologyContext context,
final OutputCollector collector) {
new Thread(
new Runnable() {
@Override
public void run() {
while(true){
if(emitFlag) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("WordCouter:" + sdf.format(new Date()));
for (String key : counters.keySet()) {
collector.emit(new Values(key, counters.get(key)));
}
//发送完成
emitFlag=false;
//中间合并bolt, 每次发射完成后, 将couters清空
counters.clear();
}
//通过sleep,周期性修改emitFlag
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
emitFlag=true;
}
}
}
).start();
}
1.2.2、execute中,只对数据源发送来的数据进行统计
@Override
public void execute(Tuple input) {
String word = input.getString(0);
Integer num = counters.get(word);
if(num == null) {
num = 0;
}
num++;
counters.put(word, num);
}
1.2.3、设置输出字段
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
1.3、最后的合并
和中间的小合并基本一致,只是在发射的时候,要进行排序,找到top5.由于统计的结果使用Map存储, 无法排序, 所以将存储结果放到List集合中, 通过对Value进行排序,获取top5.
1.3.1、自定义排序
private static class ValueComparator implements Comparator {
@Override
public int compare(Entry entry1, Entry entry2) {
return entry2.getValue() - entry1.getValue();
}
}
1.3.2、存储结果放到List集合中
List list = new ArrayList();
list.addAll(counters.entrySet());
1.3.3、对集合中元素,按照值排序
Collections.sort(list, new ValueComparator());
都是在prepare()中进行
private boolean emit = false;
@Override
public void prepare(Map stormConf, TopologyContext context,
final OutputCollector collector) {
new Thread( new Runnable() {
@Override
public void run() {
while(true){
if(emit) {
List list = new ArrayList();
list.addAll(counters.entrySet());
Collections.sort(list, new ValueComparator());
//发送
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.print("TotalWordCouter:" + sdf.format(new Date())+"发送topN: {");
for (int i = 0; i
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?