Contents
- 1. Background
- 2. MySQL table schema and data
- 3. Flink client code
- 4. ClickHouse table schema and result data

1. Background
Because Flink Table does not support writing data directly to ClickHouse, we convert the Table into a DataStream and use a DataStream sink to insert the data into ClickHouse.
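The key piece of API is StreamTableEnvironment.toDataStream, which bridges a Table into a DataStream[Row] that any DataStream sink can consume. Below is a minimal self-contained sketch of just that bridge, using inline data instead of the MySQL source (names here are placeholders; the full program is in section 3):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object ToDataStreamSketch {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    // Inline data standing in for the MySQL source table
    val stream = senv.fromElements((1L, "yi"), (2L, "er"))
    val table = tEnv.fromDataStream(stream).as("id", "name")

    // The bridge: Table -> DataStream[Row]; from here any DataStream sink applies
    val rows: DataStream[Row] = tEnv.toDataStream(table)
    rows.print()

    senv.execute("ToDataStreamSketch")
  }
}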
2. MySQL table schema and data

mysql>
mysql> use flink_test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show create table user;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user | CREATE TABLE `user` (
`id` bigint NOT NULL,
`name` varchar(128) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
mysql>
mysql> select * from user;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | yi | 1 |
| 2 | er | 2 |
| 3 | san | 3 |
+----+------+------+
3 rows in set (0.00 sec)
mysql>
3. Flink client code

Add the following dependencies to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
    <scope>provided</scope>
</dependency>
Run the TableSqlTest.scala program:
package TableApiTest

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

import java.sql.PreparedStatement

object TableSqlTest {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
      // .setRuntimeMode(RuntimeExecutionMode.BATCH)
    val tEnv = StreamTableEnvironment.create(senv)

    // Register the MySQL table as a JDBC source; you can also map only
    // the name and age columns instead of all three
    val source_table =
      """
        |create temporary table user(
        |    id bigint,
        |    name string,
        |    age int,
        |    primary key (id) not enforced
        |) with (
        |    'connector' = 'jdbc',
        |    'url' = 'jdbc:mysql://192.168.xxx.xxx:3306/flink_test',
        |    'driver' = 'com.mysql.cj.jdbc.Driver',
        |    'table-name' = 'user',
        |    'username' = 'root',
        |    'password' = 'Root_123'
        |)
        |""".stripMargin
    tEnv.executeSql(source_table)

    // Convert the Table to a DataStream[Row] so a DataStream sink can be used
    val user = tEnv.from("user")
    val user_DS = tEnv.toDataStream(user)

    user_DS.addSink(JdbcSink.sink(
      "insert into user_all(id, name, age) values(?, ?, ?)",
      new JdbcStatementBuilder[Row] {
        override def accept(t: PreparedStatement, u: Row): Unit = {
          t.setLong(1, u.getFieldAs[Long](0))
          t.setString(2, u.getFieldAs[String](1))
          t.setInt(3, u.getFieldAs[Int](2))
        }
      },
      // Batch the writes: flush every 1000 rows or 200 ms, retry up to 5 times
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:clickhouse://192.168.xxx.xxx:8123/default")
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUsername("default")
        .withPassword("default123")
        .build()
    ))

    senv.execute("TableSqlTest")
  }
}
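Since JdbcStatementBuilder has a single abstract method, the anonymous class above can also be written as an explicitly typed Scala lambda. This is a sketch assuming Scala 2.12+ SAM conversion; the semantics are identical to the anonymous class:

import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.apache.flink.types.Row
import java.sql.PreparedStatement

// Same statement builder as above, written as a lambda; the explicit
// JdbcStatementBuilder[Row] type annotation is what enables the SAM conversion
val builder: JdbcStatementBuilder[Row] = (t: PreparedStatement, u: Row) => {
  t.setLong(1, u.getFieldAs[Long](0))
  t.setString(2, u.getFieldAs[String](1))
  t.setInt(3, u.getFieldAs[Int](2))
}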
4. ClickHouse table schema and result data

The table schema is as follows: user_local is a ReplicatedMergeTree table holding the data on each node, and user_all is a Distributed table over user_local, which is the table the Flink job writes to:
bigdata005 :)
bigdata005 :) create table user_local on cluster cluster_ha(
:-] id UInt32,
:-] name String,
:-] age UInt8
:-] ) engine = ReplicatedMergeTree('/clickhouse/tables/user/{shard}', '{replica}')
:-] order by id;
bigdata005 :)
bigdata005 :) create table user_all on cluster sharding_ha as user_local engine Distributed(sharding_ha, default, user_local, rand());
bigdata005 :)
The result data is as follows:
bigdata005 :)
bigdata005 :) select * from user_all;
┌─id─┬─name─┬─age─┐
│ 1 │ yi │ 1 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│ 2 │ er │ 2 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│ 3 │ san │ 3 │
└────┴──────┴─────┘
bigdata005 :)
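As a final sanity check outside of Flink, the distributed table can also be read back through the same ClickHouse JDBC driver. A minimal sketch, reusing the URL and credentials from the sink configuration above (adjust to your cluster):

import java.sql.DriverManager

object ClickHouseCheck {
  def main(args: Array[String]): Unit = {
    // Same connection settings as the Flink JDBC sink above
    val conn = DriverManager.getConnection(
      "jdbc:clickhouse://192.168.xxx.xxx:8123/default", "default", "default123")
    try {
      val rs = conn.createStatement().executeQuery(
        "select id, name, age from user_all order by id")
      while (rs.next()) {
        println(s"${rs.getLong("id")}\t${rs.getString("name")}\t${rs.getInt("age")}")
      }
    } finally conn.close()
  }
}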