通过好友列表,获取共同好友关系, 进行好友推荐 在QQ等交友平台,总会出现你和某某有多少个共同好友,QQ会向你推荐好 友,共同好友越多,优先推荐。 如下图:为好友关系图 整理如下表:第一个为用户,后面的为好友列表
小明 老王 如花 林志玲
老王 小明 凤姐
如花 小明 李刚 凤姐
林志玲 小明 李刚 凤姐 郭美美
李刚 如花 凤姐 林志玲
郭美美 凤姐 林志玲
凤姐 如花 老王 林志玲 郭美美
问题解析:
我们推荐是根据两个用户的共同好友的多少进行排序,共同好友越多,有限推荐。我将这个由共同好友的关系定义为Fof。 我们来看原始数据:
用户 好友1 好友2 好友3
小明 老王 如花 林志玲
我们可以知道小明为任意两个好友的共同好友, 即在好友列表中,任意两个好友都为Fof关系。但是我们发现如果好友顺序不一致, 导致情况重复,如老王-如花,如花-老王是同一种情况。
小明 老王 如花 林志玲
凤姐 如花 老王 林志玲 郭美美
还有一种情况,在一组数据中两个用户为Fof关系,但是在另一组数据中,两人为直接好友 ,如李刚和如花:所以要去除这种情况的Fof关系。
如花 小明 李刚 凤姐
李刚 如花 凤姐 林志玲
在Mapper中输出Fof关系同时,将直接好友关系也创建Fof对象输出, - 只是Fof输出值为1, - 直接好友输出0 在Reducer中奖Fof对应值为0的fof剔除了。
没有考虑直接好友,出现Fof中有直接好友,结果如下:运行结果:发现没有林志玲-如花的Fof,很多Fof组都没有了,
- 在mapper中Fof顺序不同,但是相同用户,作为一组,即Fof类中还是修改,ab和ba是同一组。
代码如下:
package com.chb.friend;
import org.apache.hadoop.io.Text;
/**
*用户的间接好友
*用户 好友1 好友2
* a b c
* a为bc的共同好友,所以bc为间接好友,即Fof关系
* Fof是Text的包装, 将String包装成Text,作为Mapper的输出
*/
public class Fof extends Text{
public Fof() {
super();
}
public Fof(String a, String b) {
super(getFof(a, b));
}
private static String getFof(String a, String b){
int r = a.compareTo(b);
if (r < 0 ) {
return a+"\t"+b;
}else {
return b+"\t"+a;
}
}
}
- 但是,在Reducer输出是要输出ab, ba两种情况
//输出ab
context.write(key, new IntWritable(sum));
//输出ba
String keyTmp = key.toString().split("\t")[1]+"\t"+key.toString().split("\t")[0];
context.write(new Text(keyTmp), new IntWritable(sum));
- 在Mapper中创建直接好友的Fof,方便在Reducer中提出此类Fof
Fof noFof = new Fof(user, f1);
context.write(noFof, new IntWritable(0));
- 在Reducer中剔除为直接好友的Fof
for (IntWritable iw : values) {
if (iw.get() == 0) {//出现值为0的fof为直接好友,不用统计
return;
}
sum += iw.get();
}
具体代码
Fof
package com.chb.friend;
import org.apache.hadoop.io.Text;
/**
*用户的间接好友
*用户 好友1 好友2
* a b c
* a为bc的共同好友,所以bc为间接好友,即Fof关系
* Fof是Text的包装, 将String包装成Text,作为Mapper的输出
*/
public class Fof extends Text{
public Fof() {
super();
}
public Fof(String a, String b) {
super(getFof(a, b));
}
private static String getFof(String a, String b){
int r = a.compareTo(b);
if (r < 0 ) {
return a+"\t"+b;
}else {
return b+"\t"+a;
}
}
}
Mapper
- 提取Fof,
- 设置直接好友的假Fof,以便提出此类Fof 注意:在这个mapper中我们的输入键值都是是Text,
Mapper
,这个决定输入的键值是根据每行的第一个制表符(\t)进行分割,左边为键,右边位置,所以用户为key, 好友列表为value。
package com.chb.friend;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 输入数据:
* 用户 好友列表
* 小明 老王 如花 林志玲
* 将每行数据作为split
* 根据好友列表获取Fof关系:
* 因为 小明为老王,如花,林志玲的共同好友
* 所以任意两个好友都是Fof关系
*/
public class FofMapper extends Mapper{
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
String user = key.toString();
//好友列表
String friends[] = value.toString().split("\t");
for (int i = 0; i < friends.length; i++) {
String f1 = friends[i];
//直接好友的Fof,设置输出值为0,方便在Reuder中提出这类Fof
Fof noFof = new Fof(user, f1);
context.write(noFof, new IntWritable(0));
for (int j = i+1; j < friends.length; j++) {
String f2 = friends[j];
Fof fof = new Fof(f1, f2);
context.write(fof, new IntWritable(1));
}
}
}
}
Reducer
package com.chb.friend;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FofReducer extends Reducer{
@Override
protected void reduce(Fof key, Iterable values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable iw : values) {
if (iw.get() == 0) {//出现值为0的fof为直接好友,不用统计了
return;
}
sum += iw.get();
}
//输出Fof ab和ba
context.write(key, new IntWritable(sum));
String keyTmp = key.toString().split("\t")[1]+"\t"+key.toString().split("\t")[0];
context.write(new Text(keyTmp), new IntWritable(sum));
}
}
执行测试程序
使用的服务器模式, 将程序打jar包,放在本地”C:\Users\12285\Desktop\Fof.jar” 通过job.setJar("C:\\Users\\12285\\Desktop\\Fof.jar");
加载jar; 需要加载hadoop配置文件
package com.chb.friend;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
public static void main(String[] args) throws Exception{
System.setProperty("HADOOP_USER_NAME", "chb");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance();
job.setJobName("Fof");
job.setJarByClass(RunJob.class);
//Mapper
job.setMapperClass(FofMapper.class);
job.setMapOutputKeyClass(Fof.class);
job.setMapOutputValueClass(IntWritable.class);
job.setJar("C:\\Users\\12285\\Desktop\\Fof.jar");
//Reducer
job.setReducerClass(FofReducer.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
//输入
FileInputFormat.addInputPaths(job, "/user/chb/input/friend");
//输出
Path out = new Path("/user/chb/output/fof");
if (fs.exists(out)) {
fs.delete(out, true);
}
FileOutputFormat.setOutputPath(job, out);
boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("任务完成。。。");
}
}
}
执行的结果为:
结果数据顺序很混乱,我们需要根据Fof的数据的大小进行优先推送。所以需要对Reducer的数据进行排序