Map函数开始产生输出时,并不是简单地把数据写到磁盘,因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先写到内存中的一个缓冲区,并做一些预排序,以提升效率; 每个MapTask都有一个用来写入输出数据的循环内存缓冲区(默认大小为100MB),当缓冲区中的数据量达到一个特定阈值时(默认是80%)系统将会启动一个后台线程把缓冲区中的内容写到磁盘(即spill阶段)。在写磁盘过程中,Map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,那么Map就会阻塞直到写磁盘过程完成; 在写磁盘前,线程首先根据数据最终要传递到的Reducer把数据划分成相应的分区(partition)。在每个分区中,后台线程按Key进行排序(快速排序),如果有一个Combiner(即Mini Reducer)便会在排序后的输出上运行; 一旦内存缓冲区达到溢出写的阈值,就会创建一个溢出写文件,因此在MapTask完成其最后一个输出记录后,便会有多个溢出写文件。在在MapTask完成前,溢出写文件被合并成一个索引文件和数据文件(多路归并排序)(Sort阶段); 溢出写文件归并完毕后,Map将删除所有的临时溢出写文件,并告知TaskTracker任务已完成,只要其中一个MapTask完成,ReduceTask就开始复制它的输出(Copy阶段); Map的输出文件放置在运行MapTask的TaskTracker的本地磁盘上,它是运行ReduceTask的TaskTracker所需要的输入数据,但是Reduce输出不是这样的,它一般写到HDFS中(Reduce阶段)。
2. Reduce端的ShuffleCopy阶段:Reduce进程启动一些数据copy线程,通过HTTP方式请求MapTask所在的TaskTracker以获取输出文件。 Merge阶段:将Map端复制过来的数据先放入内存缓冲区中,Merge有3种形式,分别是内存到内存,内存到磁盘,磁盘到磁盘。默认情况下第一种形式不启用,第二种Merge方式一直在运行(spill阶段)直到结束,然后启用第三种磁盘到磁盘的Merge方式生成最终的文件。 Reduce阶段:最终文件可能存在于磁盘,也可能存在于内存中,但是默认情况下是位于磁盘中的。当Reduce的输入文件已定,整个Shuffle就结束了,然后就是Reduce执行,把结果放到HDFS中。
问题二:统计单词总量 经过wordcount之后为每个单词的个数:context.write(new Text("count"), new IntWritable(num));
那么reduce中的数据都是为count:num,只有进行累加就可以了
方法二:自定义一个分组, 由于只有一个reduce,默认分组时按照key相同,自定义分组,将所有key分到一组
/**
* 自定义分组,
* 默认的分组是按照key是否相同,
*/
public class MyGroup extends WritableComparator{
public MyGroup() {
super(Text.class, true);
}
/**
* 由于分组是按照月份分组,所以在比较是只要比较月份是否相同,
* 其他略去,做数据过滤
*/
public int compare(WritableComparable a, WritableComparable b) {
return 0;
}
}
这个应该由更简洁的方法,如果你知道,请告知分享一下,谢谢。
问题三:将一个目录做为mapreduce的输入, 这个目录有f1,f2,f3三个文件,在程序运行时,导入一个文件f4, 会出现什么情况。
问题三:hbase库中任拿一条数据,如何知道在哪个region中?
问题四:hbase批量入库时,程序出现了问题,出现部分数据还没有入库,这部分数据时如何处理的?
问题五:Storm的窗口机制