您当前的位置: 首页 > 

38 MAPREDUCE中的其他应用

杨林伟 发布时间:2019-08-08 11:25:14 ,浏览量:2

计数器应用

在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce框架中提供的全局计数器来实现,示例代码如下:

public class MultiOutputs {
	//通过枚举形式定义自定义计数器
	enum MyCounter{MALFORORMED,NORMAL}

	static class CommaMapper extends Mapper {

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String[] words = value.toString().split(",");

			for (String word : words) {
				context.write(new Text(word), new LongWritable(1));
			}
			//对枚举定义的自定义计数器加1
			context.getCounter(MyCounter.MALFORORMED).increment(1);
			//通过动态设置自定义计数器加1
			context.getCounter("counterGroupa", "countera").increment(1);
		}

	}
多job串联

一个稍复杂点的处理逻辑往往需要多个mapreduce程序串联处理,多job的串联可以借助mapreduce框架的JobControl实现。

示例代码:

      ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
        ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
        ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
       
        // 设置作业依赖关系
        cJob2.addDependingJob(cJob1);
        cJob3.addDependingJob(cJob2);
 
        JobControl jobControl = new JobControl("RecommendationJob");
        jobControl.addJob(cJob1);
        jobControl.addJob(cJob2);
        jobControl.addJob(cJob3);
 
        cJob1.setJob(job1);
        cJob2.setJob(job2);
        cJob3.setJob(job3);
 
        // 新建一个线程来运行已加入JobControl中的作业,开始进程并等待结束
        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(500);
        }
        jobControl.stop();
 
        return 0;
关注
打赏
1688896170
查看更多评论

杨林伟

暂无认证

  • 2浏览

    0关注

    3183博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0791s