您当前的位置: 首页 >  flink

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink-1.11开始提供了JDBC Sink

宝哥大数据 发布时间:2021-08-06 13:48:30 ,浏览量:0

前言: Flink-1.10及以前,没有提供JDBC Sink, 使用自定义的Sink – 3.4、自定义Sink

Flink-1.11之后,官方connector提供了JDBC Sink

1、添加依赖


  org.apache.flink
  flink-connector-jdbc_2.11
  1.12.3

注意该连接器目前还 不是 二进制发行版的一部分,如何在集群中运行请参考 这里。

已创建的 JDBC Sink 能够保证至少一次的语义。 更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。

2、通过JdbcSink.sink(...) 创建一个JDBC Sink

    val jdbcSink = JdbcSink.sink(
      "insert into person (id, name, age) values (?, ?, ?)",
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, u: (Int, String, Int)): Unit = {
          ps.setInt(1, u._1)
          ps.setString(2, u._2)
          ps.setInt(3, u._3)
        }
      },
      JdbcExecutionOptions.builder().withBatchSize(50).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("com.mysql.jdbc.Driver")
        .withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false")
        .withUsername("root")
        .withPassword("123456").build()
    )

3、完整代码

package com.chb.flink.connectors.ds

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.sql.PreparedStatement

// Flink-1.11开始提供了JDBC Sink
object JdbcConnectorDemo {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    val jdbcSink = JdbcSink.sink(
      "insert into person (id, name, age) values (?, ?, ?)",
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, u: (Int, String, Int)): Unit = {
          ps.setInt(1, u._1)
          ps.setString(2, u._2)
          ps.setInt(3, u._3)
        }
      },
      JdbcExecutionOptions.builder().withBatchSize(50).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("com.mysql.jdbc.Driver")
        .withUrl("jdbc:mysql://localhost/test?characterEncoding=utf8&useSSL=false")
        .withUsername("root")
        .withPassword("123456").build()
    )

    streamEnv.fromElements((1, "chb", 23), (2, "ling", 18))
      .addSink(jdbcSink)


    streamEnv.execute("jdbc sink")
  }
}

关注我的公众号【宝哥大数据】, 更多干货

在这里插入图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0419s