- Map
- Reduce 当向MapReduce框架提交一个任务, 会先把任务拆分成若干个Map任务, 然后分配到不同的节点执行, 每个Map任务处理输入数据的一部分, 当Map任务完成后,会生成一些中间文件, 这些中间文件会作为Reduce任务的输入数据,Reduce任务的主要目标就是将若干个Map的输出汇总到一起并输出,从高层抽象来看,MapReduce的数据流图如下:
- Map – shuffle–sort — 分组–reduce —> 输出到本地磁盘
- 默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的
- 默认情况下, 以HDFS的一个块的大小(默认128M)为一个分片,块的大小是可以设置。
- 如果数据块(Block)的大小是默认值128MB,输入文件有两个,一个是32MB,一个是 172MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由 一个Mapper进程处理。这里的三个输入片,会有三个Mapper进程处理。
- 这个地方也说明hadoop处理大量小文件的时候会出现性能问题,产生过多的map任务
- 另外如果小文件过多,一个文件占用一个block, namenode存储这些元数据,这个会占用namenode的资源。
- 访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。
- 上个阶段解析出来的每一个键值对,调用一次map方法。如果有1000个键值对,就会调用1000次map方法。每一次调用map方法会输出零个或者多个键值对。
- -
- 1、暂存缓冲区
- 会暂时的放在一个环形的内存缓存区中(默认为100M, 有io.sort.mb属性控制)
- 2、partition
- 基于键值进行分区,这个决定该数据会被哪个reducer处理。
- 3、sort
- 4、spill to disk
- Spill可以认为是一个包括Sort和Combiner(Combiner是可选的,用户如果定义就有)的过程。
- 缓存区快要溢时(默认为缓冲区大小的80%, 由io.sort.spill.percent属性控制), 会在本地系统中创建一个溢出文件,将缓冲区中的数据写到这个文件中。
- 可能会有许多的的溢出文件,这是需要将这些文件合并, 合并的过程中,会不断地进行排序和combiner操作, 目的有两个:
- 尽量减少每次写入磁盘的数据量
- 尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了
- 可能会有许多的的溢出文件,这是需要将这些文件合并, 合并的过程中,会不断地进行排序和combiner操作, 目的有两个:
5、 merge on disk
- 4、将分区中的数据拷贝给相对应的reduce任务
- 有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就可以了。
- 1、Reducer任务会主动从Mapper任务复制其输出的键值对,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中;如果数据量超过了该缓冲区大小的一定比例,则对数据合并后溢写到磁盘中。
- 缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比;
- 比例由mapred.job.shuffle.merge.percent决定
- 2、随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件(sort, merge)
- 这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作;
3、合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。
4、reduce
- 对相同key的集合(values)调用reduce方法。
- 5、输出
- 把这些输出的键值对写入到HDFS文件中。
shuffle过程包括在map方法输出(partition->sort>spill->merge)到reduce输入(merge->sort)
3、MapReduce工作机制剖析(hadoop1.x) - 1、在集群中的任意一个节点提交MapReduce程序; - 2、JobClient收到作业后,JobClient向JobTracker请求获取一个Job ID; - 3、将运行作业所需要的资源文件复制到HDFS上(包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息),这些文件都存放在JobTracker专门为该作业创建的文件夹中,文件夹名为该作业的Job ID; - 4、获得作业ID后,提交作业; - 5、JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行; - 6、对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里需要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”; - 7、TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户; - 8、运行的TaskTracker从HDFS中获取运行所需要的资源,这些资源包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分等信息; - 9、TaskTracker获取资源后启动新的JVM虚拟机; - 10、运行每一个任务;