您当前的位置: 首页 >  kafka

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

整合Kafka消费binlog消息

宝哥大数据 发布时间:2021-03-07 11:14:40 ,浏览量:1

在抽象类MySqlBaseETL中,实现Flink整合Kafka。

操作步骤:

1、自定义ProtoBuf反序列化

  • 因为Canal采集到的数据是以ProtoBuf形式推入到Kafka中的,故应该使用ProtoBuf来进行反序列化

2、Flink整合Kafka

3、创建订单实时etl处理类

4、编写App测试

自定义ProtoBuf反序列化

反序列化主要是将Byte数组转换为之前封装在common工程的RowData类

在com.chb.shop.realtime.utils包下创建CanalRowDataDeserializationSchema实现类

import com.chb.canal.bean.RowData
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema

/**
  * 自定义ProtoBuf反序列化
  */
class CanalRowDataDeserializationSchema extends AbstractDeserializationSchema[RowData]{
  override def deserialize(bytes: Array[Byte]): RowData = {
    new RowData(bytes)
  }
}
定义MySQL消息etl抽象类

后面不少的业务逻辑(订单、订单明细、商品等)需要共用一份Kafka数据(从一个topic中拉取),抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。

在etl包下创建MySqlBaseETL:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * 消费canal-kafka中的数据
 * @param env
 */
abstract class MySqlBaseETL(env:StreamExecutionEnvironment) extends BaseETL[RowData] {

  //获取到canal中消费出来的RowData对象
  override def getKafkaDataStream(topic:String = GlobalConfigUtil.`input.topic.canal`): DataStream[RowData] = {
    // 获取canal中的数据
    val canalKafkaConsumer: FlinkKafkaConsumer011[RowData] = new FlinkKafkaConsumer011[RowData](
      topic,
      new CanalRowDataDeserializationSchema(),
      KafkaProp.getProperties()
    )

    // 添加source到DataStream中
    val canalRowDataDS: DataStream[bean.RowData] = env.addSource(canalKafkaConsumer)
    canalRowDataDS
  }
}
创建订单实时etl处理类

在etl包下创建OrderETL类,用于后续订单实时拉宽处理,此时只用来进行测试

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 订单实时ETL处理
  * @param env Flink运行环境
  */
class OrderETL(env:StreamExecutionEnvironment) extends MySqlBaseETL(env) {
  override def process(): Unit = {
    kafkaDS.print()
  }
}
编写App测试
  • 创建OrderETL实例对象,并调用process方法执行测试
  • 启动canal-client客户端程序订阅binlog消息
val orderETL = new OrderETL(env)
orderETL.process()

在mysql中修改一条数据,确保Flink能够消费并打印ProtoBuf消息:

{"columns":{"bankId":"25","createTime":"2019-12-04 10:00:41","bankName":"杭州发展银行","dataFlag":"1"},"eventType":"update","executeTime":1575442948000,"logfilename":"mysql-bin.000025","logfileoffset":51945936,"tableName":"chb_banks"}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0395s