一、PageRank之Mapper的代码实现
原始数据为: 因为PageRank为每个页面设置默认的PR值为1.0, 所以这个原始数据实际应如下, 中间以\t分割
A 1.0 B D
B 1.0 C
C 1.0 A B
D 1.0 B C
由于设置Mapper的输出key-value为Text, Text。所以设置
extends Mapper
这个决定map的输入key-value为每个数据以第一个\t
分割,左侧为key, 右侧为value 例如:
A B D
即节点A为key, 1.0(默认PR值) B D为value, 而这个1.0(默认PR值) B D就根据Node的 fromMR(String)将这个信息转为Node对象, 主要要区分第一次的map输入和迭代多次的输入
//第一轮计算
if (runCount == 1) {
sourceNode = Node.fromMR("1.0"+"\t"+value.toString());
}else {
sourceNode = Node.fromMR(value.toString());
}
最重要的是一点,由于要计算收敛,不可能一直递归下, 要设置一个阈值,只要新旧PR值的差值的和,达到这个阈值,就判定为收敛。
我们要每轮MapReduce在本例中reduce分四个分组,我们要统计每个分组的差值,
要用一个容器来记录每个分组的PR值的变动,这个容器必须是全局变量,持续到这个job流程。在hadoop中的mapreduce中提供一个Counter,正好提供了这个功能。 通过context获取Counter并记录PR差值。context.getCounter(MyRunJob.MyCounter.countName).increment(j);
Counter在下一篇文章介绍。hadoop2.5.2学习12-MR之PageRank02
//因为incerment(long ), 所以要保证数据正确性, 所以乘以1000.0
int j = (int) (d*1000.0);
j = Math.abs(j);
System.out.println(j);
context.getCounter(MyRunJob.MyCounter.countName).increment(j);
在每个job处理完,要将这个PR差值的记录与我们设定的阈值对比
if (f) {//一轮job执行完成
//每次的执行结果,对比新旧的PR是否达到了收敛
long sum = job.getCounters().findCounter(MyCounter.countName).getValue();
System.out.println(sum);
/**
* 因为每次的reducer中按照key进行四次计算, 每次计算增加一次j
* reducer:
* context.getCounter(MyRunJob.MyCounter.countName).increment(j);
* 所以求取平均差值。要除以4*1000.0
* 在reducer中:int j = (int) (d*1000.0);
*/
double avgD = sum/4000.0;
if (avgD < d) {//达到了收敛
break;
}
i++;
}
递归计算
PageRank的最重要是通过递归迭代计算,是PR值达到收敛,每次计算的PR值,都作为下一次的计算的输入。
while (true) {//递归计算
//job执行。。
}
为了不无限迭代,设置一个阈值,用于迭代的结束。
map的具体实现package com.chb.pagerank;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyPageRankMapper extends Mapper{
@Override
protected void map(Text key, Text value, Context context) throws IOException,
InterruptedException {
String page = key.toString();
Node sourceNode = null;
/**
* 由于PageRank需要进行迭代计算,达到收敛
* 第一次统计的原始数据为A B D
* PageRank设置页面的初始PR值为1.0 原始数据应为A 1.0 B D
* 进过第一轮mapreduce,计算出新的PR值
* 生成数据为: A new PR B D
* 同时输出A对出链节点贡献的PR值
* B 0.5
* D 0.5
*/
int runCount = context.getConfiguration().getInt("runCount", 1);
//第一轮计算
if (runCount == 1) {
sourceNode = Node.fromMR("1.0"+"\t"+value.toString());
}else {
sourceNode = Node.fromMR(value.toString());
}
//输出sourceNode的PR值和出链的节点
context.write(key, new Text(sourceNode.toString()));
//输出sourceNode出链节点的对应的PR值
if (sourceNode.containsAdjacentNodes()) {
for (int i = 0; i < sourceNode.getAdjacentNodeNames().length; i++) {
//sourceNode分给每个出链节点的PR值为sourceNode的PR值/出链节点数目
double pr = sourceNode.getPageRank()/sourceNode.getAdjacentNodeNames().length;
context.write(new Text(sourceNode.getAdjacentNodeNames()[i]), new Text(pr+""));
}
}
}
}
reduce的具体实现
package com.chb.pagerank;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
*/
public class MyPageRankReducer extends Reducer{
@Override
protected void reduce(Text key, Iterable values, Context context)
throws IOException,InterruptedException {
double prSum = 0;
Node sourceNode = null;
for (Text text : values) {
Node node = Node.fromMR(text.toString());
if (node.containsAdjacentNodes()) {
sourceNode = node;//旧的PR值
}else {
prSum += node.getPageRank();
}
}
//计算出新的PR值
double q = 0.85;//阻尼系数
int N = 4;//总页面数
double newPR = (1-q)/N + q*prSum;
//对比新旧PR差值,直到数据收敛
double d = newPR - sourceNode.getPageRank();
//因为incerment(long ), 所以要保证数据正确性, 所以乘以1000.0
int j = (int) (d*1000.0);
j = Math.abs(j);
System.out.println(j);
context.getCounter(MyRunJob.MyCounter.countName).increment(j);
//输出新的PR值,此次的输出作为下一轮map的输入
sourceNode.setPageRank(newPR);
context.write(key, new Text(sourceNode.toString()));
}
}
执行Job的具体实现
package com.chb.pagerank;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyRunJob {
public static enum MyCounter {
countName;
}
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "chb");
Configuration conf = new Configuration();
int i =0;//记录统计的次数
FileSystem fs = FileSystem.get(conf);
//递归收敛的差值
double d = 0.0001;
while (true) {//递归计算
Job job = Job.getInstance();
job.setJar("");
job.setJobName("pr"+"");
job.setJarByClass(MyRunJob.class);
job.setMapperClass(MyPageRankMapper.class);
job.setReducerClass(MyPageRankReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置拆分
job.setInputFormatClass(KeyValueTextInputFormat.class);
//第一轮输入
FileInputFormat.addInputPath(job, new Path("/user/chb/input/pagerank.txt"));
if (i > 0) {
FileInputFormat.addInputPath(job, new Path("/user/chb/output/pr"+(i-1)));
}
Path out = new Path("/user/chb/output/pr"+i);
if (fs.exists(out)) {
fs.delete(out, true);
}
FileOutputFormat.setOutputPath(job, out);
boolean f = job.waitForCompletion(true);
if (f) {//一轮job执行完成
//每次的执行结果,对比新旧的PR是否达到了收敛
long sum = job.getCounters().findCounter(MyCounter.countName).getValue();
System.out.println(sum);
/**
* 因为每次的reducer中按照key进行四次计算, 每次计算增加一次j
* reducer:
* context.getCounter(MyRunJob.MyCounter.countName).increment(j);
* 所以求取平均差值。要除以4*1000.0
* 在reducer中:int j = (int) (d*1000.0);
*/
double avgD = sum/4000.0;
if (avgD < d) {//达到了收敛
break;
}
i++;
}
}
}
}