前置课程: HDFS开发环境搭建
数据public class KVInputFormatMapper extends Mapper {
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, new LongWritable(1));
context.write(value, new LongWritable(1));
}
}
测试代码（Driver 驱动类）:
/**
 * Runs {@link KVInputFormatMapper} as a local, map-only job over tab-separated key/value text.
 *
 * <p>Usage: {@code KVInputFormatDriver [inputDir outputDir]}. When fewer than two arguments are
 * supplied, the bundled sample paths under {@code src/main/resources/kv/} are used.
 */
public class KVInputFormatDriver {

    /** Input directory used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT = "src/main/resources/kv/kvi2/";

    /** Output directory used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT = "src/main/resources/kv/kvo2";

    /**
     * Configures, submits, and waits for the job; exits with 0 on success and 1 on failure.
     *
     * @param args optional {@code [inputDir, outputDir]}
     * @throws Exception if the file system or job submission fails
     */
    public static void main(String[] args) throws Exception {
        // Honor caller-supplied paths; fall back to the sample data otherwise.
        String[] paths = args.length >= 2 ? args : new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
        Path inputPath = new Path(paths[0]);
        Path outputPath = new Path(paths[1]);

        // new Configuration() loads Hadoop's default resources; override them for a local run.
        Configuration cfg = new Configuration();
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");
        // KeyValueLineRecordReader splits each line into key and value at this separator.
        cfg.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, "\t");

        // FileOutputFormat rejects an existing output directory, so clear the previous run's
        // output. The guard must test the OUTPUT path, the one being deleted.
        // Not closed: FileSystem.get returns a cached instance that may be shared in this JVM.
        FileSystem fileSystem = FileSystem.get(cfg);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(cfg);
        job.setJarByClass(KVInputFormatDriver.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(KVInputFormatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Map-only job: mapper output goes straight to the output format, with no shuffle/reduce.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit, block until done (printing progress), and map the outcome to an exit code.
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitCode);
    }
}
结果: