您当前的位置: 首页 > 

梁云亮

暂无认证

  • 1浏览

    0关注

    1211博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

FileInputFormat 之 自定义InputFormat实现 小文件的合并

梁云亮 发布时间:2019-10-26 23:08:05 ,浏览量:1

要求

多个小文件合并,要求将文件合并到SequenceFile中

SequenceFile对外是一个整体,对内还是一个个的文件

期望结果是:

  • key:每一个小文件的带路径的文件名
  • value:每一个小文件的文件内容 在这里插入图片描述
第一步:自定义RecordReader类
public class FileCombineRecordReader extends RecordReader {//每一个切片(小文件)调用一次这个类
    private FileSplit split;
    private Configuration cfg;

    private boolean isProcess = false;

    private Text key = new Text();
    private BytesWritable value = new BytesWritable();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext Context) {
        this.split = (FileSplit) inputSplit;
        cfg = Context.getConfiguration();
    }

    @Override//核心业务逻辑
    public boolean nextKeyValue() throws IOException {//一次读取一个完整的文件并封装到KV中
        if (!isProcess) {
            byte[] buf = new byte[(int) split.getLength()]; //1.根据切片长度定义缓冲区
            Path path = split.getPath();//2.获得路径
            FileSystem fs = path.getFileSystem(cfg); //3.通过路径获得文件系统
            FSDataInputStream fis = fs.open(path); //4.通过文件系统获得输入流
            IOUtils.readFully(fis, buf, 0, buf.length);   //5.拷贝流

            key.set(split.getPath().toString());//设置key值为文件的路径+名称
            value.set(buf, 0, buf.length);//将buf中的内容输出到value中

            IOUtils.closeStream(fis);
            IOUtils.closeStream(fs);//6.关闭流
            isProcess = true;//读完之后结束
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() {//获取当前的key
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() {//获取当前的value
        return value;
    }

    @Override
    public float getProgress() {//获取正在处理的进度
        return 0;
    }

    @Override
    public void close() {
    }
}
第二步:自定义InputFromat
public class FileCombineInputFormat extends FileInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; //原文件不可切割
    }

    @Override
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
        FileCombineRecordReader recordReader = new FileCombineRecordReader();//自定义RecordReader对象并初始化
        recordReader.initialize(split,context);
        return recordReader;
    }
}
第三步:编写Mapper类
public class FileCombineMapper extends Mapper {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
第四步:编写Reducer类
public class FileCombineReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        //循环写出
        for(BytesWritable value : values){
            context.write(key, value);
        }
    }
}
第五步:编写SequenceFileDriver类
public class FileCombineDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 数据输入路径和输出路径
        args = new String[2];
        args[0] = "src/main/resources/aai/";
        args[1] = "src/main/resources/aao";

        Configuration cfg = new Configuration();
        //设置本地模式运行(即使项目类路径下core-site.xml文件,依然采用本地模式)
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(cfg);
        job.setJarByClass(FileCombineDriver.class);

        job.setMapperClass(FileCombineMapper.class);
        job.setReducerClass(FileCombineReducer.class);

        //设置inputFormat为自定义的FileCombileInputFormat
        job.setInputFormatClass(FileCombineInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);//设置输出的outputFormat

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);

        System.out.println(b);
    }
}
关注
打赏
1665409997
查看更多评论
立即登录/注册

微信扫码登录

0.1107s