您当前的位置: 首页 >  kafka

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

整合kafka消费字符串类型消息

宝哥大数据 发布时间:2021-03-07 11:16:25 ,浏览量:0

定义MQBase消息etl抽象类

后面不少的业务逻辑(购物车、评论、点击流等)需要共用一份Kafka数据,抽取抽象类的目的是共用一个FlinkKafkaConsumer,因为后面创建FlinkKafkaConsumer整合Kafka需要使用到Flink流式运行环境,需要在主构造器中传入Flink流式运行环境。该ETL抽象类需要从BaseETL继承。

/**
 * 消费kafka中的数据
 * @param env
 */
abstract class MQBaseETL(env:StreamExecutionEnvironment)  extends BaseETL[String] {
  /**
   * 获取数据源
   * @return
   */
  override def getKafkaDataStream(topic:String): DataStream[String] = {
    // 获取cart中的数据
    val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](
      topic,
      new SimpleStringSchema(),
      KafkaProp.getProperties()
    )

    //将消费者添加到数据源
    val jsonDataStream: DataStream[String] = env.addSource(kafkaConsumer)
    jsonDataStream
  }
}
整合Kafka消费购物车消息 在application.conf中添加配置
# Kafka 购物车 topic名称
input.topic.cart="ods_chb_cart"
读取配置
val `input.topic.cart` = config.getString("input.topic.cart")
创建CartETL类,整合Kafka
  • 在etl包下创建CartETL,从MQBaseETL继承
  • 实现process方法
  • 实现getKafkaDS方法,在该方法中整合Kafka,并测试打印消费数据
/**
  * 购物车实时ETL处理
  * @param env Flink执行环境
  */
class CartETL(env: StreamExecutionEnvironment) extends MQBaseETL(env) {
  /**
    * 业务处理接口
    */
  override def process(): Unit = {
    // 1. 整合Kafka
    val cartDS: DataStream[String] = getKafkaDataStream(GlobalConfigUtil.`input.topic.cart`)
    cartDS.print()
  }
}
修改App测试

1、在App对象中创建CartETL对象,并调用process方法

val cartETL = new CartETL(env)
cartETL.process()

2、在Kafka中创建ods层购物车topic

bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_chb_shop_cart --replication-factor 3 --partitions 3 

3、在Kafka中启动控制台生产程序

bin/kafka-console-producer.sh --broker-list node1:9092 --topic ods_chb_shop_cart

4、启动Flink测试

5、在控制台生产程序中贴入以下消息测试Flink是否能够消费成功

{"addTime":"Mon Dec 16 18:01:41 CST 2019","count":1,"goodsId":"100106","guid":"fd3e6beb-2ce3-4eda-bcaa-bc6221f85016","ip":"123.125.71.102","userId":"100208"}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0478s