36 MapReduce Custom GroupingComparator

杨林伟 · Published 2019-08-08 11:19:08

Requirement

Given order data of the form shown below (the original post presented the sample data as a screenshot, which is no longer available): for each order, find the single transaction with the largest amount.
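Judging from the mapper code further down, each input line is a tab-separated pair of order id and transaction amount. A made-up illustration of the input and of the desired result (these values are hypothetical, not taken from the original post):

Input (itemid \t amount):
Order_0000001	222.8
Order_0000001	25.8
Order_0000001	522.8
Order_0000002	522.4
Order_0000002	720.4
Order_0000003	232.8

Desired output (maximum amount per order):
Order_0000001	522.8
Order_0000002	720.4
Order_0000003	232.8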

Analysis

1. Use a bean containing both the order id and the transaction amount as the map output key. All records read in the map phase can then be partitioned by order id and sorted by amount (descending) before they reach the reducer.

2. On the reduce side, use a GroupingComparator to group key-value pairs with the same order id into a single reduce call; the first key in each group is then the maximum-amount record for that order.

Implementation

Custom GroupingComparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Controls how key-value pairs are grouped on the reduce side of the shuffle.
 * @author duanhaitao@itcast.cn
 *
 */
public class ItemidGroupingComparator extends WritableComparator {

	protected ItemidGroupingComparator() {
		// the "true" flag tells WritableComparator to instantiate OrderBean objects,
		// so compare() below receives fully deserialized beans
		super(OrderBean.class, true);
	}
	

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		OrderBean abean = (OrderBean) a;
		OrderBean bbean = (OrderBean) b;
		
		// treat beans with the same item_id as equal, so they are grouped together
		return abean.getItemid().compareTo(bbean.getItemid());
	}
}

Define the order information bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Order information bean, implementing Hadoop's serialization mechanism.
 * @author duanhaitao@itcast.cn
 *
 */
public class OrderBean implements WritableComparable<OrderBean> {
	private Text itemid;
	private DoubleWritable amount;

	public OrderBean() {
	}
	public OrderBean(Text itemid, DoubleWritable amount) {
		set(itemid, amount);
	}

	public void set(Text itemid, DoubleWritable amount) {

		this.itemid = itemid;
		this.amount = amount;

	}

	public Text getItemid() {
		return itemid;
	}

	public DoubleWritable getAmount() {
		return amount;
	}

	@Override
	public int compareTo(OrderBean o) {
		// primary order: item id ascending; secondary order: amount descending,
		// so the largest amount for each order id sorts first
		int cmp = this.itemid.compareTo(o.getItemid());
		if (cmp == 0) {
			cmp = -this.amount.compareTo(o.getAmount());
		}
		return cmp;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(itemid.toString());
		out.writeDouble(amount.get());
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		String readUTF = in.readUTF();
		double readDouble = in.readDouble();
		
		this.itemid = new Text(readUTF);
		this.amount = new DoubleWritable(readDouble);
	}


	@Override
	public String toString() {
		return itemid.toString() + "\t" + amount.get();
	}
}

Write the MapReduce job

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Uses the secondary-sort mechanism to output the maximum-amount record for each item (order id).
 * @author duanhaitao@itcast.cn
 *
 */
public class SecondarySort {
	
	static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
		
		OrderBean bean = new OrderBean();
		
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String line = value.toString();
			// each input line: itemid \t amount
			String[] fields = StringUtils.split(line, "\t");
			
			bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
			
			context.write(bean, NullWritable.get());
			
		}
		
	}
	
	static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
		
		// With the GroupingComparator set, the kv pairs arriving here are grouped per order id:
		//   <bean(itemid, maxAmount)>,null  <bean(itemid, amount2)>,null  ...
		// The "key" parameter of reduce() is the key of the first kv pair in the group,
		// i.e. the bean with the largest amount for that order id.
		// To output the maximum-amount record per item, simply write out this key.
		@Override
		protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}
	
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(SecondarySort.class);
		
		job.setMapperClass(SecondarySortMapper.class);
		job.setReducerClass(SecondarySortReducer.class);
		
		
		job.setOutputKeyClass(OrderBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// register the GroupingComparator used on the reduce side of the shuffle
		job.setGroupingComparatorClass(ItemidGroupingComparator.class);
		// register the Partitioner used during the shuffle
		job.setPartitionerClass(ItemIdPartitioner.class);
		
		job.setNumReduceTasks(3);
		
		job.waitForCompletion(true);
		
	}

}
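Note that the driver registers an ItemIdPartitioner, whose source is not shown in the original post. A minimal sketch, assuming it simply partitions by the hash of the order id so that all records of one order land on the same reduce task, might look like this:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Hypothetical sketch of ItemIdPartitioner: routes map output to reducers by order id,
 * so that all records of the same order go to the same reduce task.
 */
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

	@Override
	public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {
		// partition on the item id only (ignore the amount), so every record
		// of a given order id always maps to the same partition
		return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}

Partitioning on the order id alone is what makes the grouping comparator effective: if records of one order were spread across reducers, no single reducer would see the complete order.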