您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

各区域热门商品统计

宝哥大数据 发布时间:2018-07-17 13:11:41 ,浏览量:1

一、需求:根据用户指定的日期范围,统计各个区域下的最热门的top3商品
1、区域信息在哪里,各个城市的信息,城市是不怎么变化的,没有必要存储在hive里?MySQL,Hive和MySQL异构数据源使用,技术点
2、hive用户行为数据,和mysql城市信息,join,关联之后是RDD?RDD转换DataFrame,注册临时表,技术点
3、各个区域下各个商品的点击量,保留每个区域的城市列表数据?自定义UDAF函数,group_concat_distinct()
4、product_id,join hive表中的商品信息,商品信息在哪里?Hive。商品的经营类型是什么?自定义UDF函数,get_json_object(),if()
5、获取每个区域的点击量top3商品?开窗函数;给每个区域打上级别的标识,西北大区,经济落后,区域上的划分,C类区域;北京、上海,发达,标记A类
6、Spark SQL的数据倾斜解决方案?双重group by、随机key以及扩容表(自定义UDF函数,random_key())、内置reduce join转换为map join、shuffle并行度
二、技术方案设计:

Spark作业接收taskid,查询对应的MySQL中的task,获取用户指定的筛选参数;统计出指定日期范围内的,各个区域的top3热门商品;最后将结果写入MySQL表中。


1、查询task,获取日期范围,通过Spark SQL,查询user_visit_action表中的指定日期范围内的数据,过滤出,商品点击行为,click_product_id is not null;click_product_id != 'NULL';click_product_id != 'null';city_id,click_product_id
2、使用Spark SQL从MySQL中查询出来城市信息(city_id、city_name、area),用户访问行为数据要跟城市信息进行join,city_id、city_name、area、product_id,RDD,转换成DataFrame,注册成一个临时表
3、Spark SQL内置函数(case when),对area打标记(华东大区,A级,华中大区,B级,东北大区,C级,西北大区,D级),area_level
4、计算出来每个区域下每个商品的点击次数,group by area, product_id;保留每个区域的城市名称列表;自定义UDAF,group_concat_distinct()函数,聚合出来一个city_names字段,area、product_id、city_names、click_count
5、join商品明细表,hive(product_id、product_name、extend_info),extend_info是json类型,自定义UDF,get_json_object()函数,取出其中的product_status字段,if()函数(Spark SQL内置函数),判断,0 自营,1 第三方;(area、product_id、city_names、click_count、product_name、product_status)
6、开窗函数,根据area来聚合,获取每个area下,click_count排名前3的product信息;area、area_level、product_id、city_names、click_count、product_name、product_status
7、结果写入MySQL表中
8、Spark SQL的数据倾斜解决方案?双重group by、随机key以及扩容表(自定义UDF函数,random_key())、Spark SQL内置的reduce join转换为map join、提高shuffle并行度
9、本地测试和生产环境的测试
三、基础数据的准备和设计

1、MySQL表中,要有city_info,city_id、city_name、area
2、Hive表中,要有一个product_info表,product_id、product_name、extend_info
3、MySQL中,设计结果表,task_id、area、area_level、product_id、city_names、click_count、product_name、product_status
四、实现 4.1、查询user_visit_action表中的指定日期范围内的数据,过滤出,商品点击行为,参考按session粒度进行数据聚合, 过滤源数据。 4.2、从mysql中获取城市信息, 通过DataFrame,

