准备工作
- 在HDFS的/input目录下创建一个名为fruit.tsv的文件,内容如下:
- 在HBase中创建一个名为fruit的表,如下:
public class FruitMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//从HDFS中读取的数据
String lineValue = value.toString();
//读取出来的每行数据使用\t进行分割,存于String数组
String[] values = lineValue.split("\t");
//根据数据中值的含义取值
String rowKey = values[0];
String name = values[1];
String color = values[2];
//初始化rowKey
ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
//初始化put对象
Put put = new Put(Bytes.toBytes(rowKey));
//参数分别:列族、列、值
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
//写数据
context.write(rowKeyWritable, put); //ImmutableBytesWritable类型一般作为RowKey的类型;
}
}
创建Reducer
public class FruitReducer extends TableReducer {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {
//读出来的每一行数据写入到fruit表中
for(Put put: values){
context.write(NullWritable.get(), put);
}
}
}
创建Runner
public class FruitRunner {
public static void main(String[] arg) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] args = {"hdfs://hcmaster:8020/input/fruit.tsv"};
//创建Job任务
Job job = Job.getInstance(conf, FruitRunner.class.getSimpleName());
//设置主类
job.setJarByClass(FruitRunner.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
//设置Mapper
job.setMapperClass(FruitMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//设置Reducer
TableMapReduceUtil.initTableReducerJob("fruit", FruitReducer.class, job);
//设置Reduce数量,最少1个
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if (!isSuccess) {
throw new IOException("Job running with error");
}
int status = isSuccess ? 0 : 1;
System.exit(status);
}
}
运行
- 打jar包并上传到Linux
- 执行jar包
3.查看结果