- 1.Hadoop MapReduce概述
- 2.MapReduce的思想核心
- 3.MapReduce的特点和局限性
- 4.MapReduce入门案例——WordCount
- 4.1.业务需求
- 4.2.编程思路
- 4.3.编程实现
- 4.3.1.创建Maven项目
- 4.3.2.配置pom.xml文件
- 4.3.3.编写Mapper类和Reducer类
- 4.3.4.编写客户端驱动类
- 4.4.运行MapReduce程序
- 4.4.1.YARN集群模式
- 4.4.2.Local本地模式
本篇文章是对Hadoop MapReduce入门介绍的笔记整理,大部分来自与链接https://www.bilibili.com/video/BV11N411d7Zh
1.Hadoop MapReduce概述(1)Hadoop MapReduce(以下简称MapReduce)是一个分布式计算框架,用于轻松编写分布式应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集)。 (2)MapReduce是一种面向海量数据处理的一种指导思想,也是一种用于对大规模数据进行分布式计算的编程模型。 (3)它的出现解决了人们在最初面临海量数据束手无策的问题,同时它还是易于使用和高度可扩展的,使得开发者无需关系分布式系统底层的复杂性即可很容易的编写分布式数据处理程序,并在成千上万台普通的商用服务器中运行。
2.MapReduce的思想核心(1)MapReduce的思想核心是==“先分再合,分而治之”==。 (2)Map负责“拆分”:即把复杂的任务分解为若干个“简单的子任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。 (3)Reduce负责“合并”:即对map阶段的结果进行全局汇总。
(1)特点
易于编程Mapreduce框架提供了用于二次开发的接口;简单地实现一些接口,就可以完成一个分布式程序。任务计算交给计算框架去处理,将分布式程序部署到hadoop集群上运行,集群节点可以扩展到成百上千个等。良好的扩展性当计算机资源不能得到满足的时候,可以通过增加机器来扩展它的计算能力。基于MapReduce的分布式计算得特点可以随节点数目增长保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增至几百或者几千可以很容易地处理数百TB甚至PB级别的离线数据。高容错性Hadoop集群是分布式搭建和部署得,任何单一机器节点宕机了,它可以把上面的计算任务转移到另一个节点上运行,不影响整个作业任务得完成,过程完全是由Hadoop内部完成的。适合海量数据的离线处理可以处理GB、TB和PB级别的数据量(2)局限性
实时计算性能差MapReduce主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应。不能进行流式计算流式计算特点是数据是源源不断得计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态数据集得,数据是不能动态变化得。 4.MapReduce入门案例——WordCount 4.1.业务需求WordCount中文叫做单词统计、词频统计,指的是统计指定文件中,每个单词出现的总次数()。这个是大数据计算领域经典的入门案例,相当于学习编程语言时的案例——输出"Hello World"。虽然WordCount业务十分简单,但是通过案例感受背后MapReduce的执行流程和默认的行为机制才是关键所在。
(1)map阶段的:把输入的数据经过切割,全部标记1。因此输出就是。 (2)shuffle阶段:经过默认的排序分区分组,key相同的单词会作为一组数据构成新的kv对。 (3)reduce阶段:处理shuffle完的一组数据,该组数据就是该单词所有的键值对。对所有的1进行累加求和,得到单词总次数。
打开IDEA→点击Create New Project→选择Maven,点击Next→为自己的项目取名称,点击Finish。 注:此处使用的IDEA版本是2019,不同版本之间创建Maven项目的步骤可能会有一些差别!
将以下配置添加到pom.xml文件中(在project标签下)
org.apache.hadoop
hadoop-common
3.1.4
org.apache.hadoop
hadoop-hdfs
3.1.4
org.apache.hadoop
hadoop-client
3.1.4
org.apache.hadoop
hadoop-mapreduce-client-core
3.1.4
mysql
mysql-connector-java
5.1.32
org.apache.maven.plugins
maven-jar-plugin
2.4
true
lib/
org.apache.maven.plugins
maven-compiler-plugin
3.0
1.8
1.8
UTF-8
4.3.3.编写Mapper类和Reducer类
WordCountMapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
* @description:WordCount Mapper类,对应Maptask
KEYIN: 表示map阶段输入kv中的k类型 在默认组件下 是起始位置偏移量 因此是LongWritable
* VALUEIN:表示map阶段输入kv中的v类型 在默认组件下 是每一行内容 因此是Text.
* todo MapReduce有默认的读取数据组件 叫做TextInputFormat
* todo 读数据的行为是:一行一行读取数据 返回kv键值对
* k:每一行的起始位置的偏移量 通常无意义
* v:这一行的文本内容
* KEYOUT: 表示map阶段输出kv中的k类型 跟业务相关 本需求中输出的是单词 因此是Text
* VALUEOUT: 表示map阶段输出kv中的v类型 跟业务相关 本需求中输出的是单词次数1 因此是LongWritable
* */
public class WordCountMapper extends Mapper {
private Text outkey = new Text();
private final static LongWritable outvalue = new LongWritable(1);
/*
* map方法是mapper阶段核心方法 也是具体业务逻辑实现的方法
* 注意,该方法被调用的次数和输入的kv键值对有关,每当TextInputFormat读取返回一个kv键值对,就调用一次map方法进行业务处理
* 默认情况下,map方法是基于行来处理数据
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据转换为String
String line = value.toString(); //hello hello hello hello allen
//根据分隔符进行切割(此处使用正则表达式,即split("\\s+")按空格或制表符等进行拆分)
String[] words = line.split("\\s+"); //[hello,hello,hello,hello,allen]
//遍历数组(快捷键:iter+Enter)
for (String word : words) {
outkey.set(word);
//输出数据,把每个单词标记1,即输出的结果为
//使用上下文对象将数据输出
context.write(outkey,outvalue); //
}
}
}
WordCountReducer.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
* @description: 本类就是MapReduce程序中Reduce阶段的处理类 对应着ReduceTask
*
* KEYIN: 表示的是reduce阶段输入kv中k的类型 对应着map的输出的key 因此本需求中 就是单词 Text
* VALUEIN:表示的是reduce阶段输入kv中v的类型 对应着map的输出的value 因此本需求中 就是单词次数1 LongWritable
* KEYOUT: 表示的是reduce阶段输出kv中k的类型 跟业务相关 本需求中 还是单词 Text
* VALUEOUT:表示的是reduce阶段输出kv中v的类型 跟业务相关 本需求中 还是单词总次数 LongWritable
*/
public class WordCountReducer extends Reducer {
private LongWritable outvalue=new LongWritable();
/**
* todo Q:当map的所有输出数据来到reduce之后 该如何调用reduce方法进行处理呢?
*
* 1.排序 规则:根据key的字典序进行排序 a-z
*
* 2.分组 规则:key相同的分为一组
*
*
* 3.分组之后,同一组的数据组成一个新的kv键值对,调用一次reduce方法。 reduce方法基于分组调用的 一个分组调用一次。
* todo 同一组中数据组成一个新的kv键值对。
* 新key:该组共同的key
* 新value:该组所有的value组成的一个迭代器Iterable
* ---->
* ---->
*/
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//定义统计变量
long count=0;
//遍历该组的values(快捷键:iter+Enter)
for (LongWritable value : values) {
//累加计算总次数
count+=value.get();
}
outvalue.set(count);
//最终使用上下文对象输出结果
context.write(key,outvalue);
}
}
4.3.4.编写客户端驱动类
方式1:创建Job作业实例提交程序
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
* @description: 该类就是MapReduce程序客户端驱动类 主要是构造Job对象实例
* 指定各种组件属性 包括:mapper reducer类、输入输出的数据类型、输入输出的数据路径
* 提交job作业 job.submit()
*/
public class WordCountDriver_v1 {
public static void main(String[] args) throws Exception {
//创建配置对象
Configuration conf = new Configuration();
//conf.set("mapreduce.framework.name","yarn");
//构建Job作业的实例 参数(配置对象、Job名字)
Job job = Job.getInstance(conf, WordCountDriver_v1.class.getSimpleName());
//设置mr程序运行的主类
job.setJarByClass(WordCountDriver_v1.class);
//设置本次mr程序的mapper类型 reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//指定mapper阶段输出的key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定reducer阶段输出的key value类型 也是mr程序最终的输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//配置本次作业的输入数据路径 和输出数据路径
Path input = new Path(args[0]);
Path output = new Path(args[1]);
//todo 默认组件 TextInputFormat TextOutputFormat
FileInputFormat.setInputPaths(job,input);
FileOutputFormat.setOutputPath(job,output);
//todo 判断输出路径是否已经存在 如果存在先删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output)){
fs.delete(output,true);//rm -rf
}
//最终提交本次job作业
//job.submit();
//采用waitForCompletion提交job 参数表示是否开启实时监视追踪作业的执行情况
boolean resultflag = job.waitForCompletion(true);
//退出程序 和job结果进行绑定
System.exit(resultflag ? 0: 1);
}
}
方式2:使用ToolRunner提交程序(推荐使用)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
* @description: 使用工具类ToolRunner提交MapReduce作业
*/
public class WordCountDriver_v2 extends Configured implements Tool {
public static void main(String[] args) throws Exception {
//创建配置对象
Configuration conf = new Configuration();
//todo 使用工具类ToolRunner提交程序
int status = ToolRunner.run(conf, new WordCountDriver_v2(), args);
//退出客户端
System.exit(status);
}
@Override
public int run(String[] args) throws Exception {
//构建Job作业的实例 参数(配置对象、Job名字)
Job job = Job.getInstance(getConf(), WordCountDriver_v2.class.getSimpleName());
//设置mr程序运行的主类
job.setJarByClass(WordCountDriver_v2.class);
//设置本次mr程序的mapper类型 reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//指定mapper阶段输出的key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定reducer阶段输出的key value类型 也是mr程序最终的输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//配置本次作业的输入数据路径 和输出数据路径
Path input = new Path(args[0]);
Path output = new Path(args[1]);
//todo 默认组件 TextInputFormat TextOutputFormat
FileInputFormat.setInputPaths(job,input);
FileOutputFormat.setOutputPath(job,output);
//todo 判断输出路径是否已经存在 如果存在先删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output)){
fs.delete(output,true);//rm -rf
}
return job.waitForCompletion(true)? 0:1;
}
}
4.4.运行MapReduce程序
MapReduce程序的运行模式有两种:YARN集群模式和本地模式。 在何种模式下运行取决于参数mapreduce.framework.name,当其值为yarn时,为YARN集群模式;当为local时,则是Local本地模式。如果不指定,默认是Local本地模式(在导入的包中的mapred-default.xml中有定义),但在搭建好的Hadoop集群的Linux上运行时,Hadoop中的配置(mapred-site.xml和yarn-site.xml)会覆盖原本的default配置,从而使用YARN集群模式。 不过如果在代码中直接设置mapreduce.framework.name=local,即conf.set(“mapreduce.framework.name”,“local”),那么当在Linux中运行jar包时还是会使用Local本地模式,因为在代码中设置的优先级最高!
4.4.1.YARN集群模式(1)启动在Linux中搭建好的Hadoop集群(包括HDFS集群和YARN集群) (2)复制main方法所在类路径(以上的两种驱动类都可以,此处复制的是第一种) 然后在pom.xml文件中配置main方法所在类路径,即将刚才复制的内容粘贴到之前留下的空白处
(3)点击右侧的Maven,再依次点击clean、package,将程序打成jar包
将打包完成的jar包上传到Hadoop集群的任意一个节点的目录下(此处选择放到node1的root目录下)
(4)将测试数据所在的文件testData.txt上传到HDFS的任意一个目录下
此处选择放到/data/wordcount/input目录下
(5)在node1的终端中(当前所在路径为/root,因为jar在该目录下),执行如下的启动命令:
hadoop jar example-mr-1.0-SNAPSHOT.jar /data/wordcount/input /data/wordcount/output
/data/wordcount/input测试文件testData.txt所在的目录/data/wordcount/output计算得到的结果所在的目录(不需要提前创建,否则会报错)
(6)到HDFS的/data/wordcount/output目录下查看或下载结果(已按字典序进行排序): 此外还可以在YARN中查看该任务的执行情况:
(1)点击Edit Configurations… 添加调用main函数时需要的参数,即测试文件testData.txt所在的目录以及计算得到的结果所在的目录(后者不需要自己创建)
(2)出现"Process finished with exit code 0"则说明运行成功
(3)查看结果