针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
即对6.2中聚合后的数据,按照客户的刷选条件进行过滤。聚合数据
(ce6983270b154f31a7ca67b56f6abcb3,sessionid=ce6983270b154f31a7ca67b56f6abcb3|searchKeywords=温泉|clickCategoryIds=51,79,80|age=2|professional=professional23|city=city25|sex=male)
(d489a32c190e4dd1a7e0fd29fb92c495,sessionid=d489a32c190e4dd1a7e0fd29fb92c495|searchKeywords=新辣道鱼火锅,温泉,蛋糕|clickCategoryIds=17,51,39,91,46,8|age=2|professional=professional23|city=city25|sex=male)
(3783c8f44c3b465884f64a4923368261,sessionid=3783c8f44c3b465884f64a4923368261|searchKeywords=火锅,温泉|clickCategoryIds=90|age=2|professional=professional23|city=city25|sex=male)
通过使用Fliter算子进行过滤
/**
* 按筛选参数对session粒度聚合数据进行过滤
* @param sessionid2AggrInfoRDD 聚合后的数据
* @param taskParam 过滤的条件
* @return
*/
private static JavaPairRDD filterSession(
JavaPairRDD sessionid2AggrInfoRDD,
JSONObject taskParam) {
//从数据库中获取过滤的参数
String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE);
String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
String searchKeys = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
System.out.println(startAge+"------"+endAge+"----"+searchKeys);
JavaPairRDD filterRDD = sessionid2AggrInfoRDD.filter(new Function() {
@Override
public Boolean call(Tuple2 tuple) throws Exception {
// 首先,从tuple中,获取聚合数据
String aggrInfo = tuple._2;
Boolean isFilter = false;
//1、按照年龄范围进行过滤(startAge, endAge)
int age = Integer.valueOf(
StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_AGE));
if(startAge != null && endAge != null){
if(age>=Integer.valueOf(startAge) && age
关注
打赏