您当前的位置: 首页 >  hbase

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

HBASE结合MapReduce批量导入

宝哥大数据 发布时间:2017-12-30 20:59:25 ,浏览量:0

版本信息:

hbase1.2.2
hadoop2.5.2
案例: 详单入库, 字段及类型如下:

这里写图片描述

HBASE表定义为:
create 'wlan_log', 'cf'
RowKey设计: msisdn:日期时间串(yyyyMMddHHmmss) 数据样本

这里写图片描述

二、数据准备, 表的创建 2.1、将数据上传到hdfs上的input目录
hdfs dfs -mkdir /input
hdfs dfs -put tmp.data /input/
2.2、在hbase中创建表 , 表名wlan_log, 列族cf.
create 'wlan_log', 'cf'
三、HBASE结合MapReduce批量导入程序 3.1、Map 读取hdfs中的数据,拼接rowkwy
package com.chb.xdrToHBase;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

public  class BatchImportMapper extends Mapper{
    SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
    Text v2 = new Text();

    protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
        //拆分字段
        final String[] splited = value.toString().split("\t");
        try {
            //格式化reportTime
            final Date date = new Date(Long.parseLong(splited[0].trim()));
            final String dateFormat = dateformat1.format(date);
            //构建rowkey --> msisdn:yyyyMMddHHmmss
            String rowKey = splited[1]+":"+dateFormat;
            v2.set(rowKey+"\t"+value.toString());
            //map输出: 
            context.write(key, v2);
        } catch (NumberFormatException e) {
            final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
            counter.increment(1L);
            System.out.println("出错了"+splited[0]+" "+e.getMessage());
        }
    };
}
3.2、Reducer, 继承 org.apache.hadoop.hbase.mapreduce.TableReducer, 3.2.1、TableReducer ,这个类在hbase-serverxxx.jar中,需要映入。 因为要将数据输出到hbase表中, 所以提供一种TableReducer
package com.chb.xdrToHBase;

import org.apache.hadoop.hbase.client.Put;
//hbase-server-1.1.3.jar
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class BatchImportReducer extends TableReducer{
    protected void reduce(LongWritable key, java.lang.Iterable values,    Context context) throws java.io.IOException ,InterruptedException {
        for (Text text : values) {
            final String[] splited = text.toString().split("\t");
            //rowkey中已经包含了msisdn, reportTime
            final Put put = new Put(Bytes.toBytes(splited[0]));
            //对应各个列
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("acmac"), Bytes.toBytes(splited[2]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("apmac"), Bytes.toBytes(splited[3]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("host"), Bytes.toBytes(splited[4]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("siteType"), Bytes.toBytes(splited[5]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("upPackNum"), Bytes.toBytes(splited[6]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("downPackNum"), Bytes.toBytes(splited[7]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("upPayLoad"), Bytes.toBytes(splited[8]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("downPayLoad"), Bytes.toBytes(splited[9]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("httpStatus"), Bytes.toBytes(splited[10]));

            context.write(NullWritable.get(), put);
        }
    };
}
3.3、主函数 3.3.1、设置hbase参数
        final Configuration configuration = new Configuration();
        //设置zookeeper
        configuration.set("hbase.zookeeper.quorum", "master,slave1,slave2");
        //设置hbase表名称
        configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
        //将该值改大,防止hbase超时退出
        configuration.set("dfs.socket.timeout", "180000");
3.3.2、创建Job, 通过TableMapReduceUtil添加依赖及配置
        final Job job = Job.getInstance(configuration,"HBaseBatchImport");
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(Main.class);
3.3.3、设置输入,输出路径及类
        //设置mapper, reducer
        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        //设置map的输出,不设置reduce的输出类型
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        //不再设置输出路径,而是设置输出格式类型
        job.setOutputFormatClass(TableOutputFormat.class);

        //设置输入路径
        FileInputFormat.setInputPaths(job, "hdfs://master:9000/input");

        job.waitForCompletion(true);
3.4、运行 3.4.1、在windows的Eclipse中直接运行 报错: Error generating shuffle secret key, HmacSHA1 KeyGenerator not available 从网上查的是jre路径有问题, 应该是jdk下的jre, 按照修改, 没有作用 3.4.2、打包上传到linux服务器上运行

在pom.xml中添加


    
    
        
            
                org.apache.maven.plugins
                
                maven-assembly-plugin
                2.6
                
                    1.8
                    1.8
                    
                        
                            com.chb.xdrToHBase.Main
                        
                    
                    
                        jar-with-dependencies
                    
                
            
        
    
3.4.2.1、打包: clean package assembly:single, 注意:不要在前面添加mvn, 因为Eclipse中默认会添加。 这种方式打包,会将依赖都打入一个包中, 可以直接运行。这里写图片描述 3.4.2.2、上传,执行
hadoop jar HBaseTestPom-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.chb.xdrToHBase.Main
有个小问题: 第一次打包执行没有问题, 修改程序后,再次打包总是报错Exception in thread "main" java.lang.ClassNotFoundException: com.chb.xdrToHBase.Main 解决办法: project–>clean 3.5、在hbase中查询结果

这里写图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0435s