##统计5分钟粒度时间的数据指标 ###1.1、每个周期数据按照统计时间点,指标存入TreeMap(treemap是按照自然排序,为了在每个周期提交的时候,将前一个时间点的统计指标存入数据库中),
/**
* 记录一个周期内的数据,包含两个时间点
* TreeMap key: 时间的值,
* Map key: itemKey , 统计的指标的值
* {(timeKey:{(itemKey:[datas])})}
*/
static TreeMap counterMap;
###1.2、对每个接收的数据,判断是否超过周期,存入treemap中,
//获取序列化之后的数据, 一条数据
Map inputMap = (Map) input.getValue(0);
//itemkey
String itemkey = inputMap.get("srcIp");
//判断时候到了一个周期,进行统计数据 , 将数据存到counterMap中, 以供统计
boolean handle = BaseHandle.handle(inputMap, lastCheckTime, cycle,
countCycle, counterMap, itemkey);
###1.2.1、具体看下handle中的处理逻辑 ####1.2.1.1、选择判断的时间, 以数据的开始时间作为判断
//开始时间
String stime = inputMap.get("stime");
####1.2.1.2、处理脏数据:
- 1、如果开始时间在上次提交时间前,不进行处理;
- 2、如果开始时间在下次提交时间后, 这个也不处理,
//判断数据的时间是否距离最后一次统计时间超过周期范围内, 判定为脏数据不进行处理
if(DateUtils.checkDateMinuteForNow(stime, lastCheckTime) > cycle*countCycle
|| DateUtils.checkDateMinuteForNow(stime, lastCheckTime) < 0){
System.err.println(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+","+
stime+","+inputMap.get("srcIp") );
//如果超过两个周期,则直接退出
return false;
}
####1.2.1.3、格式化时间点, 将开始时间格式化成周期范围的形式
String stimeKey = stime.substring(0, 14); //yyyy-MM-dd HH:mm:
//此处用于测试减小周期, 格式化timeKey到分钟
String minStr = stime.substring(14,16);
int min = Integer.parseInt(minStr)/(cycle)*(cycle); //60
stimeKey = stimeKey + (min= cycle*countCycle){
//如果超过统计范围则对当前Map进行统计
return true;
}
###1.3、可以提交了, 进行统计, 存入数据库中。
if(handle){
//获取第一个key
String timekey = counterMap.firstKey();
//统计数据
countData(timekey, "idc_srcip_hour_tab", "srcIp");
//清空key
counterMap.remove(timekey);
//更新最后统计时间
lastCheckTime = System.currentTimeMillis();
}
###1.3.1、统计并入库
/**
* @description 统计指定周期的数据
* @param timekey
* @return
*/
protected static void countData(String timekey, String tableName, String itemName){
Map detailMap = counterMap.get(timekey);
//遍历数据,统计
for (String itemKey : detailMap.keySet()) { //ip的值
//多条详单数据, 每个map是一条详单数据
List datas = detailMap.get(itemKey); //对应srcip的数据详单
//总条数
int reqnum = datas.size();
Record record = new Record();
record.set("ctime", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()))
.set("stime", timekey)
.set("itemvalue", itemKey)
.set("reqnum", reqnum);
//保存数据
Db.save("idc_srcip_hour_tab", record);
}
}
##问题 ###理想情况时延不大
脏数据处理: lastCheckTime(09:25:33)之前的数据抛弃, 下一个lastCheckTime(10:25:33)的数据也抛弃
lastCheckTime 09:25:33 start ...
first:09
second:10
lastCheckTime 10:25:33 提交09的数据 所以统计时延为一个周期(09数据,只能到10才能统计出来)
first:10
second:11
lastCheckTime 11:25:33
first:11
second:12
##缺陷 如果在09:25:33启动后,第一条数据是08, 第二条是09, 那么只有两个key,08,09, 10:25:33之前10点的数据都被存到09中, 这回导致数据指标不准