需求
流量求和:统计每个手机号消耗的总上行流量、总下行流量以及总流量。
输入数据(注:以下示例每行只列出了时间戳、IP、上行流量、下行流量和状态码五列,似乎缺少了手机号等列;下面的 Mapper 按完整日志格式解析,取第 2 列作为手机号):
1363157985066 120.196.100.82 2481 24681 200
1363157995033 120.197.40.4 264 0 200
1363157993055 120.196.100.99 132 1512 200
1363154400022 120.197.40.4 240 0 200
1363157993044 120.196.100.99 1527 2106 200
1363157993055 120.197.40.4 4116 1432 200
1363157993055 120.196.100.99 1116 954 200
1363157995033 120.197.40.4 3156 2936 200
1363157983019 120.196.100.82 240 0 200
1363154400022 120.197.40.4 6960 690 200
1363157973098 120.197.40.4 3659 3538 200
1363157993055 120.196.100.99 1938 180 200
1363154400022 120.196.100.99 918 4938 200
1363157993055 120.197.40.4 180 180 200
1363157984040 120.197.40.4 1938 2910 200
1363157995033 120.196.100.82 3008 3720 200
1363154400022 120.196.100.99 7335 110349 200
1363157993055 120.196.100.99 9531 2412 200
1363157990043 120.196.100.55 11058 48243 200
1363157993055 120.196.100.82 120 120 200
1363157985066 120.196.100.82 2481 24681 200
1363157993055 120.196.100.99 1116 954 200
代码实现
第一步:创建实体类 FlowBean,实现 Hadoop 的 Writable 接口以支持序列化:
/**
 * Per-phone traffic record holding upstream, downstream and total traffic.
 *
 * <p>Implements Hadoop {@code Writable} so it can be used both as the map output value and
 * as the final output value. {@link #toString()} controls how the value is rendered in the
 * text output of the job.
 */
public class FlowBean implements Writable {
    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic (upFlow + downFlow)

    /** No-arg constructor, required because the bean is instantiated reflectively before readFields. */
    public FlowBean() {
    }

    /**
     * Convenience constructor; {@code sumFlow} is derived as {@code upFlow + downFlow}.
     *
     * @param phoneNumber the phone number this record belongs to
     * @param upFlow      upstream traffic
     * @param downFlow    downstream traffic
     */
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    /** Sets upstream traffic; note that {@code sumFlow} is not recomputed automatically. */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    /** Sets downstream traffic; note that {@code sumFlow} is not recomputed automatically. */
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Restores the fields from the stream. The read order must match {@link #write(DataOutput)}.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Serializes the fields to the stream in a fixed order (phone, up, down, sum).
     * A null phone number is written as an empty string because writeUTF rejects null.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber == null ? "" : phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Renders the record as {@code up<TAB>down<TAB>sum}. The phone number is omitted because
     * the job already emits it as the output key.
     */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
第二步:定义Mapper类,代码如下:
/**
 * Parses one log line into a (phoneNumber, FlowBean) pair.
 *
 * <p>Fields are whitespace-separated. The phone number is read from the second column. The
 * upstream/downstream traffic is read from the third- and second-to-last columns; the last
 * column (presumably a status code such as 200) is ignored. Indexing from the end tolerates
 * records in which optional middle columns are missing, whereas fixed indices would pick
 * the wrong column or run off the end of the array.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    /** Minimum field count so that the phone column [1] precedes the up/down/status tail. */
    private static final int MIN_FIELDS = 5;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Trim first so leading whitespace does not produce an empty fields[0] and shift columns.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // blank line: nothing to count
        }
        String[] fields = line.split("\\s+");
        if (fields.length < MIN_FIELDS) {
            // Record malformed lines in a counter instead of failing the whole task.
            context.getCounter("FlowMapper", "MALFORMED_LINES").increment(1);
            return;
        }
        String phoneNumber = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        // Key by phone number so the reducer sees all records of one phone together.
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, upFlow, downFlow));
    }
}
第三步:定义Reducer类,代码如下:
/**
 * Sums the upstream and downstream traffic of all records sharing one phone number and
 * emits a single FlowBean whose total is up + down.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upFlowSum = 0;   // accumulated upstream traffic
        long downFlowSum = 0; // accumulated downstream traffic
        // The framework may reuse the same value object between iterations, so only the
        // primitive fields are read here; no bean reference is kept.
        for (FlowBean bean : values) {
            upFlowSum += bean.getUpFlow();
            downFlowSum += bean.getDownFlow();
        }
        context.write(key, new FlowBean(key.toString(), upFlowSum, downFlowSum));
    }
}
第四步:编写驱动类(Driver),配置并提交作业:
/**
 * Configures and submits the flow-sum job in local mode.
 *
 * <p>Usage: {@code FlowDriver [inputPath [outputPath]]}. Any path not supplied on the command
 * line falls back to the sample directories under {@code src/main/resources}.
 * NOTE: Hadoop's FileOutputFormat normally rejects an output directory that already exists,
 * so remove it before re-running.
 */
public class FlowDriver {
    private static final String DEFAULT_INPUT = "src/main/resources/flowi";
    private static final String DEFAULT_OUTPUT = "src/main/resources/flowo";

    public static void main(String[] args) throws Exception {
        // Use caller-supplied paths when present instead of unconditionally overwriting args.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration cfg = new Configuration(); // loads the configuration files on the classpath
        // Force local mode, even if a core-site.xml on the classpath says otherwise.
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(cfg);
        job.setJarByClass(FlowDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Mapper and reducer for this job.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit, wait, and map success/failure to the process exit code.
        int res = job.waitForCompletion(true) ? 0 : 1;
        System.exit(res);
    }
}