一、需求:根据用户指定的日期范围,统计各个区域下的最热门的top3商品
1、区域信息在哪里,各个城市的信息,城市是不怎么变化的,没有必要存储在hive里?MySQL,Hive和MySQL异构数据源使用,技术点
2、hive用户行为数据,和mysql城市信息,join,关联之后是RDD?RDD转换DataFrame,注册临时表,技术点
3、各个区域下各个商品的点击量,保留每个区域的城市列表数据?自定义UDAF函数,group_concat_distinct()
4、product_id,join hive表中的商品信息,商品信息在哪里?Hive。商品的经营类型是什么?自定义UDF函数,get_json_object(),if()
5、获取每个区域的点击量top3商品?开窗函数;给每个区域打上级别的标识,西北大区,经济落后,区域上的划分,C类区域;北京、上海,发达,标记A类
6、Spark SQL的数据倾斜解决方案?双重group by、随机key以及扩容表(自定义UDF函数,random_key())、内置reduce join转换为map join、shuffle并行度
二、技术方案设计:
Spark作业接收taskid,查询对应的MySQL中的task,获取用户指定的筛选参数;统计出指定日期范围内的,各个区域的top3热门商品;最后将结果写入MySQL表中。
1、查询task,获取日期范围,通过Spark SQL,查询user_visit_action表中的指定日期范围内的数据,过滤出,商品点击行为,click_product_id is not null;click_product_id != 'NULL';click_product_id != 'null';city_id,click_product_id
2、使用Spark SQL从MySQL中查询出来城市信息(city_id、city_name、area),用户访问行为数据要跟城市信息进行join,city_id、city_name、area、product_id,RDD,转换成DataFrame,注册成一个临时表
3、Spark SQL内置函数(case when),对area打标记(华东大区,A级,华中大区,B级,东北大区,C级,西北大区,D级),area_level
4、计算出来每个区域下每个商品的点击次数,group by area, product_id;保留每个区域的城市名称列表;自定义UDAF,group_concat_distinct()函数,聚合出来一个city_names字段,area、product_id、city_names、click_count
5、join商品明细表,hive(product_id、product_name、extend_info),extend_info是json类型,自定义UDF,get_json_object()函数,取出其中的product_status字段,if()函数(Spark SQL内置函数),判断,0 自营,1 第三方;(area、product_id、city_names、click_count、product_name、product_status)
6、开窗函数,根据area来聚合,获取每个area下,click_count排名前3的product信息;area、area_level、product_id、city_names、click_count、product_name、product_status
7、结果写入MySQL表中
8、Spark SQL的数据倾斜解决方案?双重group by、随机key以及扩容表(自定义UDF函数,random_key())、Spark SQL内置的reduce join转换为map join、提高shuffle并行度
9、本地测试和生产环境的测试
三、基础数据的准备和设计
1、MySQL表中,要有city_info,city_id、city_name、area
2、Hive表中,要有一个product_info表,product_id、product_name、extend_info
3、MySQL中,设计结果表,task_id、area、area_level、product_id、city_names、click_count、product_name、product_status
四、实现
4.1、查询user_visit_action表中的指定日期范围内的数据,过滤出,商品点击行为,参考按session粒度进行数据聚合, 过滤源数据。
4.2、从mysql中获取城市信息, 通过DataFrame,
如何通过Spark SQL 从mysql中获取数据, 参考spark向mysql数据库读写数据
/**
* 使用Spark SQL从MySQL中查询城市信息
* @param sqlContext SQLContext
* @return
*/
private static JavaPairRDD getcityid2CityInfoRDD() {
// 构建MySQL连接配置信息(直接从配置文件中获取)
Properties connProps = new PropertiesUtils("/jdbc.properties").getProps();
String url = connProps.getProperty(Constants.JDBC_URL);
//设置数据库连接参数
//Properties props = new Properties();
Map props = new HashMap();
props.put("url", url);
props.put("user", connProps.getProperty(Constants.JDBC_USER));
props.put("password", connProps.getProperty(Constants.JDBC_PASSWORD));
props.put("dirver", connProps.getProperty(Constants.JDBC_DRIVER));
props.put("dbtable", "city_info");
// 通过SQLContext去从MySQL中查询数据
DataFrame cityInfoDF = sqlContext.read().format("jdbc").options(props).load();
//DataFrame cityInfoDF = sqlContext.read().jdbc(url, "city_info", props);
// 返回RDD
JavaRDD cityInfoRDD = cityInfoDF.javaRDD();
//格式化输出,以click_product_id为key
JavaPairRDD cityid2cityInfoRDD = cityInfoRDD.mapToPair(
new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Row row) throws Exception {
Integer cityid = row.getInt(0);
return new Tuple2(cityid, row);
}
});
return cityid2cityInfoRDD;
}
4.3、将点击信息与城市信息join, 然后存入hive中
数据插入hive中参考Spark通过Dataframe操作hive
4.3.1、join
//将clickActionRDD 与 cityInfo 进行join , 获取到关联信息
JavaPairRDD clickAction2CityInfoRDD =
clickActionRDD.join(cityInfoRDD);
4.3.2、将JavaPairRDD 转换为JavaRDD
, 是为了通过DataFrame插入数据库
//将JavaPairRDD 转换为JavaRDD, 是为了通过DataFrame插入数据库
JavaRDD joinInfoRowRDD = clickAction2CityInfoRDD.map(new Function() {
private static final long serialVersionUID = -6485564412605191186L;
@Override
public Row call(Tuple2 v1) throws Exception {
long cityid = v1._1;
Row clickActionInfo = v1._2._1;
Row cityInfo = v1._2._2;
Long productId = clickActionInfo.getLong(1);
String cityName = cityInfo.getString(1);
String area = cityInfo.getString(2);
Row row = RowFactory.create(cityid, cityName, area, productId);
return row;
}
});
4.3.3、创建schema
List structFields = new ArrayList();
structFields.add(DataTypes.createStructField("city_id", DataTypes.LongType, true));
structFields.add(DataTypes.createStructField("city_name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("area", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("product_id", DataTypes.LongType, true));
StructType schema = DataTypes.createStructType(structFields);
4.3.4、创建DataFrame
//创建DataFrame
DataFrame dataFrame = sqlContext.createDataFrame(joinInfoRowRDD, schema);
4.3.5、创建临时表, 并将数据导入hive实际表中
//创建临时表
dataFrame.registerTempTable("click_product_basic_tmp");
sqlContext.sql("insert into click_product_basic select * from click_product_basic_tmp");
数据的样式
/**
* 从hive表中读取数据, 使用自定义聚合函数
*/
private static void readProductClickInfo() {
// 按照area和product_id两个字段进行分组
// 计算出各区域各商品的点击次数
// 可以获取到每个area下的每个product_id的城市信息拼接起来的串
String sql = "SELECT area, product_id, count(*) click_count, "
+ "group_concat_distinct(concat_long_string(city_id,city_name,':')) city_infos "
+ "FROM click_product_basic "
+ "GROUP BY area,product_id ";
// 使用Spark SQL执行这条SQL语句
DataFrame df = sqlContext.sql(sql);
// 各区域各商品的点击次数(以及额外的城市列表)
df.registerTempTable("tmp_area_product_click_count");
sqlContext.sql("truncate table area_product_click_count ");
sqlContext.sql("insert into area_product_click_count select * from tmp_area_product_click_count ");
}
0 product0 {"product_status": 0}
1 product1 {"product_status": 1}
2 product2 {"product_status": 1}
4.5.2、定义一个UDF获取json串信息
package com.chb.shopanalysis.sql.UDF;
import org.apache.spark.sql.api.java.UDF2;
import org.json.JSONObject;
/**
* get_json_object()
*
* 自定义UDF函数
*
* @author chb
*
*/
public class GetJsonObjectUDF implements UDF2 {
private static final long serialVersionUID = 1L;
@Override
public String call(String json, String field) throws Exception {
try {
JSONObject jsonObject = new JSONObject(json);
return String.valueOf(jsonObject.get(field));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
4.5.3、注册UDF
sqlContext.udf().register("get_json_object",
new GetJsonObjectUDF(),
DataTypes.StringType);
4.5.4、获取完整商品信息, 并打上是否自营
/**
* 生成区域商品点击次数临时表(包含了商品的完整信息)
* @param sqlContext
*/
private static void generateTempAreaFullProductClickCountTable() {
// 将之前得到的各区域各商品点击次数表,product_id
// 去关联商品信息表,product_id,product_name和product_status
// product_status要特殊处理,0,1,分别代表了自营和第三方的商品,放在了一个json串里面
// get_json_object()函数,可以从json串中获取指定的字段的值
// if()函数,判断,如果product_status是0,那么就是自营商品;如果是1,那么就是第三方商品
// area, product_id, click_count, city_infos, product_name, product_status
// 为什么要费时费力,计算出来商品经营类型
// 你拿到到了某个区域top3热门的商品,那么其实这个商品是自营的,还是第三方的
// 其实是很重要的一件事
String sql =
"SELECT "
+ "tapcc.area,"
+ "tapcc.product_id,"
+ "tapcc.click_count,"
+ "tapcc.city_infos,"
+ "pi.product_name,"
+ "if(get_json_object(pi.extend_info,'product_status')=0,'自营商品','第三方商品') product_status "
+ "FROM area_product_click_count tapcc "
+ "JOIN product_info pi ON tapcc.product_id=pi.product_id ";
DataFrame df = sqlContext.sql(sql);
df.show();
df.registerTempTable("tmp_area_fullprod_click_count");
sqlContext.sql("truncate table area_fullprod_click_count");
sqlContext.sql("insert into area_fullprod_click_count select * from tmp_area_fullprod_click_count ");
}
/**
* 获取各区域top3热门商品
* @param sqlContext
* @return
*/
private static JavaRDD getAreaTop3ProductRDD() {
// 使用开窗函数先进行一个子查询
// 按照area进行分组,给每个分组内的数据,按照点击次数降序排序,打上一个组内的行号
// 接着在外层查询中,过滤出各个组内的行号排名前3的数据
// 其实就是咱们的各个区域下top3热门商品
String sql = "SELECT area, product_id, click_count, city_infos, product_name, product_status "
+ "FROM ("
+ "SELECT area, product_id, click_count, city_infos, product_name, product_status, "
+ "ROW_NUMBER() OVER(PARTITION BY area ORDER BY click_count DESC) rank "
+ "FROM area_fullprod_click_count "
+ ") t "
+ "WHERE rank
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?