Prerequisite posts:
Setting up a Hadoop 3.1.2 pseudo-distributed environment. Errors that may come up while running the examples in this post, and their solutions: "Name node is in safe mode" and "Container killed on request. Exit code is 143".
Introduction
From the very beginning, Avro was developed to round out data processing in the Hadoop ecosystem (it targets scenarios where Hadoop MapReduce needs to serialize and deserialize the data it processes), so integrating Avro with Hadoop MapReduce is a natural fit.
Using Avro in MapReduce can improve data-processing performance, mainly in the following ways:
- the data files supplied to a job can be Avro-serialized binary files
- parsing the data is comparatively fast
- built-in sorting support
Environment used in this post:
- CentOS 7.0
- Hadoop 3.1.2
- Avro 1.9.1
Example 1: Hadoop MapReduce reads the source files, performs a word count, and writes the result to the target file as Avro-formatted data.
Step 1: first upload avro-mapred-1.9.1.jar to the share/hadoop/mapreduce/ directory of the Hadoop installation.
- Add the Maven dependencies
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
- Configure the plugins (inside the <build><plugins> … </plugins></build> element)
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.9.1</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals><goal>schema</goal></goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <configuration>
        <archive><manifest><mainClass>com.hc.WordCountDriver</mainClass></manifest></archive>
    </configuration>
</plugin>
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>
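With this configuration, a normal build also triggers Avro code generation (the schema goal bound to the generate-sources phase scans src/main/resources for .avsc files) and produces a jar whose manifest points at com.hc.WordCountDriver. Assuming Maven is installed, packaging the project is simply:
mvn clean package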
Step 2: provide log4j.properties in the resources directory
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Step 3: create WordCountSchema
public class WordCountSchema {
    public static Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordCountRecord\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}");
}
The schema value above is the Avro schema string we wrote:
{
"type":"record",
"name":"WordCountRecord",
"fields":[
{"name":"count","type":"int"}
]
}
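To make the record layout concrete, here is a small standalone sketch (a hypothetical helper class, not part of the MapReduce job; the file name and count value are made up) that writes one record conforming to this schema into a local Avro container file:
public class WordCountSchemaDemo {
    public static void main(String[] args) throws IOException {
        GenericRecord r = new GenericData.Record(WordCountSchema.schema);
        r.put("count", 1);                                     //the only field defined by the schema
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(WordCountSchema.schema);
        try (DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
            fileWriter.create(WordCountSchema.schema, new File("wordcount-sample.avro"));
            fileWriter.append(r);                              //serialize the record into the container file
        }
    }
}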
Step 4: create WordCountMapper
public class WordCountMapper extends Mapper<Object, Text, AvroKey<CharSequence>, AvroValue<GenericRecord>> {
    private GenericRecord record = new GenericData.Record(WordCountSchema.schema);

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (int i = 0; i < words.length; i++) {
            if (words[i].length() > 0 && Character.isLetter(words[i].charAt(0))) {
                AvroKey<CharSequence> word = new AvroKey<>(words[i]);
                record.put("count", 1);
                context.write(word, new AvroValue<>(record));
            }
        }
    }
}
Step 5: create WordCountReducer
public class WordCountReducer extends Reducer<AvroKey<CharSequence>, AvroValue<GenericRecord>, AvroKey<CharSequence>, AvroValue<Integer>> {
    @Override
    protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (AvroValue<GenericRecord> value : values) {
            GenericRecord record = value.datum();
            sum += Integer.parseInt(record.get("count").toString());
        }
        context.write(key, new AvroValue<>(sum));
    }
}
Step 6: create WordCountDriver
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //get the configuration and set up the job
        Job job = Job.getInstance(cfg, "WordCountAvro");
        job.setJarByClass(WordCountDriver.class);     //locate the jar via this class
        job.setMapperClass(WordCountMapper.class);    //class whose map method is executed
        job.setReducerClass(WordCountReducer.class);  //class whose reduce method is executed
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, WordCountSchema.schema);
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));
        //set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));    //input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  //output path
        boolean res = job.waitForCompletion(true);  //submit the job and wait for it to finish
        System.exit(res ? 0 : 1);                   //exit with the job status
    }
}
The code above uses AvroJob to configure the job; the AvroJob class is mainly used to specify the Avro schemas for the job's input, its map output, and its final output.
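Note that this driver does not set an output format, so Hadoop's default TextOutputFormat is used and the reducer output is written as plain text (the toString() form of the Avro key and value). If the target file should be an actual Avro container file, an Avro output format from avro-mapred also has to be configured, similar to what the second example below does with AvroKeyOutputFormat; a minimal sketch (not part of the original driver):
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);   //org.apache.avro.mapreduce.AvroKeyValueOutputFormat writes the key/value pairs into an .avro file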
Step 7: run the program
- Package the project, upload the resulting jar to the Linux server, and rename it to mrad.jar.
- On the Linux server, start Hadoop, create an input directory under the HDFS root, and put a few English articles into it as test data.
- Run the jar uploaded in the first step (a rough sketch of the commands follows this list).
- Check the result of the job.
Note: avro-mapred-1.9.1.jar must already have been copied to share/hadoop/mapreduce/ (the first thing done in this example); otherwise the job fails with an error.
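The exact commands are not shown in the source; assuming the HDFS input directory is /input, the output directory is /output, and the articles are local files named article1.txt and article2.txt (placeholders), the run would look roughly like this:
hdfs dfs -mkdir /input
hdfs dfs -put article1.txt article2.txt /input
hadoop jar mrad.jar /input /output        # the main class comes from the jar manifest
hdfs dfs -cat /output/part-r-00000        # inspect the result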
Example 2: read student records from a text file and, for each class, write the youngest student to the output file as Avro data.
Step 1: add the Maven dependencies and plugin configuration. They are the same as in Example 1, except that the main class configured in the maven-jar-plugin is com.hc.StuDriver instead of com.hc.WordCountDriver.
Step 2: in the resources directory
- provide log4j.properties (same contents as in Example 1)
- provide stu.avsc
{
"type": "record",
"name": "StuRecord",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "gender", "type": "string"},
{"name": "class", "type": "string"}
]
}
Step 3: create StuSchema
public class StuSchema {
    public static Schema schema;

    static {
        //load the schema from stu.avsc on the classpath
        InputStream is = StuSchema.class.getClassLoader().getResourceAsStream("stu.avsc");
        try {
            schema = new Schema.Parser().parse(is);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //quick check: prints the parsed schema
        System.out.println(schema);
    }
}
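Running the main method is a quick way to check that stu.avsc is found on the classpath; with the file above it should print the schema as a single line of JSON, roughly:
{"type":"record","name":"StuRecord","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"gender","type":"string"},{"name":"class","type":"string"}]}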
Step 4: create StuMapper
public class StuMapper extends Mapper<LongWritable, Text, AvroKey<CharSequence>, AvroValue<GenericRecord>> {
    private GenericRecord record = new GenericData.Record(StuSchema.schema);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split("\\s+"); //split on one or more whitespace characters
        if (strings.length == 4) {
            record.put("name", strings[0]);
            record.put("age", Integer.parseInt(strings[1]));
            record.put("gender", strings[2]);
            record.put("class", strings[3]);
            //use the class name as the key so each reduce call sees all students of one class
            context.write(new AvroKey<>(strings[3]), new AvroValue<>(record));
        }
    }
}
Step 5: create StuReducer
public class StuReducer extends Reducer<AvroKey<CharSequence>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
    @Override
    protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
        GenericRecord min = null;
        for (AvroValue<GenericRecord> value : values) {
            GenericRecord record = value.datum();
            if (min == null || Integer.parseInt(min.get("age").toString()) > Integer.parseInt(record.get("age").toString())) {
                //copy the fields, because the framework reuses the objects behind the values iterator
                min = new GenericData.Record(StuSchema.schema);
                min.put("name", record.get("name"));
                min.put("age", record.get("age"));
                min.put("gender", record.get("gender"));
                min.put("class", record.get("class"));
            }
        }
        context.write(new AvroKey<>(min), NullWritable.get());
    }
}
The Reducer finds the youngest student in each class by looping over the values and comparing ages.
Step 6: create StuDriver
public class StuDriver {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //uncommenting the next line can resolve conflicts between the Avro version bundled with the job and the one shipped with the Hadoop cluster
        //cfg.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        //get the configuration and set up the job
        Job job = Job.getInstance(cfg, "Stu");
        job.setJarByClass(StuDriver.class);      //locate the jar via this class
        job.setMapperClass(StuMapper.class);     //class whose map method is executed
        job.setReducerClass(StuReducer.class);   //class whose reduce method is executed
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, StuSchema.schema);
        AvroJob.setOutputKeySchema(job, StuSchema.schema);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        //set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));    //input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  //output path
        boolean res = job.waitForCompletion(true);  //submit the job and wait for it to finish
        System.exit(res ? 0 : 1);                   //exit with the job status
    }
}
Step 7: run the program
- Package the project, upload the resulting jar to the Linux server, and rename it to mras.jar.
- On the Linux server, start Hadoop, create an input directory under the HDFS root, create a file stu.txt inside it, and add the following test data:
zhangsan 13 male shiziBan
lisi 14 female musicBan
wanger 19 male musicBan
mazi 15 male shiziBan
qianwu 12 female wudaoBan
zhaoliu 16 female shiziBan
lisi 18 male wudaoBan
xiangming 13 female shiziBan
wangwei 18 female wudaoBan
ligang 10 male musicBan
- Run the jar uploaded in the first step (a rough sketch of the commands follows below).
- Check the result of the job:
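The exact commands are again not shown in the source; assuming the input directory is /input and the output directory is /output2, something along these lines should work:
hdfs dfs -put stu.txt /input
hadoop jar mras.jar /input /output2
hdfs dfs -ls /output2        # the result is an Avro container file such as part-r-00000.avro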
Upload avro-tools-1.9.1.jar to the test directory on the Linux server, then run the command:
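The command itself is not included in the source; a typical invocation (paths and file name assumed) that downloads the job output and dumps it as JSON would be:
hdfs dfs -get /output2/part-r-00000.avro ./test/
java -jar avro-tools-1.9.1.jar tojson ./test/part-r-00000.avro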