UV,全称是 Unique Visitor,即独立访客,对于实时计算中,也可以称为 DAU(Daily Active User),即每日活跃用户,因为实时计算中的 uv 通常是指当日的访客数。 那么如何从用户行为日志中识别出当日的访客,那么有两点: ➢ 其一,是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用 ➢ 其二,由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重
1.1、核心的过滤代码➢ 首先用 keyby 按照 mid 进行分组,每组表示当前设备的访问情况
//TODO 3.对读取到的数据进行结构的换换
SingleOutputStreamOperator jsonObjDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
//TODO 4.按照设备id进行分组
KeyedStream keybyWithMidDS = jsonObjDS.keyBy(
jsonObj -> jsonObj.getJSONObject("common").getString("mid")
);
➢ 分组后使用 keystate 状态,记录用户进入时间,实现 RichFilterFunction 完成过滤
//TODO 5.过滤得到UV
SingleOutputStreamOperator filteredDS = keybyWithMidDS.filter(
new RichFilterFunction() {
。。。
})
➢ 重写 open 方法用来初始化状态
//定义状态
ValueState lastVisitDateState = null;
//定义日期工具类
SimpleDateFormat sdf = null;
@Override
public void open(Configuration parameters) throws Exception {
//初始化日期工具类
sdf = new SimpleDateFormat("yyyyMMdd");
//初始化状态
ValueStateDescriptor lastVisitDateStateDes =
new ValueStateDescriptor("lastVisitDateState", String.class);
//因为我们统计的是日活DAU,所以状态数据只在当天有效 ,过了一天就可以失效掉
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
lastVisitDateStateDes.enableTimeToLive(stateTtlConfig);
this.lastVisitDateState = getRuntimeContext().getState(lastVisitDateStateDes);
}
➢ 重写 filter 方法进行过滤 ◼ 可以直接筛掉 last_page_id 不为空的字段,因为只要有上一页,说明这条不是这个用户进入的首个页面。 ◼ 状态用来记录用户的进入时间,只要这个 lastVisitDate 是今天,就说明用户今天已经访问过了所以筛除掉。如果为空或者不是今天,说明今天还没访问过,则保留。 ◼ 因为状态值主要用于筛选是否今天来过,所以这个记录过了今天基本上没有用了,这里 enableTimeToLive
设定了 1 天的过期时间,避免状态过大。
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
//首先判断当前页面是否从别的页面跳转过来的
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
if (lastPageId != null && lastPageId.length() > 0) {
return false;
}
//获取当前访问时间
Long ts = jsonObj.getLong("ts");
//将当前访问时间戳转换为日期字符串
String logDate = sdf.format(new Date(ts));
//获取状态日期
String lastVisitDate = lastVisitDateState.value();
//用当前页面的访问时间和状态时间进行对比
if (lastVisitDate != null && lastVisitDate.length() > 0 && lastVisitDate.equals(logDate)) {
System.out.println("已访问:lastVisitDate-" + lastVisitDate + ",||logDate:" + logDate);
return false;
} else {
System.out.println("未访问:lastVisitDate-" + lastVisitDate + ",||logDate:" + logDate);
lastVisitDateState.update(logDate);
return true;
}
}
1.2、将过滤处理后的 UV 写入到 Kafka 的 dwm_unique_visit
//TODO 6. 向kafka中写回,需要将json转换为String
//6.1 json->string
SingleOutputStreamOperator kafkaDS = filteredDS.map(jsonObj -> jsonObj.toJSONString());
//6.2 写回到kafka的dwm层
kafkaDS.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));
1.3、测试
➢ 启动 logger.sh ➢ 运行 Idea 中的 BaseLogApp ➢ 运行 Idea 中的 UniqueVisitApp ➢ 查看控制台输出以及 kafka 的 dwm_unique_visit 主题 ➢ 执行流程 模拟生成数据->日志处理服务器->写到 kafka 的 ODS 层(ods_base_log)->BaseLogApp 分流->dwd_page_log->UniqueVisitApp 读取并处理->写回到 kafka 的 dwm 层