您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

DWM 层-跳出明细计算

宝哥大数据 发布时间:2021-03-11 08:01:01 ,浏览量:1

1.1、什么是跳出

  跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而跳出率就是用跳出次数除以访问次数。   关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果。

1.2、 计算跳出行为的思路

  首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么要抓住几个特征:

➢ 该页面是用户近期访问的第一个页面   这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。

➢ 首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。   这第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢?

  最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识别某个事件。

用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。

二、代码实现 2.1、通过 Flink 的 CEP 完成跳出判断
  1. 确认添加了 CEP 的依赖包
  2. 设定时间语义为事件时间并指定数据中的 ts 字段为事件时间   由于这里涉及到时间的判断,所以必须设定数据流的 EventTime 和水位线。这里没有设置延迟时间,实际生产情况可以视乱序情况增加一些延迟。   增加延迟把 forMonotonousTimestamps 换为 forBoundedOutOfOrderness 即可。 注意:flink1.12 默认的时间语义就是事件时间,所以不需要执行env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //TODO 4. 指定事件时间字段
        SingleOutputStreamOperator jsonObjWithTSDS = jsonObjDS.assignTimestampsAndWatermarks(
           //  注意此处 WatermarkStrategy.
            WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(
                new SerializableTimestampAssigner() {
                    @Override
                    public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
                        return jsonObj.getLong("ts");
                    }
                }
            ));

  1. 根据日志数据的 mid 进行分组 因为用户的行为都是要基于相同的 Mid 的行为进行判断,所以要根据 Mid 进行分组。
        //TODO 5.按照mid进行分组
        KeyedStream keyByMidDS = jsonObjWithTSDS.keyBy(
            jsonObj -> jsonObj.getJSONObject("common").getString("mid")
        );

  1. 配置 CEP 表达式
        /*
            计算页面跳出明细,需要满足两个条件
                1.不是从其它页面跳转过来的页面,是一个首次访问页面
                        last_page_id == null
                2.距离首次访问结束后10秒内,没有对其它的页面再进行访问
        */
        //TODO 6.配置CEP表达式
        Pattern pattern = Pattern.begin("first")
                .where(
                        //模式1:不是从其它页面跳转过来的页面,是一个首次访问页面
                        new SimpleCondition() {
                            @Override
                            public boolean filter(JSONObject jsonObj) throws Exception {
                                //获取last_page_id
                                String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                                //判断是否为null 将为空的保留,非空的过滤掉
                                if (lastPageId == null || lastPageId.length() == 0) {
                                    return true;
                                }
                                return false;
                            }
                        }
                )
                .next("next")
                .where(
                        //模式2. 判读是否对页面做了访问
                        new SimpleCondition() {
                            @Override
                            public boolean filter(JSONObject jsonObj) throws Exception {
                                //获取当前页面的id, 注意此处是page_id
                                String pageId = jsonObj.getJSONObject("page").getString("page_id");
                                //判断当前访问的页面id是否为null
                                if (pageId != null && pageId.length() > 0) {
                                    return true;
                                }
                                return false;
                            }
                        }
                )
                //3.时间限制模式
                .within(Time.milliseconds(10000));

在这里插入图片描述

  1. 根据表达式筛选流
        //TODO 7.根据:CEP表达式筛选流
        PatternStream patternStream = CEP.pattern(keyByMidDS, pattern);
  1. 提取命中的数据, 将超时的数据放到侧输出流

➢ 设定超时时间标识 timeoutTag ➢ flatSelect 方法中,实现 PatternFlatTimeoutFunction 中的 timeout 方法。 ➢ 所有 out.collect 的数据都被打上了超时标记 ➢ 本身的 flatSelect 方法因为不需要未超时的数据所以不接受数据。 ➢ 通过 SideOutput 侧输出流输出超时数据

        //TODO 8.从筛选之后的流中,提取数据   将超时数据  放到侧输出流中
        OutputTag timeoutTag = new OutputTag("timeout") {};

        SingleOutputStreamOperator filterDS = patternStream.flatSelect(
                timeoutTag,
                //处理超时数据
                new PatternFlatTimeoutFunction() {
                    @Override
                    public void timeout(Map pattern, long timeoutTimestamp, Collector out) throws Exception {
                        //获取所有符合first的json对象
                        List jsonObjectList = pattern.get("first");
                        //注意:在timeout方法中的数据都会被参数1中的标签标记
                        for (JSONObject jsonObject : jsonObjectList) {
                            out.collect(jsonObject.toJSONString());
                        }
                    }
                },
                //处理的没有超时数据
                new PatternFlatSelectFunction() {
                    @Override
                    public void flatSelect(Map pattern, Collector out) throws Exception {
                        //没有超时的数据,不在我们的统计范围之内 ,所以这里不需要写什么代码
                    }
                }
        );
        
        //TODO 9.从侧输出流中获取超时数据
        DataStream jumpDS = filterDS.getSideOutput(timeoutTag);

        //jumpDS.print(">>>>>");

  1. 将跳出数据写回到 kafka 的 DWM 层
        //TODO 10.将跳出数据写回到kafka的DWM层
        jumpDS.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));

2.2、4 测试
  1. 利用测试数据验证
DataStream dataStream = env
            .fromElements(
                "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                    "\"home\"},\"ts\":150000} ",
                "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                    "\"detail\"},\"ts\":300000} "
            );
  1. 查看控制台以及 dwm_user_jump_detail 输出效果 注意:为了看效果,设置并行度为 1 在这里插入图片描述
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0564s