您当前的位置: 首页 >  kafka

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

flume+kafka+storm整合01bak

宝哥大数据 发布时间:2018-03-07 23:21:19 ,浏览量:1

flume+kafka+storm整合00 中使用KafkaSpout提供数据 接下来设置Bolt 对数据进行处理 1.1、第一个Bolt 原始数据进行处理, 剔除字段不足的数据
declarer.declare(new Fields("srcIp", "destIp"));
1.2、第二个Bolt即MiddleSumBolt统计每10秒内的ip计数, 这个是为了最后进行合计Bolt减小压力, 总计先进行一次小的统计
    @Override
    public void execute(Tuple input) {
        String srcIp = input.getStringByField("srcIp");
        String destIp = input.getStringByField("destIp");

        collector.ack(input);
        Long curTime = System.currentTimeMillis();
        if(curTime-lastTime >= 1000*10) {
            //emit
            System.out.println("MiddleSumBolt emit ..." + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            for (String f : countMap.keySet()) {
                Map map = countMap.get(f);
                for (String ip : map.keySet()) {
                    collector.emit(new Values(f, ip, map.get(ip)));
                }
            }
            countMap.clear();
            lastTime = curTime;
        }else {
            Map srcIpMap = countMap.get("srcIp");
            if(srcIpMap == null) {
                srcIpMap = new HashMap();
            }
            //统计srcip的量
            Integer srcIpNum =  srcIpMap.get(srcIp);
            if(srcIpNum == null) {
                srcIpNum = 0;
            }
            srcIpNum++;
            srcIpMap.put(srcIp, srcIpNum);
            countMap.put("srcIp", srcIpMap);

            //统计destIp数量
            Map destIpMap = countMap.get("destIp");
            if(destIpMap == null) {
                destIpMap = new HashMap();
            }
            Integer destIpNum = destIpMap.get(destIp);
            if(destIpNum == null) {
                destIpNum = 0;
            }
            destIpNum++;
            destIpMap.put(destIp, destIpNum);
            countMap.put("destIp", destIpMap);
        }
    }
MiddleBolt发射出去的字段含义
src-dest 表示是源ip, 还是目的ip,   
declarer.declare(new Fields("src-dest", "ip", "num"));
1.3、最后一个Bolt ,做最后的合计按照每60秒 ,进行一次合计
package com.chb.test.bolt.idc;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;




import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * summarybolt
 * @author 12285
 *
 */
public class SummaryBolt extends BaseRichBolt{
    OutputCollector collector;
    Map countMap = null;
    /**
     * 上次emit的时间
     */
    private Long lastTime = System.currentTimeMillis();
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        countMap = new HashMap();
    }

    @Override
    public void execute(Tuple input) {
        String src_dest = input.getString(0);
        String ip = input.getString(1);
        int num = input.getInteger(2);
        Long curTime = System.currentTimeMillis();
        if(curTime-lastTime >= 1000*60) {
            //emit
            for(String sd: countMap.keySet()){
                Map maps = countMap.get(sd);
                List list = new ArrayList();
                list.addAll(maps.entrySet());
                Collections.sort(list, new ValueComparator());
                System.out.println(sd+"-top5:"+"{");
                //可能没有第5个元素
                for(int i=0; i< list.size() && i < 5; i++) {
                    System.out.println("("+list.get(i).getKey()+","+list.get(i).getValue()+")");
                }
                System.out.println("}" +  new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
            countMap.clear();
            lastTime = curTime;
        }else {
            Map srcIpMap = countMap.get(src_dest);
            if(srcIpMap == null) {
                srcIpMap = new HashMap();
            }

            //统计srcip的量
            Integer srcIpNum =  srcIpMap.get(ip);
            if(srcIpNum == null) {
                srcIpNum = 0;
            }
            srcIpNum = srcIpNum + num;
            srcIpMap.put(ip, srcIpNum);
            countMap.put(src_dest, srcIpMap);
        }
    }
}
输出结果:

这里写图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0400s