将手机号以136、137、138、139开头的记录分别输出到各自独立的文件中,其余号段的记录统一输出到另一个文件中(共5个输出文件)。
数据:
12384188413 192.168.100.3 4116 1432 200
13590439668 192.168.100.4 1116 954 200
15910133277 192.168.100.5 3156 2936 200
13729199489 192.168.100.6 240 0 200
13630577991 192.168.100.7 6960 690 200
15043685818 192.168.100.8 3659 3538 200
15959002129 192.168.100.9 1938 180 500
13560439638 192.168.100.10 918 4938 200
13470253144 192.168.100.11 180 180 200
具体实现
第一步:自定义Mapper:
/**
 * Mapper that re-keys every input record by its phone number.
 *
 * <p>Input: byte offset of the line ({@link LongWritable}) and the line itself ({@link Text}).
 * Output: the first whitespace-separated field of the line (the phone number) as key and the
 * complete, unmodified line as value.
 *
 * <p>The type parameters are required: with a raw {@code Mapper} the {@code map} method below
 * does not override the framework method and the class fails to compile because of
 * {@code @Override}.
 */
public class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> {

    /**
     * Emits {@code (phoneNumber, wholeLine)} for one input line.
     *
     * @param key     offset of the line in the input split (unused)
     * @param value   one line of input, fields separated by whitespace
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Trim first so that leading whitespace does not yield an empty first field.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // Blank lines carry no record; emitting them would create an empty key.
            return;
        }
        String[] fields = line.split("\\s+"); // split into individual fields
        String phoneNumber = fields[0];        // the phone number is the first field
        // Emit the record keyed by phone number; the value stays the original line.
        context.write(new Text(phoneNumber), value);
    }
}
第二步:自定义Partitioner
/**
 * Partitioner that routes records by the first three digits of the phone number key.
 *
 * <ul>
 *   <li>{@code 136} -> partition 0</li>
 *   <li>{@code 137} -> partition 1</li>
 *   <li>{@code 138} -> partition 2</li>
 *   <li>{@code 139} -> partition 3</li>
 *   <li>anything else (including keys shorter than three characters) -> partition 4</li>
 * </ul>
 *
 * <p>The returned index does not depend on {@code numPartitions}, so the job is expected to be
 * configured with at least five reduce tasks.
 *
 * <p>The type parameters are required: with a raw {@code Partitioner} the method below does not
 * override the abstract framework method and the class fails to compile.
 */
public class PhonePartitioner extends Partitioner<Text, Text> {

    /** Length of the phone-number prefix that decides the partition. */
    private static final int PREFIX_LENGTH = 3;

    /** Partition that collects every record not matching one of the dedicated prefixes. */
    private static final int OTHER_PARTITION = 4;

    /**
     * Chooses the partition for one map output pair.
     *
     * @param key           phone number emitted by the mapper
     * @param value         the full input record (unused)
     * @param numPartitions number of reduce tasks configured for the job (unused)
     * @return a partition index in the range 0..4 as described on the class
     */
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        String phone = key.toString();
        // Guard against short or empty keys: substring(0, 3) would throw on them.
        if (phone.length() < PREFIX_LENGTH) {
            return OTHER_PARTITION;
        }
        String preNum = phone.substring(0, PREFIX_LENGTH); // first three digits of the number
        int partition = OTHER_PARTITION;
        switch (preNum) {
            case "136":
                partition = 0;
                break;
            case "137":
                partition = 1;
                break;
            case "138":
                partition = 2;
                break;
            case "139":
                partition = 3;
                break;
            default:
                break;
        }
        return partition;
    }
}
第三步:自定义Reducer
/**
 * Reducer that writes every record it receives, prefixed with a running sequence number.
 *
 * <p>Input: phone number key with all records (full input lines) that share that number.
 * Output: {@code (index, record)} where {@code index} counts the records written by this
 * reducer instance, starting at 1.
 *
 * <p>The type parameters are required: with a raw {@code Reducer} the method below does not
 * override the framework method and the class fails to compile.
 */
public class PhoneReducer extends Reducer<Text, Text, LongWritable, Text> {

    /** Running count of records written by this reducer instance. */
    int index = 0;

    /**
     * Writes all records of one phone number.
     *
     * @param key     the phone number shared by {@code values}
     * @param values  every input line whose first field equals {@code key}
     * @param context task context used to emit the output pairs
     * @throws IOException          if writing an output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Iterate over ALL values: the same phone number may occur on several input lines,
        // and taking only the first one would silently drop the remaining records.
        for (Text record : values) {
            index++;
            context.write(new LongWritable(index), record);
        }
    }
}
第四步:自定义Driver
/**
 * Driver that configures and runs the phone-number partitioning job in local mode.
 *
 * <p>Usage: {@code PhoneDriver [inputPath outputPath]}. When fewer than two arguments are
 * given, the built-in default paths are used.
 */
public class PhoneDriver {

    /** Input directory used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "src/main/resources/phonei";

    /** Output directory used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "src/main/resources/phoneo";

    /** One reduce task per partition index returned by PhonePartitioner (0..4). */
    private static final int NUM_REDUCE_TASKS = 5;

    /**
     * Builds the job, runs it, and exits with status 0 on success or 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}
     * @throws Exception if the job cannot be configured or fails while running
     */
    public static void main(String[] args) throws Exception {
        // Honor command-line paths when supplied; otherwise fall back to the defaults
        // instead of unconditionally overwriting the arguments.
        boolean hasPaths = args != null && args.length >= 2;
        String inputPath = hasPaths ? args[0] : DEFAULT_INPUT;
        String outputPath = hasPaths ? args[1] : DEFAULT_OUTPUT;

        // 1. Obtain the configuration and the job instance.
        Configuration cfg = new Configuration();
        // Force local mode (even if a core-site.xml is present on the project classpath).
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(cfg);

        // 2. Locate the jar that contains this program via the driver class.
        job.setJarByClass(PhoneDriver.class);

        // 3. Mapper and reducer classes for this job.
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        // 4. Key/value types of the map output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Key/value types of the final output.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // 6. Custom partitioner, plus a matching number of reduce tasks (must be set
        //    explicitly so that every partition index has a reducer).
        job.setPartitionerClass(PhonePartitioner.class);
        job.setNumReduceTasks(NUM_REDUCE_TASKS);

        // 7. Input directory and output directory of the job.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 8. Run the job and wait for it to finish; with the settings above it runs in
        //    local mode rather than being submitted to YARN.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}