跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而跳出率就是用跳出次数除以访问次数。 关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果。
1.2、 计算跳出行为的思路首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么要抓住几个特征:
➢ 该页面是用户近期访问的第一个页面 这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。
➢ 首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。 这第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢?
最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识别某个事件。
用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。
二、代码实现 2.1、通过 Flink 的 CEP 完成跳出判断- 确认添加了 CEP 的依赖包
- 设定时间语义为事件时间并指定数据中的 ts 字段为事件时间 由于这里涉及到时间的判断,所以必须设定数据流的 EventTime 和水位线。这里没有设置延迟时间,实际生产情况可以视乱序情况增加一些延迟。 增加延迟把 forMonotonousTimestamps 换为 forBoundedOutOfOrderness 即可。 注意:flink1.12 默认的时间语义就是事件时间,所以不需要执行
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//TODO 4. 指定事件时间字段
SingleOutputStreamOperator jsonObjWithTSDS = jsonObjDS.assignTimestampsAndWatermarks(
// 注意此处 WatermarkStrategy.
WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(
new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
return jsonObj.getLong("ts");
}
}
));
- 根据日志数据的 mid 进行分组 因为用户的行为都是要基于相同的 Mid 的行为进行判断,所以要根据 Mid 进行分组。
//TODO 5.按照mid进行分组
KeyedStream keyByMidDS = jsonObjWithTSDS.keyBy(
jsonObj -> jsonObj.getJSONObject("common").getString("mid")
);
- 配置 CEP 表达式
/*
计算页面跳出明细,需要满足两个条件
1.不是从其它页面跳转过来的页面,是一个首次访问页面
last_page_id == null
2.距离首次访问结束后10秒内,没有对其它的页面再进行访问
*/
//TODO 6.配置CEP表达式
Pattern pattern = Pattern.begin("first")
.where(
//模式1:不是从其它页面跳转过来的页面,是一个首次访问页面
new SimpleCondition() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
//获取last_page_id
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
//判断是否为null 将为空的保留,非空的过滤掉
if (lastPageId == null || lastPageId.length() == 0) {
return true;
}
return false;
}
}
)
.next("next")
.where(
//模式2. 判读是否对页面做了访问
new SimpleCondition() {
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
//获取当前页面的id, 注意此处是page_id
String pageId = jsonObj.getJSONObject("page").getString("page_id");
//判断当前访问的页面id是否为null
if (pageId != null && pageId.length() > 0) {
return true;
}
return false;
}
}
)
//3.时间限制模式
.within(Time.milliseconds(10000));
- 根据表达式筛选流
//TODO 7.根据:CEP表达式筛选流
PatternStream patternStream = CEP.pattern(keyByMidDS, pattern);
- 提取命中的数据, 将超时的数据放到侧输出流
➢ 设定超时时间标识 timeoutTag ➢ flatSelect 方法中,实现 PatternFlatTimeoutFunction 中的 timeout 方法。 ➢ 所有 out.collect 的数据都被打上了超时标记 ➢ 本身的 flatSelect 方法因为不需要未超时的数据所以不接受数据。 ➢ 通过 SideOutput 侧输出流输出超时数据
//TODO 8.从筛选之后的流中,提取数据 将超时数据 放到侧输出流中
OutputTag timeoutTag = new OutputTag("timeout") {};
SingleOutputStreamOperator filterDS = patternStream.flatSelect(
timeoutTag,
//处理超时数据
new PatternFlatTimeoutFunction() {
@Override
public void timeout(Map pattern, long timeoutTimestamp, Collector out) throws Exception {
//获取所有符合first的json对象
List jsonObjectList = pattern.get("first");
//注意:在timeout方法中的数据都会被参数1中的标签标记
for (JSONObject jsonObject : jsonObjectList) {
out.collect(jsonObject.toJSONString());
}
}
},
//处理的没有超时数据
new PatternFlatSelectFunction() {
@Override
public void flatSelect(Map pattern, Collector out) throws Exception {
//没有超时的数据,不在我们的统计范围之内 ,所以这里不需要写什么代码
}
}
);
//TODO 9.从侧输出流中获取超时数据
DataStream jumpDS = filterDS.getSideOutput(timeoutTag);
//jumpDS.print(">>>>>");
- 将跳出数据写回到 kafka 的 DWM 层
//TODO 10.将跳出数据写回到kafka的DWM层
jumpDS.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));
2.2、4 测试
- 利用测试数据验证
DataStream dataStream = env
.fromElements(
"{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"home\"},\"ts\":150000} ",
"{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
"\"detail\"},\"ts\":300000} "
);
- 查看控制台以及 dwm_user_jump_detail 输出效果 注意:为了看效果,设置并行度为 1