计算出来通过筛选条件的那些session,他们访问过的所有品类(点击、下单、支付),按照各个品类的点击、下单和支付次数,降序排序,获取前10个品类,也就是筛选条件下的那一批session的top10热门品类;
点击、下单和支付次数:优先按照点击次数排序、如果点击次数相等,那么按照下单次数排序、如果下单次数相当,那么按照支付次数排序
这个需求是很有意义的,因为这样,就可以让数据分析师、产品经理、公司高层,随时随地都可以看到自己感兴趣的那一批用户,最喜欢的10个品类,从而对自己公司和产品的定位有清晰的了解,并且可以更加深入的了解自己的用户,更好的调整公司战略
二次排序:
如果我们就只是根据某一个字段进行排序,比如点击次数降序排序,那么就不是二次排序; 二次排序,顾名思义,就是说,不只是根据一个字段进行一次排序,可能是要根据多个字段,进行多次排序的 点击、下单和支付次数,依次进行排序,就是二次排序
sortByKey算子,默认情况下,它支持根据int、long等类型来进行排序,但是那样的话,key就只能放一个字段了 所以需要自定义key,作为sortByKey算子的key,自定义key中,封装n个字段,并在key中,自己在指定接口方法中,实现自己的根据多字段的排序算法 然后再使用sortByKey算子进行排序,那么就可以按照我们自己的key,使用多个字段进行排序
本模块中,最最重要和核心的一个Spark技术点
实现思路分析:
1、拿到通过筛选条件的那批session,访问过的所有品类 2、计算出session访问过的所有品类的点击、下单和支付次数,这里可能要跟第一步计算出来的品类进行join 3、自己开发二次排序的key 4、做映射,将品类的点击、下单和支付次数,封装到二次排序key中,作为PairRDD的key 5、使用sortByKey(false),按照自定义key,进行降序二次排序 6、使用take(10)获取,排序后的前10个品类,就是top10热门品类 7、将top10热门品类,以及每个品类的点击、下单和支付次数,写入MySQL数据库 8、使用Scala来开发二次排序key
二、具体实现 2.1、拿到通过筛选条件的那批session,访问过的所有品类(包含点击, 下订单, 支付的品类)
//获取session访问过的所有品类id
//包括 点击, 下单, 支付的品类
//输入
JavaPairRDD catagoryRDD = filterSession2ActionRDD.flatMapToPair(new PairFlatMapFunction() {
@Override
public Iterable call(Tuple2 t)
throws Exception {
Row row = t._2;
List list = new ArrayList();
Long clickCategoryId = (Long)row.get(6);
if(clickCategoryId != null) {
list.add(new Tuple2(clickCategoryId, clickCategoryId));
}
String orderCategoryIds = row.getString(8);
if(orderCategoryIds != null) {
String[] orderCategoryIdsSplited =org.apache.commons.lang.StringUtils
.splitPreserveAllTokens(orderCategoryIds, ",");
for(String orderCategoryId : orderCategoryIdsSplited) {
list.add(new Tuple2(Long.valueOf(orderCategoryId),
Long.valueOf(orderCategoryId)));
}
}
String payCategoryIds = row.getString(10);
if(payCategoryIds != null) {
String[] payCategoryIdsSplited =org.apache.commons.lang.StringUtils
.splitPreserveAllTokens(payCategoryIds, ",");
for(String payCategoryId : payCategoryIdsSplited) {
list.add(new Tuple2(Long.valueOf(payCategoryId),
Long.valueOf(payCategoryId)));
}
}
return list;
}
});
//
2.2、计算出session访问过的所有品类的点击、下单和支付次数
2.2.1、统计点击品类的次数
- 首先过滤出是点击的数据(clickCatagoryId != null)
- mapToPair 映射成
- 通过reduceByKey,计算每个品类的次数
//2.1、过滤点击的session, 并统计品类的次数
JavaPairRDD clickCatagory2CountRDD = filterSession2ActionRDD.filter(new Function() {
@Override
public Boolean call(Tuple2 v1) throws Exception {
Long clickCatagoryId = (Long)v1._2.get(6);
if(clickCatagoryId != null){
return true;
}else {
return false;
}
}
}).mapToPair(new PairFunction() {
@Override
public Tuple2 call(Tuple2 t)
throws Exception {
Long clickCatagoryId = t._2.getLong(6);
return new Tuple2(clickCatagoryId, 1L);
}
}).reduceByKey(new Function2() {
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1+v2;
}
});
2.2.2、统计下订单的品类的次数
- 首先过滤出是下订单的数据(orderCatagoyIds!= null)
- 这个地方要注意, orderCatagroyids 包含多个品类,以逗号分割, 所以我们需要对orderCatagroyIds进行分割, 最后输出又必须是
, 所以和统计点击的品类次数,有所区别, 使用的是flatMapToPair
- 通过reduceByKey,计算每个品类的次数
注意次数使用leftOutJoin,某个品类不一定有点击, 但是该品类信息需要保留, 他可能直接下单支付, 如果使用join, 该品类将会抛弃。
JavaPairRDD joinClickRDD =
allCatagoryIdRDD.leftOuterJoin(clickCatagory2CountRDD)
.mapToPair(new PairFunction() {
@Override
public Tuple2 call(
Tuple2 t)
throws Exception {
Long catagroyId = t._1;
Long count = t._2._2.isPresent()?t._2._2.get():0L;
String value = Constants.FIELD_CATEGORY_ID + "=" + catagroyId + "|" +
Constants.FIELD_CLICK_COUNT + "=" + count;
//
return new Tuple2(catagroyId, value);
}
});
下单, 支付和点击逻辑一致
JavaPairRDD joinOrderRDD = joinClickRDD.leftOuterJoin(orderCatagory2CountRDD).mapToPair(new PairFunction() {
@Override
public Tuple2 call(
Tuple2 t)
throws Exception {
Long catagroyId = t._1;
String value = t._2._1;
Long count = t._2._2.isPresent()?t._2._2.get():0L;
value = value + "|" + Constants.FIELD_ORDER_COUNT + "=" + count;
//
return new Tuple2(catagroyId, value);
}
});
JavaPairRDD joinRDD = joinOrderRDD.leftOuterJoin(payCatagory2CountRDD).mapToPair(new PairFunction() {
@Override
public Tuple2 call(
Tuple2 t)
throws Exception {
Long catagroyId = t._1;
String value = t._2._1;
Long count = t._2._2.isPresent()?t._2._2.get():0L;
value = value + "|" + Constants.FIELD_PAY_COUNT + "=" + count;
//
return new Tuple2(catagroyId, value);
}
});
输出结果, 有问题:
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(39,categoryid=39|clickCount=4|orderCount=2|payCount=3)
(19,categoryid=19|clickCount=3|orderCount=1|payCount=0)
排查原因: 在统计所有品类的时候, 将点击,下单,支付都算进去了, 但是,没有考虑到categoryId重复的情况, 导致结果重复。所以需要对2.1统计所有品类结果进行去重
//
printRDD(allCatagoryIdRDD);
//去重
JavaPairRDD distinctCategoryIdRDD = allCatagoryIdRDD.distinct();
return distinctCategoryIdRDD;
2.4、自定义二次排序key参考Spark的高级排序(二次排序)
package com.chb.shopanalysis.model;
import java.io.Serializable;
import scala.math.Ordered;
/**
* @Description
* 自定义二次排序的key
* 继承scala.math.Ordered,
* Serializable 接口
*
*
* @date 2018年7月12日 下午11:33:47
*
* @author chb
*
* @mail 1228532445@qq.com
*/
public class CategorySortKey implements Ordered, Serializable{
private long clickCount;
private long orderCount;
private long payCount;
@Override
public boolean $greater(CategorySortKey other) {
if(clickCount > other.getClickCount()) {
return true;
} else if(clickCount == other.getClickCount() &&
orderCount > other.getOrderCount()) {
return true;
} else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount > other.getPayCount()) {
return true;
}
return false;
}
@Override
public boolean $greater$eq(CategorySortKey other) {
if($greater(other)) {
return true;
} else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount == other.getPayCount()) {
return true;
}
return false;
}
@Override
public boolean $less(CategorySortKey other) {
if(clickCount < other.getClickCount()) {
return true;
} else if(clickCount == other.getClickCount() &&
orderCount < other.getOrderCount()) {
return true;
} else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount < other.getPayCount()) {
return true;
}
return false;
}
@Override
public boolean $less$eq(CategorySortKey other) {
if($less(other)) {
return true;
} else if(clickCount == other.getClickCount() &&
orderCount == other.getOrderCount() &&
payCount == other.getPayCount()) {
return true;
}
return false;
}
@Override
public int compare(CategorySortKey other) {
if(clickCount - other.getClickCount() != 0) {
return (int) (clickCount - other.getClickCount());
} else if(orderCount - other.getOrderCount() != 0) {
return (int) (orderCount - other.getOrderCount());
} else if(payCount - other.getPayCount() != 0) {
return (int) (payCount - other.getPayCount());
}
return 0;
}
@Override
public int compareTo(CategorySortKey other) {
if(clickCount - other.getClickCount() != 0) {
return (int) (clickCount - other.getClickCount());
} else if(orderCount - other.getOrderCount() != 0) {
return (int) (orderCount - other.getOrderCount());
} else if(payCount - other.getPayCount() != 0) {
return (int) (payCount - other.getPayCount());
}
return 0;
}
2.5、进行二次排序
2.5.1首先映射成
//首先映射成
JavaPairRDD sortKey2CountInfoRDD = joinRDD.mapToPair(new PairFunction() {
@Override
public Tuple2 call(Tuple2 t)
throws Exception {
String countInfo = t._2;
//解析出clickCount, orderCount, payCount
long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
countInfo, "\\|", Constants.FIELD_CLICK_COUNT));
long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
countInfo, "\\|", Constants.FIELD_ORDER_COUNT));
long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
countInfo, "\\|", Constants.FIELD_PAY_COUNT));
CategorySortKey sortKey = new CategorySortKey(clickCount,
orderCount, payCount);
return new Tuple2(sortKey, countInfo);
}
});
2.5.2、通过sortByKey进行排序, false 按照降序
//通过sortByKey进行排序, false 按照降序
JavaPairRDD sortRDD = sortKey2CountInfoRDD.sortByKey(false);
printRDD(sortRDD);
排序结果: