准备工作
1. 在HBase中创建一个名为fruit的表作为源表,包含列族info,如下:
   create 'fruit','info'
2. 在HBase中再创建一个名为fruit2的表作为目标表,同样包含列族info,如下:
   create 'fruit2','info'
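3. 往fruit表中添加几组数据,例如:
   put 'fruit','1001','info:name','Apple'
   put 'fruit','1001','info:color','Red'
   put 'fruit','1002','info:name','Pear'
   put 'fruit','1002','info:color','Yellow'
创建Mapper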
/**
 * Copies the {@code info:name} and {@code info:color} cells of each row of the source table
 * into a {@link Put} that uses the same row key.
 *
 * <p>Map output is (row key, Put), matching the {@code ImmutableBytesWritable} / {@code Put}
 * map output types passed to {@code initTableMapperJob} by the driver.
 */
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /** Column family holding the copied columns. */
    private static final byte[] FAMILY_INFO = Bytes.toBytes("info");
    /** Qualifier of the fruit name column. */
    private static final byte[] QUALIFIER_NAME = Bytes.toBytes("name");
    /** Qualifier of the fruit color column. */
    private static final byte[] QUALIFIER_COLOR = Bytes.toBytes("color");

    /**
     * Builds a Put containing only the {@code info:name} and {@code info:color} cells of one row
     * and emits it keyed by the row key.
     *
     * @param key     row key of the row being read
     * @param value   cells of that row as returned by the scan
     * @param context task context the (row key, Put) pair is written to
     * @throws IOException          if adding a cell to the Put or writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            if (isCopiedColumn(cell)) {
                put.add(cell);
            }
        }
        // A Put with no cells is rejected by HBase ("No columns to insert") when it is written
        // to the target table, so rows that have neither column are skipped instead.
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }

    /** Returns whether {@code cell} is {@code info:name} or {@code info:color}. */
    private static boolean isCopiedColumn(Cell cell) {
        return CellUtil.matchingFamily(cell, FAMILY_INFO)
                && (CellUtil.matchingQualifier(cell, QUALIFIER_NAME)
                        || CellUtil.matchingQualifier(cell, QUALIFIER_COLOR));
    }
}
创建Reducer
/**
 * Writes every {@link Put} produced by the mapper for a row key to the output table unchanged.
 *
 * <p>The output key is {@link NullWritable}. This assumes the job is configured through
 * {@code TableMapReduceUtil.initTableReducerJob}, whose table output format writes only the
 * mutation value; confirm if the output format is changed.
 */
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Forwards each Put received for {@code key} to the output table.
     *
     * @param key     row key shared by all {@code values}
     * @param values  Puts emitted by the mapper for this row key
     * @param context task context the Puts are written to
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
创建Runner
/**
 * Driver that copies table {@code fruit} into table {@code fruit2} using {@link FruitMapper}
 * and {@link FruitReducer}.
 */
public class FruitRunner {

    /**
     * Configures and runs the copy job, blocking until it finishes.
     *
     * @param args unused
     * @throws IOException if the job completes unsuccessfully
     * @throws Exception   if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, FruitRunner.class.getSimpleName());
        // Tells Hadoop which jar to ship to the cluster: the one containing this class.
        job.setJarByClass(FruitRunner.class);

        Scan scan = new Scan();
        // Do not populate the RegionServer block cache with this one-pass full scan.
        scan.setCacheBlocks(false);
        // Number of rows fetched per scanner RPC.
        scan.setCaching(500);

        // TableMapReduceUtil must come from the org.apache.hadoop.hbase.mapreduce package,
        // not the older org.apache.hadoop.hbase.mapred package.
        TableMapReduceUtil.initTableMapperJob(
                "fruit",                      // source table name
                scan,                         // scan controlling what is read
                FruitMapper.class,            // mapper class
                ImmutableBytesWritable.class, // map output key type
                Put.class,                    // map output value type
                job);                         // job being configured
        // Target table and reducer that writes into it.
        TableMapReduceUtil.initTableReducerJob("fruit2", FruitReducer.class, job);
        // At least one reduce task is required for FruitReducer to run.
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        // Failure is reported by the exception above, so reaching this point means success.
        System.exit(0);
    }
}
测试
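- 打jar包并上传到Linux
- 执行jar包,例如:先执行 `export HADOOP_CLASSPATH=$(hbase mapredcp)` 使HBase依赖可用,再执行 `yarn jar <jar包名>.jar <FruitRunner的全类名>`
- 在HBase shell中执行 `scan 'fruit2'` 查看结果,fruit2中应出现与fruit相同的info:name和info:color数据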