如何通过Spark SQL 从mysql中获取数据, 参考spark向mysql数据库读写数据

    /**
     * 使用Spark SQL从MySQL中查询城市信息
     * @param sqlContext SQLContext
     * @return 
     */
    private static JavaPairRDD getcityid2CityInfoRDD() {
        // 构建MySQL连接配置信息(直接从配置文件中获取)
        Properties connProps = new PropertiesUtils("/jdbc.properties").getProps();
        String url = connProps.getProperty(Constants.JDBC_URL);

        //设置数据库连接参数
        //Properties props = new Properties();
        Map props = new HashMap();
        props.put("url", url);
        props.put("user", connProps.getProperty(Constants.JDBC_USER));
        props.put("password", connProps.getProperty(Constants.JDBC_PASSWORD));
        props.put("dirver", connProps.getProperty(Constants.JDBC_DRIVER));
        props.put("dbtable", "city_info");  


        // 通过SQLContext去从MySQL中查询数据
        DataFrame cityInfoDF = sqlContext.read().format("jdbc").options(props).load();

        //DataFrame cityInfoDF = sqlContext.read().jdbc(url, "city_info", props);

        // 返回RDD
        JavaRDD cityInfoRDD = cityInfoDF.javaRDD();


        //格式化输出,以click_product_id为key
        JavaPairRDD cityid2cityInfoRDD = cityInfoRDD.mapToPair(

                new PairFunction() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2 call(Row row) throws Exception {
                        Integer cityid = row.getInt(0);
                        return new Tuple2(cityid, row);
                    }

                });

        return cityid2cityInfoRDD;
    }
4.3、将点击信息与城市信息join, 然后存入hive中 数据插入hive中参考Spark通过Dataframe操作hive 4.3.1、join
//将clickActionRDD 与 cityInfo 进行join , 获取到关联信息
JavaPairRDD clickAction2CityInfoRDD = 
                clickActionRDD.join(cityInfoRDD);
4.3.2、将JavaPairRDD 转换为JavaRDD, 是为了通过DataFrame插入数据库
        //将JavaPairRDD 转换为JavaRDD, 是为了通过DataFrame插入数据库
        JavaRDD joinInfoRowRDD = clickAction2CityInfoRDD.map(new Function() {
            private static final long serialVersionUID = -6485564412605191186L;

            @Override
            public Row call(Tuple2 v1) throws Exception {
                long cityid = v1._1;
                Row clickActionInfo = v1._2._1;
                Row cityInfo = v1._2._2;

                Long productId = clickActionInfo.getLong(1);
                String cityName = cityInfo.getString(1);
                String area = cityInfo.getString(2);

                Row row = RowFactory.create(cityid, cityName, area, productId);

                return row;
            }
        });
