您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

top10热门品类之需求以及实现思路分析

宝哥大数据 发布时间:2018-07-12 16:59:38 ,浏览量:1

一、 需求:top10热门品类

计算出来通过筛选条件的那些session,他们访问过的所有品类(点击、下单、支付),按照各个品类的点击、下单和支付次数,降序排序,获取前10个品类,也就是筛选条件下的那一批session的top10热门品类;

点击、下单和支付次数:优先按照点击次数排序、如果点击次数相等,那么按照下单次数排序、如果下单次数相当,那么按照支付次数排序

这个需求是很有意义的,因为这样,就可以让数据分析师、产品经理、公司高层,随时随地都可以看到自己感兴趣的那一批用户,最喜欢的10个品类,从而对自己公司和产品的定位有清晰的了解,并且可以更加深入的了解自己的用户,更好的调整公司战略

二次排序:

如果我们就只是根据某一个字段进行排序,比如点击次数降序排序,那么就不是二次排序; 二次排序,顾名思义,就是说,不只是根据一个字段进行一次排序,可能是要根据多个字段,进行多次排序的 点击、下单和支付次数,依次进行排序,就是二次排序

sortByKey算子,默认情况下,它支持根据int、long等类型来进行排序,但是那样的话,key就只能放一个字段了 所以需要自定义key,作为sortByKey算子的key,自定义key中,封装n个字段,并在key中,自己在指定接口方法中,实现自己的根据多字段的排序算法 然后再使用sortByKey算子进行排序,那么就可以按照我们自己的key,使用多个字段进行排序

本模块中,最最重要和核心的一个Spark技术点

实现思路分析:

1、拿到通过筛选条件的那批session,访问过的所有品类 2、计算出session访问过的所有品类的点击、下单和支付次数,这里可能要跟第一步计算出来的品类进行join 3、自己开发二次排序的key 4、做映射,将品类的点击、下单和支付次数,封装到二次排序key中,作为PairRDD的key 5、使用sortByKey(false),按照自定义key,进行降序二次排序 6、使用take(10)获取,排序后的前10个品类,就是top10热门品类 7、将top10热门品类,以及每个品类的点击、下单和支付次数,写入MySQL数据库 8、使用Scala来开发二次排序key

二、具体实现 2.1、拿到通过筛选条件的那批session,访问过的所有品类(包含点击, 下订单, 支付的品类)

            //获取session访问过的所有品类id
            //包括 点击, 下单, 支付的品类
            //输入
            JavaPairRDD catagoryRDD = filterSession2ActionRDD.flatMapToPair(new PairFlatMapFunction() {

                @Override
                public Iterable call(Tuple2 t)
                        throws Exception {
                    Row row = t._2;


                    List list = new ArrayList();

                    Long clickCategoryId = (Long)row.get(6);
                    if(clickCategoryId != null) {
                        list.add(new Tuple2(clickCategoryId, clickCategoryId));   
                    }

                    String orderCategoryIds = row.getString(8);
                    if(orderCategoryIds != null) {
                        String[] orderCategoryIdsSplited =org.apache.commons.lang.StringUtils
                                .splitPreserveAllTokens(orderCategoryIds, ",");
                        for(String orderCategoryId : orderCategoryIdsSplited) {
                            list.add(new Tuple2(Long.valueOf(orderCategoryId),
                                    Long.valueOf(orderCategoryId)));
                        }
                    }

                    String payCategoryIds = row.getString(10);
                    if(payCategoryIds != null) {
                        String[] payCategoryIdsSplited =org.apache.commons.lang.StringUtils
                                .splitPreserveAllTokens(payCategoryIds, ",");  
                        for(String payCategoryId : payCategoryIdsSplited) {
                            list.add(new Tuple2(Long.valueOf(payCategoryId),
                                    Long.valueOf(payCategoryId)));
                        }
                    }

                    return list;
                }
            });
            //
2.2、计算出session访问过的所有品类的点击、下单和支付次数 2.2.1、统计点击品类的次数
  • 首先过滤出是点击的数据(clickCatagoryId != null)
  • mapToPair 映射成
  • 通过reduceByKey,计算每个品类的次数
            //2.1、过滤点击的session, 并统计品类的次数
            JavaPairRDD clickCatagory2CountRDD = filterSession2ActionRDD.filter(new Function() {

                @Override
                public Boolean call(Tuple2 v1) throws Exception {
                    Long clickCatagoryId = (Long)v1._2.get(6);
                    if(clickCatagoryId != null){
                        return true;
                    }else {
                        return false;
                    }
                }
            }).mapToPair(new PairFunction() {

                @Override
                public Tuple2 call(Tuple2 t)
                        throws Exception {
                    Long clickCatagoryId = t._2.getLong(6);

                    return new Tuple2(clickCatagoryId, 1L);
                }
            }).reduceByKey(new Function2() {

                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1+v2;
                }
            });
2.2.2、统计下订单的品类的次数
  • 首先过滤出是下订单的数据(orderCatagoyIds!= null)
  • 这个地方要注意, orderCatagroyids 包含多个品类,以逗号分割, 所以我们需要对orderCatagroyIds进行分割, 最后输出又必须是, 所以和统计点击的品类次数,有所区别, 使用的是flatMapToPair
  • 通过reduceByKey,计算每个品类的次数

这里写图片描述

2.2.3、统计支付的品类次数, 与2.2.2逻辑一致。 2.3、第三步: join各品类与它的点击, 下单, 支付的次数。 2.3.1、将2.1中获取所有品类,与点击次数进行join, 获取每个品类对应(点击,下单,支付)次数, 输出

