您当前的位置: 首页 >  sql

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

DWS 层-地区主题表(FlinkSQL)

宝哥大数据 发布时间:2021-03-23 20:49:20 ,浏览量:0

在这里插入图片描述

  地区主题主要是反映各个地区的销售情况。从业务逻辑上地区主题比起商品更加简单,业务逻辑也没有什么特别的就是做一次轻度聚合然后保存。

一、需求分析与思路

➢ 定义 Table 流环境 ➢ 把数据源定义为动态表 ➢ 通过 SQL 查询出结果表 ➢ 把结果表转换为数据流 ➢ 把数据流写入目标数据库   如果是Flink 官方支持的数据库,也可以直接把目标数据表定义为动态表,用 insert into写入。由于ClickHouse目前官方没有支持的jdbc连接器(目前支持Mysql、PostgreSQL、Derby)。也可以制作自定义 sink,实现官方不支持的连接器。但是比较繁琐。

二、功能实现

完整代码chb-realtime/com.chb.realtime.app.dws.ProvinceStatsSqlApp

注:本文基于Flink1.12 2.1、添加 FlinkSQL 相关依赖

 org.apache.flink
 flink-table-api-java-bridge_${scala.version}
 ${flink.version}


 org.apache.flink
 flink-table-planner-blink_${scala.version}
 ${flink.version}

2.2、定义 Table 流环境
        //1.4 创建Table环境
        EnvironmentSettings setting = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
            
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);
2.3、把数据源定义为动态表

创建一个 kafka table

        //TODO 2.把数据源定义为动态表
        String groupId = "province_stats";
        String orderWideTopic = "dwm_order_wide";
        tableEnv.executeSql("CREATE TABLE ORDER_WIDE (" +
                "province_id BIGINT, " +
                "province_name STRING," +
                "province_area_code STRING," +
                "province_iso_code STRING," +
                "province_3166_2_code STRING," +
                "order_id STRING, " +
                "split_total_amount DOUBLE," +
                "create_time STRING,rowtime AS TO_TIMESTAMP(create_time) ," +
                "WATERMARK FOR rowtime  AS rowtime) WITH (" + MyKafkaUtil.getKafkaDDL(orderWideTopic, groupId) + ")");

为了通用性将 WITH 后面的属性配置封装到MyKafkaUtil.getKafkaDDL中 kafka connector

    /**
     * 拼接Kafka相关属性到DDL
     *
     * @param topic
     * @param groupId
     * @return
     */
    public static String getKafkaDDL(String topic, String groupId) {
        String ddl = "'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                "  'format' = 'json', " +
                "  'scan.startup.mode' = 'latest-offset'  ";
        return ddl;
    }

2.4、聚合计算
        //TODO 3.聚合计算
        // 开窗 group by TUMBLE(rowtime, INTERVAL '10' SECOND )
        // 开窗开始,结束时间 TUMBLE_START  TUMBLE_END
        Table provinceStateTable = tableEnv.sqlQuery("SELECT " +
            "DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') stt, " +
            "DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') edt , " +
            " province_id,province_name,province_area_code area_code," +
            "province_iso_code iso_code ,province_3166_2_code iso_3166_2 ," +
            "COUNT( DISTINCT  order_id) order_count, sum(split_total_amount) order_amount," +
            "UNIX_TIMESTAMP() * 1000 ts "+
            " from  ORDER_WIDE group by  TUMBLE(rowtime, INTERVAL '10' SECOND )," +
            " province_id,province_name,province_area_code,province_iso_code,province_3166_2_code ");

2.5、转为数据流
        //TODO 4.将动态表转换为数据流
        DataStream provinceStatsDS = tableEnv.toAppendStream(provinceStateTable, ProvinceStats.class);
        //DataStream provinceStatsDS = tableEnv.toRetractStream(provinceStateTable, ProvinceStats.class);

2.6、写入 ClickHouse 2.6.1、在ClickHouse创建主题宽表
create table province_stats (
 stt DateTime,
 edt DateTime,
 province_id UInt64,
 province_name String,
  area_code String ,
 iso_code String,
 iso_3166_2 String ,
 order_amount Decimal64(2),
 order_count UInt64 ,
 ts UInt64
)engine =ReplacingMergeTree( ts)
 partition by toYYYYMMDD(stt)
 order by (stt,edt,province_id );
2.6.2、保存到clickhouse
        //TODO 5.将流中的数据保存到ClickHouse
         provinceStatsDS.addSink(
            ClickHouseUtil.getJdbcSink(
                "insert into  province_stats values(?,?,?,?,?,?,?,?,?,?)"
            )
        );
2.7、整体测试

➢ 启动 ZK、Kafka、ClickHouse、Redis、HDFS、Hbase、Maxwell ➢ 运行 BaseDBApp ➢ 运行 OrderWideApp ➢ 运行 ProvinceStatsSqlApp ➢ 运行 rt_dblog 目录下的 jar 包 ➢ 查看控制台输出 ➢ 查看 ClickHouse 中 products_stats 表数据

注意:因为是事件时间,所以第一次运行 rt_dblog 的时候,不会触发 watermark,第二次 再运行 rt_dblog 的 jar 的时候,才会触发第一次运行的 watermark

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0384s