4.3.3、创建schema
        List structFields = new ArrayList();
        structFields.add(DataTypes.createStructField("city_id", DataTypes.LongType, true));
        structFields.add(DataTypes.createStructField("city_name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("area", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("product_id", DataTypes.LongType, true));
        StructType schema = DataTypes.createStructType(structFields);
4.3.4、创建DataFrame
        //创建DataFrame
        DataFrame dataFrame = sqlContext.createDataFrame(joinInfoRowRDD, schema);
4.3.5、创建临时表, 并将数据导入hive实际表中
        //创建临时表
        dataFrame.registerTempTable("click_product_basic_tmp");
        sqlContext.sql("insert into click_product_basic select * from click_product_basic_tmp");
数据的样式

这里写图片描述

4.4、获取各区域各商品的点击次数 4.4.1、创建一个UDF,用于将city_id, city_name拼接一起, 组成cityinfo 4.4.2、创建一个UDAF, 用于将分组内的cityinfo 去重之后拼接到一起 4.4.3、使用自定一UDF, UDAF,统计各区域各商品的点击次数
    /**
     * 从hive表中读取数据, 使用自定义聚合函数
     */
    private static void readProductClickInfo() {
        // 按照area和product_id两个字段进行分组
        // 计算出各区域各商品的点击次数
        // 可以获取到每个area下的每个product_id的城市信息拼接起来的串

        String sql =  "SELECT  area, product_id, count(*) click_count, "  
                + "group_concat_distinct(concat_long_string(city_id,city_name,':')) city_infos "  
                + "FROM click_product_basic "
                + "GROUP BY area,product_id "; 

        // 使用Spark SQL执行这条SQL语句
        DataFrame df = sqlContext.sql(sql);
        // 各区域各商品的点击次数(以及额外的城市列表)
        df.registerTempTable("tmp_area_product_click_count");    
        sqlContext.sql("truncate table area_product_click_count ");
        sqlContext.sql("insert into area_product_click_count select * from tmp_area_product_click_count ");
    }

这里写图片描述

4.5、关联商品信息, 并将打上商品是否自营的标识 4.5.1、商品信息, prouct_status 表示是否自营, 是一个json串, 我们在获取的时候需要进行转换,
0       product0        {"product_status": 0}
1       product1        {"product_status": 1}
2       product2        {"product_status": 1}
4.5.2、定义一个UDF获取json串信息
package com.chb.shopanalysis.sql.UDF;

import org.apache.spark.sql.api.java.UDF2;
import org.json.JSONObject;


/**
 * get_json_object()
 * 
 * 自定义UDF函数
 * 
 * @author chb
 *
 */
public class GetJsonObjectUDF implements UDF2 {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(String json, String field) throws Exception {
        try {
            JSONObject jsonObject = new JSONObject(json);
            return String.valueOf(jsonObject.get(field));
        } catch (Exception e) {
            e.printStackTrace();  
        }
        return null;
    }

}
4.5.3、注册UDF
        sqlContext.udf().register("get_json_object", 
                new GetJsonObjectUDF(),
                DataTypes.StringType);
4.5.4、获取完整商品信息, 并打上是否自营
    /**
     * 生成区域商品点击次数临时表(包含了商品的完整信息)
     * @param sqlContext
     */
    private static void generateTempAreaFullProductClickCountTable() {
        // 将之前得到的各区域各商品点击次数表,product_id
        // 去关联商品信息表,product_id,product_name和product_status
        // product_status要特殊处理,0,1,分别代表了自营和第三方的商品,放在了一个json串里面
        // get_json_object()函数,可以从json串中获取指定的字段的值
        // if()函数,判断,如果product_status是0,那么就是自营商品;如果是1,那么就是第三方商品
        // area, product_id, click_count, city_infos, product_name, product_status

        // 为什么要费时费力,计算出来商品经营类型
        // 你拿到到了某个区域top3热门的商品,那么其实这个商品是自营的,还是第三方的
        // 其实是很重要的一件事

        String sql = 
                "SELECT "
                    + "tapcc.area,"
                    + "tapcc.product_id,"
                    + "tapcc.click_count,"
                    + "tapcc.city_infos,"
                    + "pi.product_name,"
                    + "if(get_json_object(pi.extend_info,'product_status')=0,'自营商品','第三方商品') product_status "
                + "FROM area_product_click_count tapcc "
                + "JOIN product_info pi ON tapcc.product_id=pi.product_id ";

        DataFrame df = sqlContext.sql(sql);
        df.show();
        df.registerTempTable("tmp_area_fullprod_click_count");   
        sqlContext.sql("truncate table area_fullprod_click_count");
        sqlContext.sql("insert into area_fullprod_click_count select * from tmp_area_fullprod_click_count ");
    }

这里写图片描述

4.6、 获取各区域top3热门商品, 使用开窗函数。
    /**
     * 获取各区域top3热门商品
     * @param sqlContext
     * @return
     */
    private static JavaRDD getAreaTop3ProductRDD() {
        // 使用开窗函数先进行一个子查询
        // 按照area进行分组,给每个分组内的数据,按照点击次数降序排序,打上一个组内的行号
        // 接着在外层查询中,过滤出各个组内的行号排名前3的数据
        // 其实就是咱们的各个区域下top3热门商品

        String sql = "SELECT  area, product_id, click_count, city_infos, product_name, product_status "
                + "FROM ("
                + "SELECT area, product_id, click_count, city_infos, product_name, product_status, " 
                + "ROW_NUMBER() OVER(PARTITION BY area ORDER BY click_count DESC) rank "
                + "FROM area_fullprod_click_count "
                + ") t "
                + "WHERE rank            
关注
打赏
1587549273
查看更多评论
0.0485s