注意次数使用leftOutJoin,某个品类不一定有点击, 但是该品类信息需要保留, 他可能直接下单支付, 如果使用join, 该品类将会抛弃。

        JavaPairRDD joinClickRDD = 
                allCatagoryIdRDD.leftOuterJoin(clickCatagory2CountRDD)
                .mapToPair(new PairFunction() {

                    @Override
                    public Tuple2 call(
                            Tuple2 t)
                                    throws Exception {
                        Long catagroyId = t._1;
                        Long count = t._2._2.isPresent()?t._2._2.get():0L;

                        String value = Constants.FIELD_CATEGORY_ID + "=" + catagroyId + "|" + 
                                Constants.FIELD_CLICK_COUNT + "=" + count;

                        //
                        return new Tuple2(catagroyId, value);
                    }
                });
下单, 支付和点击逻辑一致

        JavaPairRDD joinOrderRDD = joinClickRDD.leftOuterJoin(orderCatagory2CountRDD).mapToPair(new PairFunction() {

            @Override
            public Tuple2 call(
                    Tuple2 t)
                            throws Exception {
                Long catagroyId = t._1;
                String value = t._2._1;
                Long count = t._2._2.isPresent()?t._2._2.get():0L;

                value = value + "|" + Constants.FIELD_ORDER_COUNT + "=" + count;  
                 //
                return new Tuple2(catagroyId, value);
            }
        });

        JavaPairRDD joinRDD = joinOrderRDD.leftOuterJoin(payCatagory2CountRDD).mapToPair(new PairFunction() {

            @Override
            public Tuple2 call(
                    Tuple2 t)
                            throws Exception {
                Long catagroyId = t._1;
                String value = t._2._1;
                Long count = t._2._2.isPresent()?t._2._2.get():0L;

                value = value + "|" + Constants.FIELD_PAY_COUNT + "=" + count;  
                 //
                return new Tuple2(catagroyId, value);
            }
        });
输出结果, 有问题:
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(19,categoryid=19|clickCount=3|orderCount=1|payCount=0)
排查原因: 在统计所有品类的时候, 将点击,下单,支付都算进去了, 但是,没有考虑到categoryId重复的情况, 导致结果重复。所以需要对2.1统计所有品类结果进行去重
        //
        printRDD(allCatagoryIdRDD);
        //去重
        JavaPairRDD distinctCategoryIdRDD = allCatagoryIdRDD.distinct();

        return distinctCategoryIdRDD;
2.4、自定义二次排序key参考Spark的高级排序(二次排序)
package com.chb.shopanalysis.model;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * @Description

* 自定义二次排序的key * 继承scala.math.Ordered, * Serializable 接口 *

* * @date 2018年7月12日 下午11:33:47 * * @author chb * * @mail 1228532445@qq.com */ public class CategorySortKey implements Ordered, Serializable{ private long clickCount; private long orderCount; private long payCount; @Override public boolean $greater(CategorySortKey other) { if(clickCount > other.getClickCount()) { return true; } else if(clickCount == other.getClickCount() && orderCount > other.getOrderCount()) { return true; } else if(clickCount == other.getClickCount() && orderCount == other.getOrderCount() && payCount > other.getPayCount()) { return true; } return false; } @Override public boolean $greater$eq(CategorySortKey other) { if($greater(other)) { return true; } else if(clickCount == other.getClickCount() && orderCount == other.getOrderCount() && payCount == other.getPayCount()) { return true; } return false; } @Override public boolean $less(CategorySortKey other) { if(clickCount < other.getClickCount()) { return true; } else if(clickCount == other.getClickCount() && orderCount < other.getOrderCount()) { return true; } else if(clickCount == other.getClickCount() && orderCount == other.getOrderCount() && payCount < other.getPayCount()) { return true; } return false; } @Override public boolean $less$eq(CategorySortKey other) { if($less(other)) { return true; } else if(clickCount == other.getClickCount() && orderCount == other.getOrderCount() && payCount == other.getPayCount()) { return true; } return false; } @Override public int compare(CategorySortKey other) { if(clickCount - other.getClickCount() != 0) { return (int) (clickCount - other.getClickCount()); } else if(orderCount - other.getOrderCount() != 0) { return (int) (orderCount - other.getOrderCount()); } else if(payCount - other.getPayCount() != 0) { return (int) (payCount - other.getPayCount()); } return 0; } @Override public int compareTo(CategorySortKey other) { if(clickCount - other.getClickCount() != 0) { return (int) (clickCount - other.getClickCount()); } else if(orderCount - other.getOrderCount() != 0) { return (int) (orderCount - other.getOrderCount()); } else if(payCount - other.getPayCount() != 0) { return (int) (payCount - other.getPayCount()); } return 0; }
2.5、进行二次排序 2.5.1首先映射成
    //首先映射成
        JavaPairRDD sortKey2CountInfoRDD = joinRDD.mapToPair(new PairFunction() {

            @Override
            public Tuple2 call(Tuple2 t)
                    throws Exception {
                String countInfo = t._2;

                //解析出clickCount, orderCount, payCount
                long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                        countInfo, "\\|", Constants.FIELD_CLICK_COUNT));  
                long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                        countInfo, "\\|", Constants.FIELD_ORDER_COUNT));  
                long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                        countInfo, "\\|", Constants.FIELD_PAY_COUNT));  

                CategorySortKey sortKey = new CategorySortKey(clickCount,
                        orderCount, payCount);
                return new Tuple2(sortKey, countInfo);
            }
        });
2.5.2、通过sortByKey进行排序, false 按照降序
        //通过sortByKey进行排序, false 按照降序
        JavaPairRDD sortRDD = sortKey2CountInfoRDD.sortByKey(false);
        printRDD(sortRDD);
排序结果:

这里写图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.3181s