您当前的位置: 首页 >  sql

梁云亮

暂无认证

  • 2浏览

    0关注

    1211博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flume准实时采集MySQL数据

梁云亮 发布时间:2020-03-17 13:59:52 ,浏览量:2

需求

利用Flume将MySQL表数据准实时抽取到HDFS

-软件版本 Flume:1.9.0 MySQL:5.7

  • db_test下面有tb_dept表作为Flume的source:
CREATE TABLE `tb_dept`  (
  `deptno` tinyint(2) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '部门编号',
  `dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '部门名称',
  `loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '部门地址',
  PRIMARY KEY (`deptno`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `tb_dept` VALUES (1, 'ACCOUNTING', 'NewYork');
INSERT INTO `tb_dept` VALUES (2, 'RESEARCH', 'Boston');
INSERT INTO `tb_dept` VALUES (3, 'SALES', 'Dallas');
INSERT INTO `tb_dept` VALUES (4, 'OPERATIONS', 'Chicago');

准备工作 第一步:在MySQL的db_test下面创建元数据表flume_meta
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
第二步:在flume根目录下的job文件中创建配置文件flume-mysql.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.hc.SQLSource  
a1.sources.r1.connection.url = jdbc:mysql://hcmaster:3306/db_test
a1.sources.r1.connection.user = root  
a1.sources.r1.connection.password = root  
a1.sources.r1.table = tb_dept  
a1.sources.r1.columns.to.select = *  
#a1.sources.r1.incremental.column.name = deptno  
#a1.sources.r1.incremental.value = 0 
a1.sources.r1.run.query.delay=5000

# Describe the sink
a1.sinks.k1.type = logger

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
编写代码 第一步:创建Maven项目

    junit
    junit
    4.12
    test



    org.slf4j
    slf4j-api
    1.7.25


    org.slf4j
    slf4j-log4j12
    1.7.25



    mysql
    mysql-connector-java
    5.1.45



    org.apache.flume
    flume-ng-core
    1.9.0


    org.apache.flume
    flume-ng-sdk
    1.9.0



    org.apache.flume
    flume-ng-configuration
    1.9.0

添加MySQL配置文件、日志配置文件
  • mysql.properties

    driver=com.mysql.jdbc.Driver
    url=jdbc:mysql://hcmaster:3306/db_test?useSSL=false&serverTimezone=GMT%2B8&characterEncoding=utf8&useUnicode=true&allowPublicKeyRetrieval=true
    user=root
    password=root
    
  • log4j.properties

    log4j.rootLogger=info,myconsole,myfile
    log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
    log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
    #log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
    
    #log4j.rootLogger=error,myfile
    log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.myfile.File=/tmp/flume.log
    log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
    
编写SQLSourceHelper
public class SQLSourceHelper {
    private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
    private int runQueryDelay;  //两次查询的时间间隔
    private int startFrom;             //查询语句开始id
    private int currentIndex;             //查询语句当前id,每次查询之前需要查元数据表
    private int recordSixe = 0;       //每次查询返回结果的条数
    private String table;        //要操作的表
    private String columnsToSelect;      //用户传入的查询的列
    private String customQuery;           //用户传入的查询语句
    private String query;               //构建的查询语句
    private String defaultCharsetResultSet;//编码集

    private Context context;    //上下文,用来获取配置文件

    //为定义的变量赋值(默认值),可在flume任务的配置文件中修改
    private static final int DEFAULT_QUERY_DELAY = 10000;
    private static final int DEFAULT_START_VALUE = 0;
    private static final String DEFAULT_COLUMNS_SELECT = "*";
    private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static String connectionURL, connectionUserName, connectionPassword;

    //加载静态资源
    static {
        Properties p = new Properties();
        try {
            p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("mysql.properties"));
            connectionURL = p.getProperty("url");
            connectionUserName = p.getProperty("user");
            connectionPassword = p.getProperty("password");
            Class.forName(p.getProperty("driver"));
        } catch (IOException | ClassNotFoundException e) {
            LOG.error(e.toString());
        }
    }

    //获取JDBC连接
    private static Connection getConnection(String url, String user, String pw) {
        try {
            Connection conn = DriverManager.getConnection(url, user, pw);
            if (conn == null) {
                throw new SQLException();
            }
            return conn;
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    //构造方法
    SQLSourceHelper(Context context) throws ParseException {
        //初始化上下文
        this.context = context;
        //有默认值参数:获取flume任务配置文件中的参数,读不到的采用默认值
        this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
        this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
        this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
        this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
        //无默认值参数:获取flume任务配置文件中的参数
        this.table = context.getString("table");
        this.customQuery = context.getString("custom.query");
        connectionURL = context.getString("connection.url");
        connectionUserName = context.getString("connection.user");
        connectionPassword = context.getString("connection.password");
        conn = getConnection(connectionURL, connectionUserName, connectionPassword);
        //校验相应的配置信息,如果没有默认值的参数也没赋值,抛出异常
        try {
            checkMandatoryProperties();
        } catch (ConfigurationException e) {
            e.printStackTrace();
        }
        //获取当前的id
        currentIndex = getStatusDBIndex(startFrom);
        //构建查询语句
        query = buildQuery();
    }

    //校验相应的配置信息(表,查询语句以及数据库连接的参数)
    private void checkMandatoryProperties() throws ConfigurationException {
        if (table == null) {
            throw new ConfigurationException("property table not set");
        }
        if (connectionURL == null) {
            throw new ConfigurationException("connection.url property not set");
        }
        if (connectionUserName == null) {
            throw new ConfigurationException("connection.user property not set");
        }
        if (connectionPassword == null) {
            throw new ConfigurationException("connection.password property not set");
        }
    }

    //构建sql语句
    private String buildQuery() {
        String sql = "";
        //获取当前id
        currentIndex = getStatusDBIndex(startFrom);
        LOG.info(currentIndex + "");
        if (customQuery == null) {
            sql = "SELECT " + columnsToSelect + " FROM " + table;
        } else {
            sql = customQuery;
        }
        StringBuilder execSql = new StringBuilder(sql);
        //以id作为offset
        if (!sql.contains("where")) {
            execSql.append(" where ");
            execSql.append("deptno").append(">").append(currentIndex);
            return execSql.toString();
        } else {
            int length = execSql.toString().length();
            return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
        }
    }

    //执行查询
    List executeQuery() {
        try {
            //每次执行查询时都要重新生成sql,因为id不同
            customQuery = buildQuery();
            //存放结果的集合
            List results = new ArrayList();
            if (ps == null) {
                //
                ps = conn.prepareStatement(customQuery);
            }
            ResultSet result = ps.executeQuery(customQuery);
            while (result.next()) {
                //存放一条数据的集合(多个列)
                List row = new ArrayList();
                //将返回结果放入集合
                for (int i = 1; i 4

查看slume_meta元数据表: 在这里插入图片描述 往tb_dept表中插入一条数据: 在这里插入图片描述 再次查看slume_meta元数据表: 在这里插入图片描述

关注
打赏
1665409997
查看更多评论
0.1137s