
28 A First Look at Sorting in MapReduce

杨林伟, published 2019-08-08 10:40:38

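Requirement

Sum the upstream and downstream traffic recorded in the log data for each phone number, and output the results sorted by total traffic in descending order. Sample data: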

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82             24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
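The sort job's mapper in the implementation section does not read these raw records. It expects lines of the form phone\tupflow\tdownflow, so the raw records must first be summed per phone number by an earlier step that this article does not show. The plain-Java sketch below illustrates that step. It rests on a few assumptions:

- Fields are tab-separated.
- The phone number is the second field.
- Upstream and downstream traffic are the third- and second-to-last fields (2481 and 24681 in the first sample line).

RawFlowAggregation, aggregate and format are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not the author's code): sums traffic per phone from raw log lines.
public class RawFlowAggregation {

    // Assumption: tab-separated fields; phone = field 1;
    // upflow / downflow = 3rd- and 2nd-to-last fields. Empty fields (consecutive tabs) are tolerated.
    static Map<String, long[]> aggregate(List<String> lines) {
        Map<String, long[]> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            String phone = f[1].trim();
            long up = Long.parseLong(f[f.length - 3].trim());
            long down = Long.parseLong(f[f.length - 2].trim());
            long[] t = totals.computeIfAbsent(phone, k -> new long[2]);
            t[0] += up;   // running upstream total
            t[1] += down; // running downstream total
        }
        return totals;
    }

    // Formats one record as "phone\tup\tdown\tsum", the layout the sort job's mapper reads.
    static String format(String phone, long[] t) {
        return phone + "\t" + t[0] + "\t" + t[1] + "\t" + (t[0] + t[1]);
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "1363157995052\t13826544101\t5C-0E-8B-C7-F1-E0:CMCC\t120.197.40.4\t\t\t4\t0\t264\t0\t200",
                "1363157991076\t13926435656\t20-10-7A-28-CC-0A:CMCC\t120.196.100.99\t\t\t2\t4\t132\t1512\t200");
        aggregate(lines).forEach((phone, t) -> System.out.println(format(phone, t)));
    }
}
```

In a real Hadoop setup this summation would itself be a MapReduce job, with its output directory used as the input path of the sort job.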
Analysis

Basic idea: implement a custom bean that holds the traffic figures, and use that bean as the key of the map output.
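Because the bean travels from map to reduce as a key, Hadoop turns it into bytes with its write method and rebuilds it with readFields. Hadoop first creates an empty instance through the no-arg constructor and then calls readFields on it. The sketch below imitates that round trip with plain java.io streams. FlowRecord and roundTrip are hypothetical names, and the class deliberately avoids Hadoop types so it runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class FlowRecordRoundTrip {

    // Hypothetical Hadoop-free stand-in for FlowBean: same fields, same write/readFields shape.
    static class FlowRecord {
        long upflow;
        long downflow;
        long sumflow;

        // Deserialization starts from an empty instance, so a no-arg constructor must exist.
        FlowRecord() {}

        FlowRecord(long upflow, long downflow) {
            this.upflow = upflow;
            this.downflow = downflow;
            this.sumflow = upflow + downflow;
        }

        void write(DataOutput out) throws IOException {
            out.writeLong(upflow);
            out.writeLong(downflow);
            out.writeLong(sumflow);
        }

        // Must read the fields in exactly the order write() emitted them.
        void readFields(DataInput in) throws IOException {
            upflow = in.readLong();
            downflow = in.readLong();
            sumflow = in.readLong();
        }
    }

    // Serializes a record to bytes, then reads it back into a fresh, empty instance.
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            original.write(out);
            out.flush();
            FlowRecord copy = new FlowRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord r = roundTrip(new FlowRecord(2481, 24681));
        // prints 2481, 24681 and 27162, tab-separated
        System.out.println(r.upflow + "\t" + r.downflow + "\t" + r.sumflow);
    }
}
```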

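An MR program sorts data as part of processing. After map emits its key-value pairs and before they reach reduce (during the shuffle), the pairs are sorted, and the sort is based on the map output key.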

So to get a custom sort order, put the sort criterion into the key, make the key class implement the WritableComparable interface, and override its compareTo method.
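Reversing the operands of the comparison is all it takes to get a descending order. The following plain-Java sketch shows that effect outside Hadoop, using Collections.sort in place of the shuffle sort. FlowKey and sortDescending are hypothetical names, and the totals are those of the four sample records.

The sketch uses Long.compare, which keeps the Comparable contract: equal totals compare as 0.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DescendingKeyDemo {

    // Hypothetical minimal sort key: only the total flow matters for ordering.
    static class FlowKey implements Comparable<FlowKey> {
        final long sumflow;

        FlowKey(long sumflow) {
            this.sumflow = sumflow;
        }

        @Override
        public int compareTo(FlowKey o) {
            // Operands swapped relative to ascending order, so larger totals sort first.
            return Long.compare(o.sumflow, this.sumflow);
        }
    }

    // Wraps each total in a FlowKey, sorts via compareTo, and unwraps the result.
    static List<Long> sortDescending(List<Long> totals) {
        List<FlowKey> keys = new ArrayList<>();
        for (long t : totals) {
            keys.add(new FlowKey(t));
        }
        Collections.sort(keys);
        List<Long> out = new ArrayList<>();
        for (FlowKey k : keys) {
            out.add(k.sumflow);
        }
        return out;
    }

    public static void main(String[] args) {
        // Totals of the four sample phone numbers
        System.out.println(sortDescending(List.of(264L, 27162L, 1644L, 240L))); // [27162, 1644, 264, 240]
    }
}
```

By default Hadoop orders WritableComparable keys through their compareTo method in the same way, which is why overriding it changes the order in which keys reach the reducer.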

Implementation

1. The custom bean

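import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {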
	
	long upflow;
	long downflow;
	long sumflow;
	
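	// Once another constructor is declared, the no-arg constructor must be defined explicitly:
	// Hadoop creates the bean by reflection during deserialization and throws an exception without it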
	public FlowBean(){}
	
	public FlowBean(long upflow, long downflow) {
		super();
		this.upflow = upflow;
		this.downflow = downflow;
		this.sumflow = upflow + downflow;
	}
	
	public long getSumflow() {
		return sumflow;
	}

	public void setSumflow(long sumflow) {
		this.sumflow = sumflow;
	}

	public long getUpflow() {
		return upflow;
	}
	public void setUpflow(long upflow) {
		this.upflow = upflow;
	}
	public long getDownflow() {
		return downflow;
	}
	public void setDownflow(long downflow) {
		this.downflow = downflow;
	}

	// Serialization: write the object's fields to the output stream
	@Override
	public void write(DataOutput out) throws IOException {
		
		out.writeLong(upflow);
		out.writeLong(downflow);
		out.writeLong(sumflow);
		
	}

	// Deserialization: read the fields back in the same order they were written
	@Override
	public void readFields(DataInput in) throws IOException {
		upflow = in.readLong();
		downflow = in.readLong();
		sumflow = in.readLong();
		
	}
	
	
	@Override
	public String toString() {
		return upflow + "\t" + downflow + "\t" + sumflow;
	}
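	@Override
	public int compareTo(FlowBean o) {
		// Descending order by total traffic: compare o against this, not this against o.
		// Beans with equal totals compare as 0 and are grouped into one reduce() call,
		// so the reducer must write out every value in its group.
		return Long.compare(o.getSumflow(), this.sumflow);
	}
}

2. Mapper and reducer

This mapper does not parse the raw log. It expects each input line to have the form phone\tupflow\tdownflow, which is the per-phone totals produced by an earlier aggregation job.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

	static class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			// Input line: phone \t upflow \t downflow [\t sumflow]
			String[] fields = value.toString().split("\t");
			try {
				String phonenbr = fields[0];
				long upflow = Long.parseLong(fields[1]);
				long dflow = Long.parseLong(fields[2]);

				// The bean is the key, so the shuffle sorts records by FlowBean.compareTo
				context.write(new FlowBean(upflow, dflow), new Text(phonenbr));
			} catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
				// Skip malformed lines
				e.printStackTrace();
			}
		}
	}

	static class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

		@Override
		protected void reduce(FlowBean bean, Iterable<Text> phonenbrs, Context context) throws IOException, InterruptedException {
			// Hadoop reuses the key object and refreshes 'bean' as each value is read,
			// so every phone number is written with its own traffic figures
			for (Text phoneNbr : phonenbrs) {
				context.write(phoneNbr, bean);
			}
		}
	}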

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FlowCount.class);

		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// Map output: FlowBean as the key (drives the sort), phone number as the value
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		// Final output: phone number, then upflow, downflow and sumflow
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// TextInputFormat is the default, so it need not be set explicitly.
		// With the default single reduce task, the whole output file is globally sorted.

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}
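To see how the key's ordering and grouping shape the output without running a cluster, here is a plain-Java simulation of the sort job:

- A TreeMap with a descending comparator stands in for the shuffle's sort and grouping.
- The inner loop over each group stands in for the reducer.

The class name, the run method and the input lines are illustrative assumptions. The input lines are the per-phone totals of the four sample records.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hadoop-free simulation of the sort job; all names are hypothetical.
public class SortJobSimulation {

    // Each input line: phone \t upflow \t downflow [\t ...], as the mapper expects.
    static List<String> run(List<String> inputLines) {
        // Stand-in for shuffle sort + grouping: totals ordered largest first.
        // Records with equal totals share one entry, like one reduce() call.
        TreeMap<Long, List<String[]>> groups = new TreeMap<>(Comparator.<Long>reverseOrder());
        for (String line : inputLines) {
            String[] f = line.split("\t");
            long sum = Long.parseLong(f[1]) + Long.parseLong(f[2]);
            groups.computeIfAbsent(sum, k -> new ArrayList<>()).add(f);
        }
        // Stand-in for the reducer: emit every record in each group, not only the first.
        List<String> output = new ArrayList<>();
        for (Map.Entry<Long, List<String[]>> e : groups.entrySet()) {
            for (String[] f : e.getValue()) {
                output.add(f[0] + "\t" + f[1] + "\t" + f[2] + "\t" + e.getKey());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        run(List.of(
                "13826544101\t264\t0\t264",
                "13726230503\t2481\t24681\t27162",
                "13926435656\t132\t1512\t1644",
                "13926251106\t240\t0\t240")).forEach(System.out::println);
    }
}
```

The sample has no ties, but if two phones had the same total they would land in one group. In that case, writing only the first value of the group would silently drop the others.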