Flink Table: Querying MySQL Data, Converting to a DataStream, and Inserting into ClickHouse

Bulut0907, published 2022-05-16 09:11:08

Contents
  • 1. Background
  • 2. MySQL table structure and data
  • 3. Flink client code
  • 4. ClickHouse table structure and result data

1. Background

Because Flink Table does not support inserting data into ClickHouse directly, we convert the table to a DataStream and insert the data into ClickHouse that way.

2. MySQL table structure and data
mysql>
mysql> use flink_test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show create table user;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                         |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user | CREATE TABLE `user` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> 
mysql> select * from user;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
|  3 | san  |    3 |
+----+------+------+
3 rows in set (0.00 sec)

mysql> 
3. Flink client code

Add the following dependencies to pom.xml:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <scope>provided</scope>
        </dependency>
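
If the project builds with sbt rather than Maven, a roughly equivalent dependency list would look like the sketch below. This is a hypothetical translation of the Maven snippet above; it assumes scalaVersion is set to 2.11.x so that %% resolves the _2.11 artifact of the JDBC connector.

```scala
// Hypothetical sbt equivalent of the Maven dependencies above;
// assumes scalaVersion := "2.11.12" so %% picks flink-connector-jdbc_2.11
libraryDependencies ++= Seq(
  "org.apache.flink"     %% "flink-connector-jdbc" % "1.14.0" % Provided,
  "mysql"                %  "mysql-connector-java" % "8.0.25" % Provided,
  "ru.yandex.clickhouse" %  "clickhouse-jdbc"      % "0.2.4"  % Provided
)
```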

Run the TableSqlTest.scala program:

package TableApiTest

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

import java.sql.PreparedStatement

object TableSqlTest {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
      // .setRuntimeMode(RuntimeExecutionMode.BATCH)
    val tEnv = StreamTableEnvironment.create(senv)

    // You could also map only the name and age columns here
    val source_table =
      """
        |create temporary table user(
        |  id bigint,
        |  name string,
        |  age int,
        |  primary key (id) not enforced
        |) with (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://192.168.xxx.xxx:3306/flink_test',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 'user',
        |   'username' = 'root',
        |   'password' = 'Root_123'
        |)
        |""".stripMargin
    tEnv.executeSql(source_table)
    val user = tEnv.from("user")

    val user_DS = tEnv.toDataStream(user)
    user_DS.addSink(JdbcSink.sink(
      "insert into user_all(id, name, age) values(?, ?, ?)",
      new JdbcStatementBuilder[Row] {
        override def accept(t: PreparedStatement, u: Row): Unit = {

          t.setLong(1, u.getFieldAs[Long](0))
          t.setString(2, u.getFieldAs[String](1))
          t.setInt(3, u.getFieldAs[Int](2))

        }
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:clickhouse://192.168.xxx.xxx:8123/default")
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUsername("default")
        .withPassword("default123")
        .build()
    ))

    senv.execute("TableSqlTest")

  }

}
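
The JdbcExecutionOptions above control buffering: the sink accumulates rows and flushes them as one JDBC batch when 1000 rows are buffered, when 200 ms have elapsed, or on close, retrying a failed flush up to 5 times. A minimal plain-Scala sketch of that buffering behaviour (no Flink dependency; the class and method names here are illustrative, not Flink's internals):

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch of the batching a JDBC sink performs: records
// accumulate in a buffer that is flushed once it reaches batchSize.
// In the real sink a timer also flushes every batchIntervalMs, and a
// failed flush is retried up to maxRetries times.
final class BatchingSink[T](batchSize: Int, flush: Seq[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]

  // Called once per record, like JdbcStatementBuilder.accept + addBatch
  def write(record: T): Unit = {
    buffer += record
    if (buffer.size >= batchSize) flushNow()
  }

  // Flush whatever is buffered (the timer / close path in the real sink)
  def flushNow(): Unit =
    if (buffer.nonEmpty) {
      flush(buffer.toList)
      buffer.clear()
    }
}

// With batchSize = 3, seven records produce flushes of 3, 3 and 1 rows
val flushedSizes = ArrayBuffer.empty[Int]
val sink = new BatchingSink[Int](batchSize = 3, batch => flushedSizes += batch.size)
(1 to 7).foreach(sink.write)
sink.flushNow()
println(flushedSizes.toList) // List(3, 3, 1)
```

Larger batches amortize the per-round-trip cost of the ClickHouse HTTP interface, which is why a batch size of 1000 with a short flush interval is a common trade-off between throughput and latency.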

4. ClickHouse table structure and result data

The table structure is as follows:

bigdata005 :)
bigdata005 :) create table user_local on cluster cluster_ha(
:-] id UInt32,
:-] name String,
:-] age UInt8
:-] ) engine = ReplicatedMergeTree('/clickhouse/tables/user/{shard}', '{replica}')
:-] order by id;
bigdata005 :)
bigdata005 :) create table user_all on cluster sharding_ha as user_local engine Distributed(sharding_ha, default, user_local, rand());
bigdata005 :)
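
The Distributed table routes each inserted row to a shard of the local table using the sharding key, here rand(): the key value modulo the total shard weight picks the target shard. A hypothetical plain-Scala sketch of that routing, assuming a 2-shard cluster with equal weights (the real layout comes from the ClickHouse cluster configuration):

```scala
import scala.util.Random

// rand() gives each row a pseudo-random sharding key; the row goes to
// shard (key mod total shard weight). Two equal-weight shards assumed.
def pickShard(shardingKey: Int, shardCount: Int): Int =
  math.floorMod(shardingKey, shardCount)

val rng = new Random(7) // fixed seed only to make the demo repeatable
val shards = Seq.fill(1000)(pickShard(rng.nextInt(), 2))
val perShard = shards.groupBy(identity).map { case (s, xs) => s -> xs.size }
println(perShard) // roughly even split between shard 0 and shard 1
```

This random distribution is also why a query against user_all can return its rows in several separate blocks, one per shard, as in the output below.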

The result data is as follows:

bigdata005 :)
bigdata005 :) select * from user_all;
┌─id─┬─name─┬─age─┐
│  1 │ yi   │   1 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│  2 │ er   │   2 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│  3 │ san  │   3 │
└────┴──────┴─────┘
bigdata005 :) 