Counter
在上一篇文章中为了记录每个reduce中的每组PR差值,我们需要一个全局变量来记录这些差值。 hadoop为我们提供了Counter接口
Counter是一个接口: org.apache.hadoop.mapreduce.Counter extends Writable Counter跟踪Map-Reduce的进程 Counters表示全局的counters, 可以被任何的Map-Reduce框架或应用定义,
- a 每个Counter都有一个Enum命名,
- b 每个Counter都是存储一个long类型的值。
在上篇文章中统计PR,使用了Counter来统计新旧PR值的差值, 首先定义一个Enum,用来标记Counter, 因为 a 决定要定义一个Enum.
public static enum MyCounter {
countName;
}
因为要满足b, 而PR差值是double类型的数值,所以将PR值*1000.0 通过Enum获指定的Counter, 调用Counter的increment(long)来记录这个PR差值。
//因为incerment(long ), 所以要保证数据正确性, 所以乘以1000.0
int j = (int) (d*1000.0);
j = Math.abs(j);
System.out.println(j);
context.getCounter(MyRunJob.MyCounter.countName).increment(j);
在每次job执行完成,将Counter中的值提取出来
long sum = job.getCounters().findCounter(MyCounter.countName).getValue();
因为默认分组是按照key进行分组, 而reduce的key是节点的字符,所以每个分组被reduce处理,都会产生一个PR差值,所以最终的Counter中记录的数据是所有分组的差值总和,所以在job执行完,要进行和阈值对比,要将Counter的值/4000.0,用来求去平均的差值
/**
* 因为每次的reducer中按照key进行四次计算, 每次计算增加一次j
* reducer:
* context.getCounter(MyRunJob.MyCounter.countName).increment(j);
* 所以求取平均差值。要除以4*1000.0
* 在reducer中:int j = (int) (d*1000.0);
*/
double avgD = sum/4000.0;
平均差值小于阈值,说明数据收敛,那么久停止迭代。
if (avgD < d) {//达到了收敛
break;
}
暂时记录于此