HBase Bulk Loading --- Hands-On 01

宝哥大数据, published 2017-04-20 18:25:09

Bulk Loading
1. Generate the HFile (note: the keys must be sorted)
1.1 Write the data into an HFile with StoreFile.Writer
    //create a sub-directory for the column family under hdfsFilePath
    Path familydir = new Path(hdfsFilePath, Bytes.toString(family));
    //bulk load writes the data directly into a StoreFile
    HFileContext hfileContext = new HFileContext();
    if (!System.getProperty("os.name").startsWith("Window")) {
        //no snappy compression in local (Windows) tests; the Linux test cluster uses snappy
        //Logger.businessLogger.info("using snappy compression");
        hfileContext.setCompression(Compression.Algorithm.SNAPPY);
    }
    Collections.sort(dataList);
    KeyValue kv = null;
    //write through a StoreFile.Writer (an HFile writer)
    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), HDFSUtil.hdfs)
            .withOutputDir(familydir)
            .withBloomType(BloomType.NONE)
            .withComparator(KeyValue.COMPARATOR)
            .withFileContext(hfileContext)
            .build();
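StoreFile.Writer, like any HFile writer, expects every appended KeyValue to sort strictly after the previous one under KeyValue.COMPARATOR (rows byte-lexicographic; within a row, by family, qualifier and timestamp). Appending out of order is exactly what produces the "Added a key not lexically larger than previous key" exception described under Problem 1 below. A minimal sketch of that ordering rule, with hypothetical rowkeys and only Bytes.compareTo assumed:

    import org.apache.hadoop.hbase.util.Bytes;

    public class RowOrderCheck {
        public static void main(String[] args) {
            // Hypothetical rowkeys; an HFile writer expects them in byte-lexicographic order.
            byte[] prev = Bytes.toBytes("row-001");
            byte[] next = Bytes.toBytes("row-002");

            // Bytes.compareTo mirrors the row ordering used by KeyValue.COMPARATOR.
            // With a single fixed qualifier, as in the write loop in 1.2, duplicated or
            // out-of-order rowkeys will typically trip the writer's "not lexically larger" check.
            if (Bytes.compareTo(next, prev) <= 0) {
                throw new IllegalStateException("rowkeys are not strictly increasing");
            }
            System.out.println("ok: rowkeys are strictly increasing");
        }
    }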
1.2 Sort the data by key and write it out (this is the most important part)
    boolean writeOk = false;

    try {
        String lastRow = "";
        int lastRowIdx = 1;
        for (int i = 0; i < dataSize; i++) {
            //one record, formatted as "rowkey|value"
            String str = dataList.get(i);
            int inx = str.indexOf('|');
            if (inx != -1) {
                rowKey = str.substring(0, inx);
                if (rowKey.length() > 32767) {
                    Logger.businessLogger.error("cloud-" + CloudConstants.version + ":Row > 32767:" + localFilePath + " str-" + str);
                    continue;
                }

                //compare the current rowkey with the previous one to catch duplicates
                if (rowKey.equals(lastRow)) {
                    //suffix repeated rowkeys so each one stays unique
                    rowKey = rowKey + "_r" + lastRowIdx;
                    lastRowIdx++;
                } else {
                    lastRow = rowKey;
                    lastRowIdx = 1;
                }

                kv = new KeyValue(
                        Bytes.toBytes(rowKey),
                        family,
                        qualifier,
                        System.currentTimeMillis(),
                        KeyValue.Type.Put,
                        Bytes.toBytes(str.substring(inx + 1)));
                try {
                    writer.append(kv);
                } catch (Exception e) {
                    //Logger.businessLogger.error("cloud-" + CloudConstants.version + ":ferr key:" + rowKey);
                    errDataLog(str);
                }
            } else {
                //the record has no rowkey, so the data is malformed
                Logger.businessLogger.error("cloud-" + CloudConstants.version + ":ferr filePath:" + localFilePath + " str-" + str);
            }
        }
        writeOk = true;
    } catch (Exception e) {
        Logger.businessLogger.error("cloud-" + CloudConstants.version + ":whdfs filePath:" + localFilePath);
        e.printStackTrace();
    } finally {
        writer.close();
    }
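One caveat with the sorting step above: Collections.sort(dataList) in 1.1 sorts the whole "rowkey|value" strings by Java String order, so the '|' separator and the value part can reorder records relative to pure rowkey order (for example when one rowkey is a prefix of another, since '|' sorts after the alphanumeric characters). A sketch of sorting by the rowkey prefix only, in HBase's byte order, is shown below; the class and method names are hypothetical and it assumes the same "rowkey|value" record layout used above:

    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowKeySort {
        // Sort "rowkey|value" records by the rowkey prefix only,
        // using the same byte order HBase applies to rowkeys.
        public static void sortByRowKey(List<String> dataList) {
            Collections.sort(dataList, new Comparator<String>() {
                @Override
                public int compare(String a, String b) {
                    return Bytes.compareTo(rowKeyOf(a), rowKeyOf(b));
                }
            });
        }

        private static byte[] rowKeyOf(String record) {
            int idx = record.indexOf('|');
            return Bytes.toBytes(idx == -1 ? record : record.substring(0, idx));
        }
    }

Calling RowKeySort.sortByRowKey(dataList) in place of Collections.sort(dataList) would keep the write order aligned with the rowkeys the loop actually emits.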
1.3 With the HFiles generated, use LoadIncrementalHFiles to load them from the temporary directory into HBase
    boolean bulkLoadOk = false;
    if (writeOk) {
        try {
            Path hdfsDir = new Path(hdfsFilePath);
            if (HDFSUtil.hdfs.exists(hdfsDir)) {
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(hdfsDir, new HTable(conf, tableName));
                bulkLoadOk = true;
            }
        } catch (Exception e) {
            Logger.businessLogger.error("cloud-" + CloudConstants.version + ":load filePath:" + localFilePath + " " + e);
        }
    }

    if (!bulkLoadOk) {
        return;
    }

    Logger.businessLogger.info("cloud-" + CloudConstants.version + ":" + tableName + "-" + "load filePath:" + localFilePath + " data-" + dataSize + " time-" + ((System.currentTimeMillis() - startTimeChild) / 1000f));
    System.gc();
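After doBulkLoad returns, the HFiles have been moved into the table's region directories and the rows are readable straight away. A minimal read-back check against the same old-style client API used above (the table name, rowkey, family and qualifier here are hypothetical placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BulkLoadCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            HTable table = new HTable(conf, "idc");                 // hypothetical table name
            try {
                Get get = new Get(Bytes.toBytes("someRowKey"));     // hypothetical rowkey
                Result result = table.get(get);
                byte[] value = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("q"));  // hypothetical family/qualifier
                System.out.println(value == null ? "row not found" : Bytes.toString(value));
            } finally {
                table.close();
            }
        }
    }

The same load step can also be driven from the command line via LoadIncrementalHFiles (the completebulkload tool), which is convenient when the HFiles come out of a MapReduce job instead of this in-process writer.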
Problems

Problem 1: java.io.IOException: Added a key not lexically larger than previous key

Cause: when building the local temporary HFile, the input data was duplicated several times to pad out the volume, which produced repeated keys.

Problem 2: java.util.concurrent.ExecutionException: org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file hdfs://testcluster/apps/hbase/data/data/hbaseCache/idc/10.222.7.127/idc_20180221_1_511680985/cf1/b47aaf1fabb5431aba2d377a28288dd1

Problem 3: Compression algorithm 'snappy' previously failed test.

Cause: the native snappy library (snappy.so) is missing. Once it is installed, creating a table with SNAPPY compression in the HBase shell is a quick way to confirm the algorithm now passes:

create 'snappy',{NAME=>'t',COMPRESSION=>'SNAPPY'}
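For reference, a sketch of the Java-API equivalent of that shell command, written against the same pre-1.0-style client used in the loader code above; only the table name 'snappy' and the family 't' come from the command, the rest is standard admin setup:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.io.compress.Compression;

    public class CreateSnappyTable {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            HBaseAdmin admin = new HBaseAdmin(conf);
            try {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("snappy"));
                HColumnDescriptor cf = new HColumnDescriptor("t");
                // if snappy.so is still missing, this is typically where the
                // "previously failed test" error surfaces
                cf.setCompressionType(Compression.Algorithm.SNAPPY);
                desc.addFamily(cf);
                admin.createTable(desc);
            } finally {
                admin.close();
            }
        }
    }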