您当前的位置: 首页 >  算法

32 MAPREDUCE的map端join算法实现

杨林伟 发布时间:2019-08-08 11:02:21 ,浏览量:1

原理阐述

适用于关联表中有小表的情形;

可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度。

实现示例

1.在mapper类中预先定义好小表,进行join

2.引入实际场景中的解决方案:一次加载数据库或者用distributedcache

public class TestDistributedCache {
	static class TestDistributedCacheMapper extends Mapper{
		FileReader in = null;
		BufferedReader reader = null;
		HashMap b_tab = new HashMap();
		String localpath =null;
		String uirpath = null;
		
		//是在map任务初始化的时候调用一次
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			//通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
			Path[] files = context.getLocalCacheFiles();
			localpath = files[0].toString();
			URI[] cacheFiles = context.getCacheFiles();
			
			
			//缓存文件的用法——直接用本地IO来读取
			//这里读的数据是map task所在机器本地工作目录中的一个小文件
			in = new FileReader("b.txt");
			reader =new BufferedReader(in);
			String line =null;
			while(null!=(line=reader.readLine())){
				
				String[] fields = line.split(",");
				b_tab.put(fields[0],fields[1]);
				
			}
			IOUtils.closeStream(reader);
			IOUtils.closeStream(in);
			
		}
		
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			//这里读的是这个map task所负责的那一个切片数据(在hdfs上)
			 String[] fields = value.toString().split("\t");
			 
			 String a_itemid = fields[0];
			 String a_amount = fields[1];
			 
			 String b_name = b_tab.get(a_itemid);
			 
			 // 输出结果  1001	98.9	banan
			 context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));
			 
		}
		
		
	}
	
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(TestDistributedCache.class);
		
		job.setMapperClass(TestDistributedCacheMapper.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		//这里是我们正常的需要处理的数据所在路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//不需要reducer
		job.setNumReduceTasks(0);
		//分发一个文件到task进程的工作目录
		job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
		
		//分发一个归档文件到task进程的工作目录
//		job.addArchiveToClassPath(archive);

		//分发jar包到task节点的classpath下
//		job.addFileToClassPath(jarfile);
		
		job.waitForCompletion(true);
	}
}
关注
打赏
1688896170
查看更多评论

杨林伟

暂无认证

  • 1浏览

    0关注

    3183博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.1031s