您当前的位置: 首页 >  hadoop

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

hadoop2.5.2学习12-MR之PageRank01

宝哥大数据 发布时间:2017-02-28 16:14:01 ,浏览量:0

一、PageRank之Mapper的代码实现

原始数据为: 这里写图片描述 因为PageRank为每个页面设置默认的PR值为1.0, 所以这个原始数据实际应如下, 中间以\t分割

A   1.0  B  D
B   1.0  C
C   1.0  A  B
D   1.0  B  C

由于设置Mapper的输出key-value为Text, Text。所以设置

extends Mapper

这个决定map的输入key-value为每个数据以第一个\t分割,左侧为key, 右侧为value 例如:

A   B   D

即节点A为key, 1.0(默认PR值) B D为value, 而这个1.0(默认PR值) B D就根据Node的 fromMR(String)将这个信息转为Node对象, 主要要区分第一次的map输入和迭代多次的输入

        //第一轮计算
        if (runCount == 1) {
            sourceNode = Node.fromMR("1.0"+"\t"+value.toString());
        }else {
            sourceNode = Node.fromMR(value.toString());
        }
最重要的是一点,由于要计算收敛,不可能一直递归下, 要设置一个阈值,只要新旧PR值的差值的和,达到这个阈值,就判定为收敛。

我们要每轮MapReduce在本例中reduce分四个分组,我们要统计每个分组的差值,

要用一个容器来记录每个分组的PR值的变动,这个容器必须是全局变量,持续到这个job流程。

在hadoop中的mapreduce中提供一个Counter,正好提供了这个功能。 通过context获取Counter并记录PR差值。context.getCounter(MyRunJob.MyCounter.countName).increment(j); Counter在下一篇文章介绍。hadoop2.5.2学习12-MR之PageRank02

        //因为incerment(long ), 所以要保证数据正确性, 所以乘以1000.0
        int j = (int) (d*1000.0);
        j = Math.abs(j);
        System.out.println(j);
        context.getCounter(MyRunJob.MyCounter.countName).increment(j);

在每个job处理完,要将这个PR差值的记录与我们设定的阈值对比

        if (f) {//一轮job执行完成
                //每次的执行结果,对比新旧的PR是否达到了收敛
                long sum = job.getCounters().findCounter(MyCounter.countName).getValue();
                System.out.println(sum);
                /**
                 * 因为每次的reducer中按照key进行四次计算, 每次计算增加一次j
                 * reducer:
                 *      context.getCounter(MyRunJob.MyCounter.countName).increment(j);
                 * 所以求取平均差值。要除以4*1000.0
                 *      在reducer中:int j = (int) (d*1000.0);
                 */
                double avgD = sum/4000.0;
                if (avgD < d) {//达到了收敛
                    break;
                }
                i++;
            }
递归计算

PageRank的最重要是通过递归迭代计算,是PR值达到收敛,每次计算的PR值,都作为下一次的计算的输入。

while (true) {//递归计算
    //job执行。。
}

为了不无限迭代,设置一个阈值,用于迭代的结束。

map的具体实现
package com.chb.pagerank;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyPageRankMapper extends Mapper{
    @Override
    protected void map(Text key, Text value, Context context) throws IOException,
            InterruptedException { 
        String page = key.toString();
        Node sourceNode = null;
        /**
         * 由于PageRank需要进行迭代计算,达到收敛
         * 第一次统计的原始数据为A B D 
         * PageRank设置页面的初始PR值为1.0   原始数据应为A   1.0   B   D
         * 进过第一轮mapreduce,计算出新的PR值  
         * 生成数据为:  A   new PR   B   D
         * 同时输出A对出链节点贡献的PR值
         * B  0.5
         * D  0.5
         */
        int runCount = context.getConfiguration().getInt("runCount", 1);
        //第一轮计算
        if (runCount == 1) {
            sourceNode = Node.fromMR("1.0"+"\t"+value.toString());
        }else {
            sourceNode = Node.fromMR(value.toString());
        }
        //输出sourceNode的PR值和出链的节点
        context.write(key, new Text(sourceNode.toString()));

        //输出sourceNode出链节点的对应的PR值
        if (sourceNode.containsAdjacentNodes()) {
            for (int i = 0; i < sourceNode.getAdjacentNodeNames().length; i++) {
                //sourceNode分给每个出链节点的PR值为sourceNode的PR值/出链节点数目
                double pr = sourceNode.getPageRank()/sourceNode.getAdjacentNodeNames().length;
                context.write(new Text(sourceNode.getAdjacentNodeNames()[i]), new Text(pr+""));
            }
        }
    }
}
reduce的具体实现
package com.chb.pagerank;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 *  
 */
public class MyPageRankReducer extends Reducer{
    @Override
    protected void reduce(Text key, Iterable values, Context context) 
            throws IOException,InterruptedException {
        double prSum = 0;
        Node sourceNode = null;
        for (Text text : values) {
            Node node = Node.fromMR(text.toString());
            if (node.containsAdjacentNodes()) {
                sourceNode = node;//旧的PR值
            }else {
                prSum += node.getPageRank();
            }
        }
        //计算出新的PR值
        double q = 0.85;//阻尼系数
        int N = 4;//总页面数
        double newPR = (1-q)/N + q*prSum;

        //对比新旧PR差值,直到数据收敛
        double d = newPR - sourceNode.getPageRank();
        //因为incerment(long ), 所以要保证数据正确性, 所以乘以1000.0
        int j = (int) (d*1000.0);
        j = Math.abs(j);
        System.out.println(j);
        context.getCounter(MyRunJob.MyCounter.countName).increment(j);

        //输出新的PR值,此次的输出作为下一轮map的输入
        sourceNode.setPageRank(newPR);
        context.write(key, new Text(sourceNode.toString()));
    }
}
执行Job的具体实现
package com.chb.pagerank;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyRunJob {
    public static enum MyCounter {
        countName;
    }
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "chb");
        Configuration conf = new Configuration();
        int i =0;//记录统计的次数
        FileSystem fs = FileSystem.get(conf);
        //递归收敛的差值
        double d = 0.0001;
        while (true) {//递归计算
            Job job = Job.getInstance();
            job.setJar("");
            job.setJobName("pr"+"");
            job.setJarByClass(MyRunJob.class);

            job.setMapperClass(MyPageRankMapper.class);
            job.setReducerClass(MyPageRankReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //设置拆分
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            //第一轮输入
            FileInputFormat.addInputPath(job, new Path("/user/chb/input/pagerank.txt"));
            if (i > 0) {
                FileInputFormat.addInputPath(job, new Path("/user/chb/output/pr"+(i-1)));
            }
            Path  out = new Path("/user/chb/output/pr"+i);
            if (fs.exists(out)) {
                fs.delete(out, true);
            }
            FileOutputFormat.setOutputPath(job, out);
            boolean f = job.waitForCompletion(true);
            if (f) {//一轮job执行完成
                //每次的执行结果,对比新旧的PR是否达到了收敛
                long sum = job.getCounters().findCounter(MyCounter.countName).getValue();
                System.out.println(sum);
                /**
                 * 因为每次的reducer中按照key进行四次计算, 每次计算增加一次j
                 * reducer:
                 *      context.getCounter(MyRunJob.MyCounter.countName).increment(j);
                 * 所以求取平均差值。要除以4*1000.0
                 *      在reducer中:int j = (int) (d*1000.0);
                 */
                double avgD = sum/4000.0;
                if (avgD < d) {//达到了收敛
                    break;
                }
                i++;
            }
        }

    }
}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0493s