Preface: Flink 1.10 and earlier shipped no JDBC sink, so you had to write a custom sink (see section 3.4, Custom Sink).
Starting with Flink 1.11, the official connectors include a JDBC sink.
1. Add the dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.12.3</version>
</dependency>
```
Note that this connector is not yet part of the binary distribution; see the Flink documentation for how to link it for cluster execution.
The JDBC sink provides at-least-once guarantees. Effectively exactly-once semantics can be achieved with upsert statements or idempotent updates.
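For example, with MySQL an idempotent upsert can be expressed via `INSERT ... ON DUPLICATE KEY UPDATE`. The statement below is a sketch, not from the original post, and assumes `id` is the primary key of the `person` table:

```scala
// Idempotent upsert for MySQL: re-delivering the same record overwrites
// the existing row instead of duplicating it (assumes `id` is the PK).
val upsertSql =
  """insert into person (id, name, age) values (?, ?, ?)
    |on duplicate key update name = values(name), age = values(age)""".stripMargin
```

Passing this SQL to `JdbcSink.sink(...)` instead of the plain insert makes replays after a failure harmless.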
2. Create a JDBC sink via JdbcSink.sink(...)
```scala
val jdbcSink = JdbcSink.sink(
  "insert into person (id, name, age) values (?, ?, ?)",
  new JdbcStatementBuilder[(Int, String, Int)] {
    override def accept(ps: PreparedStatement, u: (Int, String, Int)): Unit = {
      ps.setInt(1, u._1)     // id
      ps.setString(2, u._2)  // name
      ps.setInt(3, u._3)     // age
    }
  },
  JdbcExecutionOptions.builder().withBatchSize(50).build(),
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withDriverName("com.mysql.jdbc.Driver")
    .withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false")
    .withUsername("root")
    .withPassword("123456")
    .build()
)
```
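Besides `withBatchSize`, `JdbcExecutionOptions` can also bound flush latency and retry failed batches. A sketch with illustrative values (the interval and retry count below are assumptions, not from the post):

```scala
// Flush a batch when it reaches 50 records OR every 200 ms, whichever
// comes first, and retry a failed batch up to 3 times before failing the job.
val execOptions = JdbcExecutionOptions.builder()
  .withBatchSize(50)
  .withBatchIntervalMs(200)
  .withMaxRetries(3)
  .build()
```

Without a batch interval, a half-full batch sits in memory until enough records arrive, so low-traffic streams can see stale data in the database.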
3. Complete code
```scala
package com.chb.flink.connectors.ds

import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._

// Flink 1.11+ ships an official JDBC sink.
object JdbcConnectorDemo {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val jdbcSink = JdbcSink.sink(
      "insert into person (id, name, age) values (?, ?, ?)",
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, u: (Int, String, Int)): Unit = {
          ps.setInt(1, u._1)     // id
          ps.setString(2, u._2)  // name
          ps.setInt(3, u._3)     // age
        }
      },
      JdbcExecutionOptions.builder().withBatchSize(50).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("com.mysql.jdbc.Driver")
        .withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false")
        .withUsername("root")
        .withPassword("123456")
        .build()
    )

    streamEnv.fromElements((1, "chb", 23), (2, "ling", 18))
      .addSink(jdbcSink)

    streamEnv.execute("jdbc sink")
  }
}
```
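One caveat: `com.mysql.jdbc.Driver` is the legacy class name from MySQL Connector/J 5.x; it is deprecated in Connector/J 8.x. If your project depends on the 8.x driver, the connection options would use the relocated class instead (a sketch, same illustrative credentials as above):

```scala
// MySQL Connector/J 8.x relocated the driver class to com.mysql.cj.jdbc.Driver.
val connOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  .withDriverName("com.mysql.cj.jdbc.Driver")
  .withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false")
  .withUsername("root")
  .withPassword("123456")
  .build()
```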