在application.conf中添加配置
# kafka 评论 topic名称
input.topic.comments="ods_chb_shop_comments"
读取配置
val `input.topic.comments` = config.getString("input.topic.comments")
创建CommentsETL类,整合Kafka
- 在etl包下创建CommentsETL,从MQBaseETL继承
- 实现process方法,并测试打印消费数据
/**
* 点击流处理逻辑
*/
class CommentsETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
/**
* 业务处理接口
*/
override def process(): Unit = {
// 1. 整合Kafka
val commentsDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.comments`)
commentsDS.print()
}
}
修改App测试
1、在App对象中创建ClickLogETL对象,并调用process方法
val commentsETL = new CommentsETL(env)
commentsETL.process()
2、在Kafka中创建ods层评论topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_chb_shop_comments --replication-factor 3 --partitions 3
3、在Kafka中启动控制台生产程序
bin/kafka-console-producer.sh --broker-list node1:9092 --topic ods_chb_shop_comments
4、启动Flink测试
5、在控制台生产程序中贴入以下消息测试Flink是否能够消费成功
{"comments":"外观不错,音效不错,性价比高,值得购买的一款机器","goodsId":112575,"imageViedoJSON":"[\"chb.com/t1/99554/6/1122/267221/5dba725bE3a436c24/434bf88bc0a2a108.jpg\"]","orderGoodsId":"478845","starScore":1,"timestamp":1577091729997,"userId":"100719","userName":"商区苏"}