1、InputFromat
inputFormat 负责创建inputSplit并且将他们拆分成键值对(records) 有两个方法:
- getSplits
- createRecordReader
通过调用getSplits来进行分片,然后将它们发送到Application Master. Map通过调用InputFormat的createRecordReader
方法来获取RecordReader对象, RecordReader就相当于record的迭代器,map任务使用此生成的键值对,然后传递给map()。 查看Mapper的run()可以看到map的整个完成流程:
- 首先执行set(),
- 然后调用context的nextKeyValue(), 为mapper提供键值对.
- 通过context,键值对从RecordReader取出后传递给map();当reader读到stream的结尾,netKeyValue()返回fals, map结束。
- 调用cleanup()
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
Mapper的run()方法是公共的,可以由用户定制。MultithreadedMapper是一个多线程并发运行多个mapper的实现(mapreduce.mapper.multithreadedmapper.threads可以设置线程数量)。对于大多数的数据处理任务来说,默认的执行机制没有优势。但是对于因为需要链接外部服务器而造成单个记录处理时间较长的mapper来说,它允许多个mapper在同一个JVM下尽量避免竞争方式执行。