在抽象类MySqlBaseETL中,实现Flink整合Kafka。
操作步骤:
1、自定义ProtoBuf反序列化
- 因为Canal采集到的数据是以ProtoBuf形式推入到Kafka中的,故应该使用ProtoBuf来进行反序列化
2、Flink整合Kafka
3、创建订单实时etl处理类
4、编写App测试
自定义ProtoBuf反序列化反序列化主要是将Byte数组转换为之前封装在common工程的RowData类
在com.chb.shop.realtime.utils包下创建CanalRowDataDeserializationSchema实现类
import com.chb.canal.bean.RowData
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
/**
* 自定义ProtoBuf反序列化
*/
class CanalRowDataDeserializationSchema extends AbstractDeserializationSchema[RowData]{
override def deserialize(bytes: Array[Byte]): RowData = {
new RowData(bytes)
}
}
定义MySQL消息etl抽象类
后面不少的业务逻辑(订单、订单明细、商品等)需要共用一份Kafka数据(从一个topic中拉取),抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。
在etl包下创建MySqlBaseETL:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 消费canal-kafka中的数据
* @param env
*/
abstract class MySqlBaseETL(env:StreamExecutionEnvironment) extends BaseETL[RowData] {
//获取到canal中消费出来的RowData对象
override def getKafkaDataStream(topic:String = GlobalConfigUtil.`input.topic.canal`): DataStream[RowData] = {
// 获取canal中的数据
val canalKafkaConsumer: FlinkKafkaConsumer011[RowData] = new FlinkKafkaConsumer011[RowData](
topic,
new CanalRowDataDeserializationSchema(),
KafkaProp.getProperties()
)
// 添加source到DataStream中
val canalRowDataDS: DataStream[bean.RowData] = env.addSource(canalKafkaConsumer)
canalRowDataDS
}
}
创建订单实时etl处理类
在etl包下创建OrderETL类,用于后续订单实时拉宽处理,此时只用来进行测试
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 订单实时ETL处理
* @param env Flink运行环境
*/
class OrderETL(env:StreamExecutionEnvironment) extends MySqlBaseETL(env) {
override def process(): Unit = {
kafkaDS.print()
}
}
编写App测试
- 创建OrderETL实例对象,并调用process方法执行测试
- 启动canal-client客户端程序订阅binlog消息
val orderETL = new OrderETL(env)
orderETL.process()
在mysql中修改一条数据,确保Flink能够消费并打印ProtoBuf消息:
{"columns":{"bankId":"25","createTime":"2019-12-04 10:00:41","bankName":"杭州发展银行","dataFlag":"1"},"eventType":"update","executeTime":1575442948000,"logfilename":"mysql-bin.000025","logfileoffset":51945936,"tableName":"chb_banks"}