您当前的位置: 首页 >  vr

梁云亮

暂无认证

  • 2浏览

    0关注

    1211博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

MapReduce整合Avro

梁云亮 发布时间:2020-02-11 11:36:58 ,浏览量:2

前置博客:

搭建Hadoop3.1.2伪分布方式环境 本博客示例中可能出现的错误及解决方案: Name node is in safe mode. Container killed on request. Exit code is 143

简介

在开发之初,Avro就是围绕着完善Hadoop生态系统的数据处理而开展的(使用Avro作为Hadoop MapReduce需要处理数据序列化和反序列化的场景),因此Hadoop MapReduce集成Avro也就是自然而然的事情。

在MapReduce中使用Avro可以提升数据的处理性能,主要是以下几点:

  • 向Job提供数据文件时可以使用Avro序列化过的二进制数据文件
  • 在数据解析方面速度比较快
  • 排序功能
本博客用到的软件的版本:
  • CentOS7.0
  • Hadoop的版本是3.1.2
  • Avro的版本是1.9.1
示例1:单词统计

Hadoop MapReduce读取源文件进行计数统计,然后将计算结果作为Avro格式的数据写到目标文件中。

首先将avro-mapred-1.9.1.jar上传到share/hadoop/mapreduce/目录

/usr/local/hadoop3.1.2/hadoop-standalone/

第一步:创建Maven项目
  • 添加Maven依赖

    junit
    junit
    4.12


    org.apache.avro
    avro
    1.9.1


    org.apache.avro
    avro-mapred
    1.9.1



    org.apache.hadoop
    hadoop-client
    3.1.2



    log4j
    log4j
    1.2.17

  • 配置插件
 
    
        org.apache.avro
        avro-maven-plugin
        1.9.1
        
            
                generate-sources
                
                    schema
                
                
                    ${project.basedir}/src/main/resources/
                    ${project.basedir}/src/main/java/
                
            
        
    
    
        org.apache.maven.plugins
        maven-jar-plugin
        
            
                
                    com.hc.WordCountDriver
                
            
        
    
    
        org.apache.maven.plugins
        maven-compiler-plugin
        
            1.8
            1.8
        
    

第二步:在resources目录中提供log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
第三步:创建WordCountSchema
public class WordCountSchema {
    public static Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordCountRecord\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}");
}

上面schema的值是我们写的Avro的字符串:

{
"type":"record",
"name":"WordCountRecord",
"fields":[
    {"name":"count","type":"int"}
]
}
第四步:创建WordCountMapper
public class WordCountMapper extends Mapper {
    private GenericRecord record = new GenericData.Record(WordCountSchema.schema);
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");

        for (int i = 0; i  0 && Character.isLetter(words[i].charAt(0))) {
                AvroKey word = new AvroKey(words[i]);
                record.put("count", 1);
                context.write(word, new AvroValue(record));
            }
        }
    }
}
第五步:创建WordCountReducer
public class WordCountReducer extends Reducer {

    @Override
    protected void reduce(AvroKey key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (AvroValue value : values) {
            GenericRecord record = value.datum();
            sum += Integer.parseInt(record.get("count").toString());
        }
        context.write(key, new AvroValue(sum));
    }

}
第六步:创建WordCountDriver
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();

        //获取配置信息以及封装任务
        Job job = Job.getInstance(cfg, "WordCountAvro");

        job.setJarByClass(WordCountDriver.class);//设置jar加载路径

        job.setMapperClass(WordCountMapper.class);//设置Mapper类,执行map方法
        job.setReducerClass(WordCountReducer.class);//设置Reducer类,执行reduce方法

        AvroJob.setMapOutputKeySchema(job,Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job,WordCountSchema.schema);

        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

        //设置输入和输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));//文件输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//文件输出路径
        
        boolean res = job.waitForCompletion(true);//提交job并等待结束
        System.exit(res ? 0 : 1); //退出程序
    }
}

上面代码使用AvroJob来配置作业,AvroJob类主要用来给输入、map输出以及最后输出数据指定Avro模式。

第七步:运行程序
  1. 项目打包 在这里插入图片描述 将打好的包上传到Linux服务器中,重命名为mrad.jar 在这里插入图片描述
  2. 在Linux服务器端启动Hadoop,在HDFS根目录创建input目录,同时在里面放置几篇英文文章作为待测试的数据 在这里插入图片描述
  3. 运行1步中上传的jar包 在这里插入图片描述 4.查看程序运行结果: 在这里插入图片描述
