一、需求: 获取top10品类的点击次数最多的10个session
top10热门品类, 获取每个品类点击次数最多的10个session, 以及对应的访问明细
二、实现思路分析: 2.1、拿到符合筛选条件的session的明细数据, 提取成公共的RDD /**
* 通过筛选条件的RDD与明细数据进行聚合
* 获取通过筛选条件的session的访问明细数据RDD
* @param filteredSessionid2AggrInfoRDD
* @param sessionid2actionRDD
*/
private static JavaPairRDD getSessionId2DetailRDD(
JavaPairRDD filteredSessionid2AggrInfoRDD,
JavaPairRDD sessionid2actionRDD){
//获取过滤后的明细数据
JavaPairRDD filterSession2DetailRDD = filteredSessionid2AggrInfoRDD
.join(sessionid2actionRDD) //
.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(
Tuple2 t) throws Exception {
return new Tuple2(t._1, t._2._2);
}
});
return filterSession2DetailRDD;
}
2.2、按照session粒度进行聚合,获取到session对每个品类的点击次数,用flatMap,算子函数返回的是
, 然后和top10品类进行聚合,获取基于品类的session 的点击数
- 2.2.1首先在top10热门品类之需求以及实现思路分析 中已经获取到top10热门品类, 提取top10热门品类id, 转化为PairRDD, 为了后续的join
/**
* 第一步: 提取top10热门品类id,
*/
List top10CategoryIdList = new ArrayList();
for (Tuple2 tuple2 : top10CategoryList) {
Long categoryId = Long.valueOf(
StringUtils.getFieldFromConcatString(tuple2._2, "\\|", Constants.FIELD_CATEGORY_ID));
top10CategoryIdList.add(new Tuple2(categoryId, categoryId));
}
//转化为PairRDD, 为了后续的join
JavaPairRDD top10CategoryIdListRDD = sc.parallelizePairs(top10CategoryIdList);
- 2.2.2、计算top10品类 被各个session 点击的次数
- 2.2.2.1、通过groupByKey对
2.1
中获取到通过筛选的session明细数据进行分组 - 2.2.2.2、因为是
, 所以通过flatMapToPair对明细数据进行遍历, 统计每个sessionid, 对应categroyId的点击数, 但是, 我们最终目的是统计基于热门品类的 点击数top10的session。所以需要将key转化为categroyId,
- 输出为
形式
- 2.2.2.1、通过groupByKey对
/**
* 第二步: 计算top10品类 被各个session 点击的次数
*/
//分组
JavaPairRDD sessionId2DetailsRDD = sessionId2DetailRDD.groupByKey();
//
JavaPairRDD categoryId2SessionCountRDD = sessionId2DetailsRDD.flatMapToPair(new PairFlatMapFunction() {
@Override
public Iterable call(
Tuple2 t) throws Exception {
String sessoionId = t._1;
Iterator details = t._2.iterator();
/** */
Map categoryCountMap = new HashMap();
//首先获取每个session, 对应categroyId的 clickCount
while(details.hasNext()){
Row row = details.next();
if(row.get(6) != null) {
Long categoryId = row.getLong(6);
Long count = categoryCountMap.get(categoryId);
if(count == null) count=0L;
count++;
categoryCountMap.put(categoryId, count);
}
}
//转换为categoryId为key,
//将每个session 对应品类的点击数量
//转化为 每个品类 对应session的点击数量 这是为了后面进行join,
//输出
List list = new ArrayList();
for ( Entry categoryCountEntry : categoryCountMap.entrySet()) {
Long categoryId = categoryCountEntry.getKey();
Long count = categoryCountEntry.getValue();
String value = sessoionId + "," + count;
list.add(new Tuple2(categoryId, value));
}
return list;
}
});
- 2.2.2.3根据categoryId将top10品类RDD和基于品类的session点击数RDD进行join, 就是基于品类top10的每个session的点击数。
/**
* 根据categoryId进行join
*
* 就获取了top10热门品类 被各个session点击的次数
*/
JavaPairRDD joinRDD =
top10CategoryIdListRDD.join(categoryId2SessionCountRDD)
.mapToPair(new PairFunction() {
@Override
public Tuple2 call(
Tuple2 t)
throws Exception {
//格式化输出
return new Tuple2(t._1, t._2._2);
}
});
2.3、按照品类id,分组取top10,获取到top10活跃session;
- 2.3.1、groupByKey 进行分组
JavaPairRDD joinRDDGroupBycategrouyID = joinRDD.groupByKey();
- 2.3.2、自己写算法,获取到点击次数最多的前10个session,直接写入MySQL表;返回的是sessionid
4、获取各品类top10活跃session的访问明细数据