您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

session聚合统计之自定义Accumulator

宝哥大数据 发布时间:2018-07-10 09:16:02 ,浏览量:1

如果不了解Accumulator 可以参考Broadcast变量&Accumulators

1.1、需求session聚合统计:

统计出来之前通过条件过滤的session,访问时长在0s~3s的session的数量,占总session数量的比例;4s~6s。。。。; 访问步长在1~3的session的数量,占总session数量的比例;4~6。。。;

1.2、使用Accumulator进行聚合统计
Accumulator 1s_3s = sc.accumulator(0L);
。。
。。
。。
十几个Accumulator

可以对过滤以后的session,调用foreach也可以,遍历所有session;计算每个session的访问时长和访问步长;
访问时长:把session的最后一个action的时间,减去第一个action的时间
访问步长:session的action数量
计算出访问时长和访问步长以后,根据对应的区间,找到对应的Accumulator,1s_3s.add(1L)
同时每遍历一个session,就可以给总session数量对应的Accumulator,加1
最后用各个区间的session数量,除以总session数量,就可以计算出各个区间的占比了
统计城市为city25的记录数
        //对过滤后的数据,进行聚合统计
        //创建一个累加器
        Accumulator accCity25 = UserAnalysis.sc.accumulator(0, "city25");
        filteredSessionid2AggrInfoRDD.foreach(new VoidFunction() {

            @Override
            public void call(Tuple2 t) throws Exception {
                if(t._2.contains("city25")){
                    accCity25.add(1);
                }
            }
        });
        System.out.println("city25的记录数:" + accCity25.value());
可以看到如果统计的指标过多的话, Accumulator的数量增加, 这个是不想看到的 这种传统的实现方式,有什么不好???

最大的不好,就是Accumulator太多了,不便于维护 首先第一,很有可能,在写后面的累加代码的时候,比如找到了一个4s~6s的区间的session,但是却代码里面不小心,累加到7s~9s里面去了; 第二,当后期,项目如果要出现一些逻辑上的变更,比如说,session数量的计算逻辑,要改变,就得更改所有Accumulator对应的代码;或者说,又要增加几个范围,那么又要增加多个Accumulator,并且修改对应的累加代码;维护成本,相当之高(甚至可能,修改一个小功能,或者增加一个小功能,耗费的时间,比做一个新项目还要多;甚至于,还修改出了bug,那就耗费更多的时间)

1.3、 改进:使用自定义的Accumulator

所以,我们这里的设计,不打算采用传统的方式,用十几个,甚至二十个Accumulator,因为维护成本太高 这里的实现思路是,我们自己自定义一个Accumulator,实现较为复杂的计算逻辑,一个Accumulator维护了所有范围区间的数量的统计逻辑 低耦合,如果说,session数量计算逻辑要改变,那么不用变更session遍历的相关的代码;只要维护一个Accumulator里面的代码即可; 如果计算逻辑后期变更,或者加了几个范围,那么也很方便,不用多加好几个Accumulator,去修改大量的代码;只要维护一个Accumulator里面的代码即可; 维护成本,大大降低

自定义Accumulator,也是Spark Core中,属于比较高端的一个技术 使用自定义Accumulator,大家就可以任意的实现自己的复杂分布式计算的逻辑 如果说,你的task,分布式,进行复杂计算逻辑,那么是很难实现的(借助于redis,维护中间状态,借助于zookeeper去实现分布式锁) 但是,使用自定义Accumulator,可以更方便进行中间状态的维护,而且不用担心并发和锁的问题

使用Scala实现自定义Accumulator

import org.apache.spark.AccumulatorParam
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext


/**
 * @author Administrator
 */
object SessionAggrStatAccumulatorTest {

  def main(args: Array[String]): Unit = {
    /**
     * Scala中,自定义Accumulator
     * 使用object,直接定义一个伴生对象即可
     * 需要实现AccumulatorParam接口,并使用[]语法,定义输入输出的数据格式
     */
    object SessionAggrStatAccumulator extends AccumulatorParam[String] {

      /**
       * 首先要实现zero方法
       * 负责返回一个初始值
       */
      def zero(initialValue: String): String = {
        Constants.SESSION_COUNT + "=0|" + Constants.TIME_PERIOD_1s_3s + "=0|" + Constants.TIME_PERIOD_4s_6s + "=0|" + Constants.TIME_PERIOD_7s_9s + "=0|" + Constants.TIME_PERIOD_10s_30s + "=0|" + Constants.TIME_PERIOD_30s_60s + "=0|" + Constants.TIME_PERIOD_1m_3m + "=0|" + Constants.TIME_PERIOD_3m_10m + "=0|" + Constants.TIME_PERIOD_10m_30m + "=0|" + Constants.TIME_PERIOD_30m + "=0|" + Constants.STEP_PERIOD_1_3 + "=0|" + Constants.STEP_PERIOD_4_6 + "=0|" + Constants.STEP_PERIOD_7_9 + "=0|" + Constants.STEP_PERIOD_10_30 + "=0|" + Constants.STEP_PERIOD_30_60 + "=0|" + Constants.STEP_PERIOD_60 + "=0"
      }

      /**
       * 其次需要实现一个累加方法
       */
      def addInPlace(v1: String, v2: String): String = {
        // 如果初始化值为空,那么返回v2
        if(v1 == "") {
          v2
        } else {
          // 从现有的连接串中提取v2对应的值
          val oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);
          // 累加1
          val newValue = Integer.valueOf(oldValue) + 1
          // 给连接串中的v2设置新的累加后的值
          StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue))   
        }
      }

    }

    // 创建Spark上下文
    val conf = new SparkConf()
        .setAppName("SessionAggrStatAccumulatorTest")  
        .setMaster("local")  
    val sc = new SparkContext(conf);

    // 使用accumulator()()方法(curry),创建自定义的Accumulator
    val sessionAggrStatAccumulator = sc.accumulator("")(SessionAggrStatAccumulator)   

    // 模拟使用一把自定义的Accumulator
    val arr = Array(Constants.TIME_PERIOD_1s_3s, Constants.TIME_PERIOD_4s_6s)  
    val rdd = sc.parallelize(arr, 1)  
    rdd.foreach { sessionAggrStatAccumulator.add(_) }  

    println(sessionAggrStatAccumulator.value)  
  }

}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.1986s