您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

storm--通过sleep定时统计topN

宝哥大数据 发布时间:2018-06-12 08:59:45 ,浏览量:0

###1.1、模拟发送源

1.1.1、nextTuple()

        String[] words = new String[] {"hello", "world", "hadoop", "hive", "spark", "hbase"};
        //随机发送单词
        String word = words[new Random().nextInt(words.length)];
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //发送
        collector.emit(new Values(word));
1.1.2、指定输出字段
        /**
        *指定输出字段
        */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
1.2、中间一个小的合并, 每秒emit一次 1.2.1、在prepare()中定义一个线程, 通过sleep(1000), 周期性修改发射标识emitFlag
//发射标识
private volatile  boolean emitFlag = false;
@Override
public void prepare(Map stormConf, TopologyContext context,
        final OutputCollector collector) {
    new Thread(
            new Runnable() {
                @Override
                public void run() {
                    while(true){
                        if(emitFlag) {
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            System.out.println("WordCouter:" + sdf.format(new Date()));
                            for (String key : counters.keySet()) {
                                collector.emit(new Values(key, counters.get(key)));
                            }
                            //发送完成
                            emitFlag=false;
                            //中间合并bolt, 每次发射完成后, 将couters清空
                            counters.clear();
                        }
                        //通过sleep,周期性修改emitFlag
                        try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
                        emitFlag=true;
                    }
                }
            }
            ).start();

}
1.2.2、execute中,只对数据源发送来的数据进行统计
    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = counters.get(word);
        if(num == null) {
            num = 0;
        } 
        num++;
        counters.put(word, num);
    }
1.2.3、设置输出字段
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
1.3、最后的合并 和中间的小合并基本一致,只是在发射的时候,要进行排序,找到top5.由于统计的结果使用Map存储, 无法排序, 所以将存储结果放到List集合中, 通过对Value进行排序,获取top5. 1.3.1、自定义排序
    private static class ValueComparator implements Comparator {
        @Override
        public int compare(Entry entry1, Entry entry2) {
            return entry2.getValue() - entry1.getValue();
        }
    }
1.3.2、存储结果放到List集合中
List list = new ArrayList();
list.addAll(counters.entrySet());
1.3.3、对集合中元素,按照值排序
Collections.sort(list, new ValueComparator());
都是在prepare()中进行
    private  boolean emit = false;
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            final OutputCollector collector) {
        new Thread( new Runnable() {
            @Override
            public void run() {
                while(true){
                    if(emit) {
                        List list = new ArrayList();
                        list.addAll(counters.entrySet());
                        Collections.sort(list, new ValueComparator());

                        //发送
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.print("TotalWordCouter:" + sdf.format(new Date())+"发送topN: {");
                        for (int i = 0; i             
关注
打赏
1587549273
查看更多评论
0.0432s