您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

DWM 层 -- 访客 UV 计算

宝哥大数据 发布时间:2021-03-10 22:41:18 ,浏览量:1

一、需求分析与思路

  UV,全称是 Unique Visitor,即独立访客,对于实时计算中,也可以称为 DAU(Daily Active User),即每日活跃用户,因为实时计算中的 uv 通常是指当日的访客数。   那么如何从用户行为日志中识别出当日的访客,那么有两点: ➢ 其一,是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用 ➢ 其二,由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重

1.1、核心的过滤代码

➢ 首先用 keyby 按照 mid 进行分组,每组表示当前设备的访问情况

        //TODO 3.对读取到的数据进行结构的换换
        SingleOutputStreamOperator jsonObjDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));

        //TODO 4.按照设备id进行分组
        KeyedStream keybyWithMidDS = jsonObjDS.keyBy(
            jsonObj -> jsonObj.getJSONObject("common").getString("mid")
        );

➢ 分组后使用 keystate 状态,记录用户进入时间,实现 RichFilterFunction 完成过滤

       //TODO 5.过滤得到UV
        SingleOutputStreamOperator filteredDS = keybyWithMidDS.filter(
            new RichFilterFunction() {
            。。。
         })

➢ 重写 open 方法用来初始化状态

                //定义状态
                ValueState lastVisitDateState = null;
                //定义日期工具类
                SimpleDateFormat sdf = null;

                @Override
                public void open(Configuration parameters) throws Exception {
                    //初始化日期工具类
                    sdf = new SimpleDateFormat("yyyyMMdd");
                    //初始化状态
                    ValueStateDescriptor lastVisitDateStateDes =
                        new ValueStateDescriptor("lastVisitDateState", String.class);
                    //因为我们统计的是日活DAU,所以状态数据只在当天有效 ,过了一天就可以失效掉
                    StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
                    lastVisitDateStateDes.enableTimeToLive(stateTtlConfig);
                    this.lastVisitDateState = getRuntimeContext().getState(lastVisitDateStateDes);
                }

➢ 重写 filter 方法进行过滤 ◼ 可以直接筛掉 last_page_id 不为空的字段,因为只要有上一页,说明这条不是这个用户进入的首个页面。 ◼ 状态用来记录用户的进入时间,只要这个 lastVisitDate 是今天,就说明用户今天已经访问过了所以筛除掉。如果为空或者不是今天,说明今天还没访问过,则保留。 ◼ 因为状态值主要用于筛选是否今天来过,所以这个记录过了今天基本上没有用了,这里 enableTimeToLive 设定了 1 天的过期时间,避免状态过大。

 @Override
                public boolean filter(JSONObject jsonObj) throws Exception {
                    //首先判断当前页面是否从别的页面跳转过来的
                    String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                    if (lastPageId != null && lastPageId.length() > 0) {
                        return false;
                    }

                    //获取当前访问时间
                    Long ts = jsonObj.getLong("ts");
                    //将当前访问时间戳转换为日期字符串
                    String logDate = sdf.format(new Date(ts));
                    //获取状态日期
                    String lastVisitDate = lastVisitDateState.value();

                    //用当前页面的访问时间和状态时间进行对比
                    if (lastVisitDate != null && lastVisitDate.length() > 0 && lastVisitDate.equals(logDate)) {
                        System.out.println("已访问:lastVisitDate-" + lastVisitDate + ",||logDate:" + logDate);
                        return false;
                    } else {
                        System.out.println("未访问:lastVisitDate-" + lastVisitDate + ",||logDate:" + logDate);
                        lastVisitDateState.update(logDate);
                        return true;
                    }
                }
1.2、将过滤处理后的 UV 写入到 Kafka 的 dwm_unique_visit
        //TODO 6. 向kafka中写回,需要将json转换为String
        //6.1 json->string
        SingleOutputStreamOperator kafkaDS = filteredDS.map(jsonObj -> jsonObj.toJSONString());

        //6.2 写回到kafka的dwm层
        kafkaDS.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));
1.3、测试

➢ 启动 logger.sh ➢ 运行 Idea 中的 BaseLogApp ➢ 运行 Idea 中的 UniqueVisitApp ➢ 查看控制台输出以及 kafka 的 dwm_unique_visit 主题 ➢ 执行流程 模拟生成数据->日志处理服务器->写到 kafka 的 ODS 层(ods_base_log)->BaseLogApp 分流->dwd_page_log->UniqueVisitApp 读取并处理->写回到 kafka 的 dwm 层

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0450s