Requirements
Merge many small files into a single SequenceFile.
Externally a SequenceFile is one file; internally it is still a sequence of individual records, one per small file.
Expected result:
- key: the full path of each small file
- value: the raw byte content of each small file
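To make this key/value layout concrete before wiring up MapReduce: a SequenceFile is a single container file made of (key, value) records. The following minimal sketch writes two made-up records directly with Hadoop's SequenceFile.Writer API (the class name, path, and contents are illustrative, not part of the job below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        Path path = new Path("/tmp/demo.seq");  // illustrative output location
        // Each append() adds one (key, value) record; the file on disk stays a single unit.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(cfg,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            writer.append(new Text("file:/data/a.txt"), new BytesWritable("contents of a".getBytes()));
            writer.append(new Text("file:/data/b.txt"), new BytesWritable("contents of b".getBytes()));
        }
    }
}

The job below produces exactly this layout, but sources the keys and values from real files via a custom InputFormat.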
Step 1: Write a custom RecordReader

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Instantiated once per split, and each split is one whole small file.
public class FileCombineRecordReader extends RecordReader<Text, BytesWritable> {

    private FileSplit split;
    private Configuration cfg;
    private boolean isProcess = false;
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
        this.split = (FileSplit) inputSplit;
        cfg = context.getConfiguration();
    }

    // Core logic: read one complete file and wrap it in a single (key, value) pair.
    @Override
    public boolean nextKeyValue() throws IOException {
        if (!isProcess) {
            byte[] buf = new byte[(int) split.getLength()];  // 1. buffer sized to the split (= the whole file)
            Path path = split.getPath();                     // 2. path of the small file
            FileSystem fs = path.getFileSystem(cfg);         // 3. file system for that path
            FSDataInputStream fis = fs.open(path);           // 4. open an input stream
            try {
                IOUtils.readFully(fis, buf, 0, buf.length);  // 5. read the entire file into the buffer
            } finally {
                IOUtils.closeStream(fis);                    // 6. close the stream (fs is a cached, shared instance; do not close it)
            }
            key.set(path.toString());                        // key = full path of the file
            value.set(buf, 0, buf.length);                   // value = file content
            isProcess = true;                                // emit exactly one record per file
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        return isProcess ? 1.0f : 0.0f;  // the single record is either pending or done
    }

    @Override
    public void close() {
        // nothing to release; the stream is closed in nextKeyValue()
    }
}
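Before plugging the reader into a job, it can be exercised on its own. A local smoke-test sketch, under two assumptions: sample.txt is a hypothetical existing small file, and TaskAttemptContextImpl with a blank TaskAttemptID stands in for the context the framework would normally supply:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class FileCombineRecordReaderSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        Path path = new Path("src/main/resources/aai/sample.txt");  // hypothetical input file
        long len = FileSystem.get(cfg).getFileStatus(path).getLen();

        FileCombineRecordReader reader = new FileCombineRecordReader();
        reader.initialize(new FileSplit(path, 0, len, null),
                new TaskAttemptContextImpl(cfg, new TaskAttemptID()));
        // Same sequence the framework runs: initialize -> nextKeyValue loop -> close.
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + " -> "
                    + reader.getCurrentValue().getLength() + " bytes");
        }
        reader.close();
    }
}

With this reader the loop body runs exactly once, printing the file's full path and its size in bytes.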
Step 2: Write a custom InputFormat

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FileCombineInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;  // never split a source file, so each split is exactly one whole file
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        // Hand back the custom RecordReader. The framework calls initialize() itself
        // before reading, so the explicit call here is redundant but harmless.
        FileCombineRecordReader recordReader = new FileCombineRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }
}
Step 3: Write the Mapper class

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Identity mapper: the RecordReader already produced the final (path, content) pairs.
public class FileCombineMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
Step 4: Write the Reducer class

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileCombineReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each key is a unique file path, so this loop normally writes a single value.
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
Step 5: Write the driver class

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FileCombineDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded input and output paths (the output directory must not exist yet)
        args = new String[2];
        args[0] = "src/main/resources/aai/";
        args[1] = "src/main/resources/aao";

        Configuration cfg = new Configuration();
        // Force local mode, even if a core-site.xml on the project classpath says otherwise
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(cfg);
        job.setJarByClass(FileCombineDriver.class);
        job.setMapperClass(FileCombineMapper.class);
        job.setReducerClass(FileCombineReducer.class);

        // Plug in the custom FileCombineInputFormat and emit a SequenceFile
        job.setInputFormatClass(FileCombineInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.out.println(b);
    }
}
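After a successful run, the output directory (args[1]) contains the merged SequenceFile, by default named part-r-00000. A minimal verification sketch using Hadoop's SequenceFile.Reader API (the path assumes the driver settings above; the class name is just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class FileCombineVerifier {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        Path path = new Path("src/main/resources/aao/part-r-00000");
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        // Iterate over the records: one (path, content) pair per original small file.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(cfg, SequenceFile.Reader.file(path))) {
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}

Alternatively, hadoop fs -text can decode a SequenceFile directly from the command line for a quick look at the merged records.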