Bulk Loading
1、Generate the HFile; note that the keys must be written in sorted order.
1.1、Use an HFile writer (StoreFile.Writer) to write the data into an HFile
//Create a sub-directory for the column family under hdfsFilePath
Path familydir = new Path(hdfsFilePath, Bytes.toString(family));
//Bulk loading writes the data directly into a StoreFile
HFileContext hfileContext = new HFileContext();
if (!System.getProperty("os.name").startsWith("Window")) {
    //Snappy compression is not used in the local (Windows) tests; it is enabled on the Linux test cluster
    //Logger.businessLogger.info("using snappy compression");
    hfileContext.setCompression(Compression.Algorithm.SNAPPY);
}
Collections.sort(dataList);
KeyValue kv = null;
//Write through the HFile writer
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), HDFSUtil.hdfs)
        .withOutputDir(familydir)
        .withBloomType(BloomType.NONE)
        .withComparator(KeyValue.COMPARATOR)
        .withFileContext(hfileContext).build();
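In newer HBase releases the context is usually assembled with HFileContextBuilder instead of mutating an HFileContext after construction; a minimal sketch under that assumption (the 64 KB block size is just an illustrative value, not taken from the original code):
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

//Build the compression settings into the context up front
HFileContext snappyContext = new HFileContextBuilder()
        .withCompression(Compression.Algorithm.SNAPPY) //requires the snappy native library on every node
        .withBlockSize(64 * 1024)                      //illustrative block size
        .build();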
1.2、Sort the data by key (this is the most important part)
boolean writeOk = false;
try {
    String lastRow = "";
    int lastRowIdx = 1;
    for (int i = 0; i < dataSize; i++) {
        //one record
        String str = dataList.get(i);
        int inx = str.indexOf('|');
        if (inx != -1) {
            rowKey = str.substring(0, inx);
            if (rowKey.length() > 32767) {
                Logger.businessLogger.error("cloud-" + CloudConstants.version + ":Row > 32767:" + localFilePath + " str-" + str);
                continue;
            }
            //Compare the current rowkey with the previous one to avoid duplicate rowkeys
            if (rowKey.equals(lastRow)) {
                //Append a suffix so repeated rowkeys stay unique; bump the counter so that
                //three or more identical rowkeys in a row do not all get the same suffix
                rowKey = rowKey + "_r" + lastRowIdx;
                lastRowIdx++;
            } else {
                lastRowIdx++;
                lastRow = rowKey;
            }
            kv = new KeyValue(
                    Bytes.toBytes(rowKey),
                    family,
                    qualifier,
                    System.currentTimeMillis(),
                    KeyValue.Type.Put,
                    Bytes.toBytes(str.substring(inx + 1, str.length())));
            try {
                writer.append(kv);
            } catch (Exception e) {
                //Logger.businessLogger.error("cloud-" + CloudConstants.version + ":ferr key:" + rowKey);
                errDataLog(str);
            }
        } else { //this line has no rowkey, the data is malformed
            Logger.businessLogger.error("cloud-" + CloudConstants.version + ":ferr filePath:" + localFilePath + " str-" + str);
        }
    }
    writeOk = true;
} catch (Exception e) {
    Logger.businessLogger.error("cloud-" + CloudConstants.version + ":whdfs filePath:" + localFilePath);
    e.printStackTrace();
}
writer.close();
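One thing to watch in the code above: Collections.sort(dataList) orders the complete "rowkey|value" strings, which does not always match the byte order HBase expects. For example, with the rowkeys "a" and "a1", the line "a|..." sorts after "a1|..." because '|' compares greater than '1', and the writer then rejects the smaller key. A minimal sketch of sorting by the extracted rowkey bytes instead (assuming the same "rowkey|value" line format as above):
//Sort by the rowkey bytes only, matching HBase's unsigned byte-order comparison,
//instead of comparing the whole "rowkey|value" line as a String.
Collections.sort(dataList, (a, b) -> {
    int ia = a.indexOf('|');
    int ib = b.indexOf('|');
    byte[] ka = Bytes.toBytes(ia == -1 ? a : a.substring(0, ia));
    byte[] kb = Bytes.toBytes(ib == -1 ? b : b.substring(0, ib));
    return Bytes.compareTo(ka, kb);
});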
1.3、Once the HFiles have been generated, use LoadIncrementalHFiles to load the HFiles from the temporary directory into HBase.
boolean bulkLoadOk = false;
if (writeOk) {
    try {
        Path hdfsDir = new Path(hdfsFilePath);
        if (HDFSUtil.hdfs.exists(hdfsDir)) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(hdfsDir, new HTable(conf, tableName));
            bulkLoadOk = true;
        }
    } catch (Exception e) {
        Logger.businessLogger.error("cloud-" + CloudConstants.version + ":load filePath:" + localFilePath + " " + e);
    }
}
if (!bulkLoadOk) {
    return;
}
Logger.businessLogger.info("cloud-" + CloudConstants.version + ":" + tableName + "-" + "load filePath:" + localFilePath + " data-" + dataSize + " time-" + ((System.currentTimeMillis() - startTimeChild) / 1000f));
System.gc();
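For reference, LoadIncrementalHFiles also has a Connection-based variant in later HBase 1.x/2.x releases that avoids the deprecated HTable constructor; a sketch of the same load step under that assumption, reusing conf, tableName and hdfsFilePath from above (exception handling omitted):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

//Load the generated HFiles through a Connection so the table, admin and
//region locator are all closed cleanly when the load finishes.
try (Connection connection = ConnectionFactory.createConnection(conf);
     Admin admin = connection.getAdmin();
     Table table = connection.getTable(TableName.valueOf(tableName));
     RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
    new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hdfsFilePath), admin, table, locator);
}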
Problems
Problem 1: java.io.IOException: Added a key not lexically larger than previous key
This error is thrown by the HFile writer when a KeyValue is appended out of sorted order or duplicates the previous key; see the sorting and rowkey-deduplication handling in section 1.2.
Problem 2: java.util.concurrent.ExecutionException: org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file hdfs://testcluster/apps/hbase/data/data/hbaseCache/idc/10.222.7.127/idc_20180221_1_511680985/cf1/b47aaf1fabb5431aba2d377a28288dd1
Cause: the native snappy library (snappy.so) is missing, so the snappy-compressed HFile cannot be read back during the load.
A table that uses snappy compression can be created in the HBase shell with:
create 'snappy',{NAME=>'t',COMPRESSION=>'SNAPPY'}
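To verify that the native snappy codec is actually available on a node before relying on it, HBase ships a CompressionTest utility; a typical invocation (the test path is just an example) looks like:
hbase org.apache.hadoop.hbase.util.CompressionTest file:///tmp/snappy-test.txt snappy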