版本信息:
hbase1.2.2
hadoop2.5.2
案例: 详单入库, 字段及类型如下:
create 'wlan_log', 'cf'
RowKey设计:
msisdn:日期时间串(yyyyMMddHHmmss)
数据样本
hdfs dfs -mkdir /input
hdfs dfs -put tmp.data /input/
2.2、在hbase中创建表 , 表名wlan_log, 列族cf.
create 'wlan_log', 'cf'
三、HBASE结合MapReduce批量导入程序
3.1、Map
读取hdfs中的数据,拼接rowkey
package com.chb.xdrToHBase;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
public class BatchImportMapper extends Mapper{
SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
Text v2 = new Text();
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
//拆分字段
final String[] splited = value.toString().split("\t");
try {
//格式化reportTime
final Date date = new Date(Long.parseLong(splited[0].trim()));
final String dateFormat = dateformat1.format(date);
//构建rowkey --> msisdn:yyyyMMddHHmmss
String rowKey = splited[1]+":"+dateFormat;
v2.set(rowKey+"\t"+value.toString());
//map输出:
context.write(key, v2);
} catch (NumberFormatException e) {
final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
counter.increment(1L);
System.out.println("出错了"+splited[0]+" "+e.getMessage());
}
};
}
3.2、Reducer, 继承 org.apache.hadoop.hbase.mapreduce.TableReducer
3.2.1、TableReducer 这个类在hbase-server-xxx.jar中, 需要引入。因为要将数据输出到hbase表中, 所以HBase提供了TableReducer
package com.chb.xdrToHBase;
import org.apache.hadoop.hbase.client.Put;
//hbase-server-1.1.3.jar
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
public class BatchImportReducer extends TableReducer{
protected void reduce(LongWritable key, java.lang.Iterable values, Context context) throws java.io.IOException ,InterruptedException {
for (Text text : values) {
final String[] splited = text.toString().split("\t");
//rowkey中已经包含了msisdn, reportTime
final Put put = new Put(Bytes.toBytes(splited[0]));
//对应各个列
put.add(Bytes.toBytes("cf"), Bytes.toBytes("acmac"), Bytes.toBytes(splited[2]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("apmac"), Bytes.toBytes(splited[3]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("host"), Bytes.toBytes(splited[4]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("siteType"), Bytes.toBytes(splited[5]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("upPackNum"), Bytes.toBytes(splited[6]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("downPackNum"), Bytes.toBytes(splited[7]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("upPayLoad"), Bytes.toBytes(splited[8]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("downPayLoad"), Bytes.toBytes(splited[9]));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("httpStatus"), Bytes.toBytes(splited[10]));
context.write(NullWritable.get(), put);
}
};
}
3.3、主函数
3.3.1、设置hbase参数
// Use HBaseConfiguration.create() so hbase-default.xml / hbase-site.xml are
// loaded; a bare `new Configuration()` misses the HBase client defaults.
// (requires: import org.apache.hadoop.hbase.HBaseConfiguration;)
final Configuration configuration = HBaseConfiguration.create();
// zookeeper quorum the HBase client connects through
configuration.set("hbase.zookeeper.quorum", "master,slave1,slave2");
// target HBase table for TableOutputFormat
configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
// raise the DFS socket timeout so long-running HBase writes do not time out
configuration.set("dfs.socket.timeout", "180000");
3.3.2、创建Job, 通过TableMapReduceUtil添加依赖及配置
// create the job and bind it to the driver class
final Job job = Job.getInstance(configuration, "HBaseBatchImport");
job.setJarByClass(Main.class);
// ship the HBase client jars to the cluster with the job
TableMapReduceUtil.addDependencyJars(job);
3.3.3、设置输入,输出路径及类
// mapper / reducer classes
job.setMapperClass(BatchImportMapper.class);
job.setReducerClass(BatchImportReducer.class);
// only the map output types need declaring; TableOutputFormat
// fixes the reduce output types
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
// output goes to the HBase table, not to an HDFS path,
// so set an output format class instead of an output path
job.setOutputFormatClass(TableOutputFormat.class);
// input path on HDFS
FileInputFormat.setInputPaths(job, "hdfs://master:9000/input");
// propagate job success/failure instead of discarding
// waitForCompletion's boolean result
System.exit(job.waitForCompletion(true) ? 0 : 1);
3.4、运行
3.4.1、在windows的Eclipse中直接运行
报错: Error generating shuffle secret key
, HmacSHA1 KeyGenerator not available
从网上查的是jre路径有问题, 应该是jdk下的jre, 按照修改, 没有作用
3.4.2、打包上传到linux服务器上运行
在pom.xml中添加
org.apache.maven.plugins
maven-assembly-plugin
2.6
1.8
1.8
com.chb.xdrToHBase.Main
jar-with-dependencies
3.4.2.1、打包: clean package assembly:single
, 注意:不要在前面添加mvn, 因为Eclipse中默认会添加。
这种方式打包,会将依赖都打入一个包中, 可以直接运行。hadoop jar HBaseTestPom-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.chb.xdrToHBase.Main
有个小问题: 第一次打包执行没有问题, 修改程序后,再次打包总是报错Exception in thread "main" java.lang.ClassNotFoundException: com.chb.xdrToHBase.Main
解决办法: project–>clean
3.5、在hbase中查询结果