示例2:通过MapReduce程序找到各个班级年龄最小的学生 第一步:将avro-1.9.1.jar上传到到share/hadoop/mapreduce/目录

在这里插入图片描述 否则的话会报错: 在这里插入图片描述

第二步:创建Maven项目
  • 添加Maven依赖

    junit
    junit
    4.12


    org.apache.avro
    avro
    1.9.1


    org.apache.avro
    avro-mapred
    1.9.1



    org.apache.hadoop
    hadoop-client
    3.1.2



    log4j
    log4j
    1.2.17

  • 配置插件
 
    
        org.apache.avro
        avro-maven-plugin
        1.9.1
        
            
                generate-sources
                
                    schema
                
                
                    ${project.basedir}/src/main/resources/
                    ${project.basedir}/src/main/java/
                
            
        
    
    
        org.apache.maven.plugins
        maven-jar-plugin
        
            
                
                    com.hc.StuDriver
                
            
        
    
    
        org.apache.maven.plugins
        maven-compiler-plugin
        
            1.8
            1.8
        
    

  • 第二步:在resources目录中
  1. 提供log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  1. 提供stu.avsc
{
    "type": "record",
    "name": "StuRecord",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "gender", "type": "string"},
        {"name": "class", "type": "string"}
    ]
}
第三步:创建StuSchema
public class StuSchema {
    public static Schema schema ;

    static{
        InputStream is = StuSchema.class.getClassLoader().getResourceAsStream("stu.avsc");
        try {
            schema = new Parser().parse(is);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        System.out.println(schema);
    }
}
第四步:创建StuMapper
public class StuMapper extends Mapper {
    private GenericRecord record = new GenericData.Record(StuSchema.schema);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split("\\s+"); //匹配一个或多个空格
        if (strings.length  Integer.parseInt(record.get("age").toString()))) {
                min = new GenericData.Record(StuSchema.schema);
                min.put("name", record.get("name"));
                min.put("age", record.get("age"));
                min.put("gender", record.get("gender"));
                min.put("class", record.get("class"));
            }
        }
        context.write(new AvroKey(min), NullWritable.get());
    }
}

Reducer的逻辑其实是通过循环比较的方式找到每个班级年龄最小的学生。

第六步:创建StuDriver
public class StuDriver {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        // 可以解决在Hadoop集群中运行时使用的Avro版本和集群中Avro版本不一致的问题。
        //cfg.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        //获取配置信息以及封装任务
        Job job = Job.getInstance(cfg, "Stu");

        job.setJarByClass(StuDriver.class);//设置jar加载路径

        job.setMapperClass(StuMapper.class);//设置Mapper类,执行map方法
        job.setReducerClass(StuReducer.class);//设置Reducer类,执行reduce方法

        AvroJob.setMapOutputKeySchema(job,Schema.create(Schema.Type.STRING));;
        AvroJob.setMapOutputValueSchema(job,StuSchema.schema);

        AvroJob.setOutputKeySchema(job, StuSchema.schema);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        //设置输入和输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));//文件输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//文件输出路径

        boolean res = job.waitForCompletion(true);//提交job并等待结束
        System.exit(res ? 0 : 1); //退出程序
    }
}
第七步:运行程序
  1. 项目打包 将打好的包上传到Linux服务器中,重命名为mras.jar

  2. 在Linux服务器端启动Hadoop,在HDFS根目录创建input目录,同时在里面创建文件文件stu.txt,在其中添加测试的数据:

zhangsan    13  male    shiziBan
lisi	14	female  musicBan
wanger	19	male    musicBan
mazi	15	male    shiziBan
qianwu	12	female  wudaoBan
zhaoliu	16	female  shiziBan
lisi	18	male    wudaoBan
xiangming   13  female  shiziBan
wangwei	18	female  wudaoBan
ligang  10  male    musicBan
  1. 运行1步中上传的jar包 在这里插入图片描述 4.查看程序运行结果: 在这里插入图片描述 上传avro-tools-1.9.1.jar到Linux服务器的test目录,

在这里插入图片描述 运行命令:在这里插入图片描述

关注
打赏
1665409997
查看更多评论
立即登录/注册

微信扫码登录

0.0638s