定义MQBase消息etl抽象类
后面不少的业务逻辑(购物车、评论、点击流等)需要共用一份Kafka数据,抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。
/**
* 消费kafka中的数据
* @param env
*/
abstract class MQBaseETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
/**
* 获取数据源
* @return
*/
override def getKafkaDataStream(topic:String): DataStream[String] = {
// 获取cart中的数据
val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](
topic,
new SimpleStringSchema(),
KafkaProp.getProperties()
)
//将消费者添加到数据源
val jsonDataStream: DataStream[String] = env.addSource(kafkaConsumer)
jsonDataStream
}
}
整合Kafka消费购物车消息
在application.conf中添加配置
# Kafka 购物车 topic名称
input.topic.cart="ods_chb_cart"
读取配置
val `input.topic.cart` = config.getString("input.topic.cart")
创建CartETL类,整合Kafka
- 在etl包下创建CartETL,从MQBaseETL继承
- 实现process方法
- 实现getKafkaDS方法,在该方法中整合Kafka,并测试打印消费数据
/**
* 购物车实时ETL处理
* @param env Flink执行环境
*/
class CartETL(env: StreamExecutionEnvironment) extends MQBaseETL(env) {
/**
* 业务处理接口
*/
override def process(): Unit = {
// 1. 整合Kafka
val cartDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.cart`)
cartDS.print()
}
}
修改App测试
1、在App对象中创建CartETL对象,并调用process方法
val cartETL = new CartETL(env)
cartETL.process()
2、在Kafka中创建ods层购物车topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_chb_shop_cart --replication-factor 3 --partitions 3
3、在Kafka中启动控制台生产程序
bin/kafka-console-producer.sh --broker-list node1:9092 --topic ods_chb_shop_cart
4、启动Flink测试
5、在控制台生产程序中贴入以下消息测试Flink是否能够消费成功
{"addTime":"Mon Dec 16 18:01:41 CST 2019","count":1,"goodsId":"100106","guid":"fd3e6beb-2ce3-4eda-bcaa-bc6221f85016","ip":"123.125.71.102","userId":"100208"}