您当前的位置: 首页 >  kafka

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

整合Kafka消费评论消息

宝哥大数据 发布时间:2021-03-07 11:17:47 ,浏览量:0

在application.conf中添加配置
# kafka 评论 topic名称
input.topic.comments="ods_chb_shop_comments"
读取配置
val `input.topic.comments` = config.getString("input.topic.comments")
创建CommentsETL类,整合Kafka
  • 在etl包下创建CommentsETL,从MQBaseETL继承
  • 实现process方法,并测试打印消费数据
/**
  * 点击流处理逻辑
  */
class CommentsETL(env:StreamExecutionEnvironment) extends BaseETL[String] {

  /**
   * 业务处理接口
   */
  override def process(): Unit = {
    // 1. 整合Kafka
    val commentsDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.comments`)
    commentsDS.print()
  }
}
修改App测试

1、在App对象中创建ClickLogETL对象,并调用process方法

    val commentsETL = new CommentsETL(env)
    commentsETL.process()

2、在Kafka中创建ods层评论topic

bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_chb_shop_comments --replication-factor 3 --partitions 3 

3、在Kafka中启动控制台生产程序

bin/kafka-console-producer.sh --broker-list node1:9092 --topic ods_chb_shop_comments

4、启动Flink测试

5、在控制台生产程序中贴入以下消息测试Flink是否能够消费成功

{"comments":"外观不错,音效不错,性价比高,值得购买的一款机器","goodsId":112575,"imageViedoJSON":"[\"chb.com/t1/99554/6/1122/267221/5dba725bE3a436c24/434bf88bc0a2a108.jpg\"]","orderGoodsId":"478845","starScore":1,"timestamp":1577091729997,"userId":"100719","userName":"商区苏"}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0387s