The region theme mainly reflects sales by region. Its business logic is simpler than the product theme: there is nothing special about it, the job just performs one round of lightweight aggregation and then persists the result.
1. Requirements Analysis and Approach
➢ Define the Table streaming environment
➢ Define the data source as a dynamic table
➢ Query the result table with SQL
➢ Convert the result table into a data stream
➢ Write the data stream to the target database
If the target database is officially supported by Flink, the target table could also be defined as a dynamic table and written with INSERT INTO. However, Flink's official JDBC connector does not currently support ClickHouse (it supports MySQL, PostgreSQL, and Derby). It is also possible to build a custom sink for an officially unsupported connector, but that is more cumbersome.
2. Implementation
Full code: chb-realtime/com.chb.realtime.app.dws.ProvinceStatsSqlApp
2.1. Add Dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
2.2. Define the Table Streaming Environment
//1.4 Create the Table environment
EnvironmentSettings setting = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);
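The snippet above assumes a StreamExecutionEnvironment named env has already been created earlier in the job. A minimal sketch of that setup (the parallelism value is an assumption, not taken from the original code):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create the stream execution environment that the Table environment wraps
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Assumed value; in practice match the parallelism to the Kafka partition count
env.setParallelism(4);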
2.3. Define the Data Source as a Dynamic Table
Create a Kafka table:
//TODO 2. Define the data source as a dynamic table
String groupId = "province_stats";
String orderWideTopic = "dwm_order_wide";
tableEnv.executeSql("CREATE TABLE ORDER_WIDE (" +
"province_id BIGINT, " +
"province_name STRING," +
"province_area_code STRING," +
"province_iso_code STRING," +
"province_3166_2_code STRING," +
"order_id STRING, " +
"split_total_amount DOUBLE," +
"create_time STRING,rowtime AS TO_TIMESTAMP(create_time) ," +
"WATERMARK FOR rowtime AS rowtime) WITH (" + MyKafkaUtil.getKafkaDDL(orderWideTopic, groupId) + ")");
For reusability, the Kafka connector options that follow WITH are encapsulated in MyKafkaUtil.getKafkaDDL:
/**
 * Builds the Kafka connector properties for a table DDL
 *
 * @param topic   the Kafka topic to read from
 * @param groupId the consumer group id
 * @return the property fragment placed inside the WITH (...) clause
 */
public static String getKafkaDDL(String topic, String groupId) {
String ddl = "'connector' = 'kafka', " +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
" 'properties.group.id' = '" + groupId + "', " +
" 'format' = 'json', " +
" 'scan.startup.mode' = 'latest-offset' ";
return ddl;
}
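For the job in this section (topic dwm_order_wide, consumer group province_stats), the helper therefore renders options equivalent to: 'connector' = 'kafka', 'topic' = 'dwm_order_wide', 'properties.group.id' = 'province_stats', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', with 'properties.bootstrap.servers' filled in from the KAFKA_SERVER constant.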
2.4. Aggregation
//TODO 3. Aggregation
// Windowing: group by TUMBLE(rowtime, INTERVAL '10' SECOND)
// Window start / end times: TUMBLE_START, TUMBLE_END
Table provinceStateTable = tableEnv.sqlQuery("SELECT " +
"DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') stt, " +
"DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') edt , " +
" province_id,province_name,province_area_code area_code," +
"province_iso_code iso_code ,province_3166_2_code iso_3166_2 ," +
"COUNT( DISTINCT order_id) order_count, sum(split_total_amount) order_amount," +
"UNIX_TIMESTAMP() * 1000 ts "+
" from ORDER_WIDE group by TUMBLE(rowtime, INTERVAL '10' SECOND )," +
" province_id,province_name,province_area_code,province_iso_code,province_3166_2_code ");
2.5. Convert to a Data Stream
//TODO 4. Convert the dynamic table into a data stream
DataStream<ProvinceStats> provinceStatsDS = tableEnv.toAppendStream(provinceStateTable, ProvinceStats.class);
//DataStream<Tuple2<Boolean, ProvinceStats>> provinceStatsDS = tableEnv.toRetractStream(provinceStateTable, ProvinceStats.class);
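toAppendStream maps each result row onto a ProvinceStats object by column name, so the bean's fields must line up with the aliases in the SELECT above. The project's actual class is not shown in this section; below is a minimal sketch of what it needs to look like (field order follows the ClickHouse table columns so a positional INSERT can fill the placeholders in order):
// Minimal sketch of the ProvinceStats bean assumed by toAppendStream (the real class in the
// project may use Lombok or carry extra fields). Flink's POJO mapping needs a public no-arg
// constructor and public fields (or getters/setters) matching the query's column aliases.
public class ProvinceStats {
    public String stt;             // window start, "yyyy-MM-dd HH:mm:ss"
    public String edt;             // window end
    public Long province_id;
    public String province_name;
    public String area_code;
    public String iso_code;
    public String iso_3166_2;
    public Double order_amount;    // SUM(split_total_amount)
    public Long order_count;       // COUNT(DISTINCT order_id)
    public Long ts;                // UNIX_TIMESTAMP() * 1000

    public ProvinceStats() { }     // required by Flink's POJO type extraction
}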
2.6. Write to ClickHouse
2.6.1. Create the Theme Wide Table in ClickHouse
create table province_stats (
    stt DateTime,
    edt DateTime,
    province_id UInt64,
    province_name String,
    area_code String,
    iso_code String,
    iso_3166_2 String,
    order_amount Decimal64(2),
    order_count UInt64,
    ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, province_id);
2.6.2. Save to ClickHouse
//TODO 5. Save the stream data to ClickHouse
provinceStatsDS.addSink(
ClickHouseUtil.getJdbcSink(
"insert into province_stats values(?,?,?,?,?,?,?,?,?,?)"
)
);
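ClickHouseUtil.getJdbcSink is not shown in this section. A minimal sketch of how such a utility can be built on Flink's generic JDBC sink, assuming the flink-connector-jdbc and clickhouse-jdbc dependencies are on the classpath; the JDBC URL below is a placeholder, and the project's real utility may differ (for example, it may skip fields marked with a custom annotation):
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;

public class ClickHouseUtil {
    // Assumed address and database; replace with your ClickHouse instance
    private static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/default";

    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        return JdbcSink.sink(
                sql,
                (ps, obj) -> {
                    // Fill the ? placeholders by reflecting over the POJO's fields
                    // (declaration order in practice), which must line up with the
                    // column order of the INSERT statement.
                    Field[] fields = obj.getClass().getDeclaredFields();
                    for (int i = 0; i < fields.length; i++) {
                        fields[i].setAccessible(true);
                        try {
                            ps.setObject(i + 1, fields[i].get(obj));
                        } catch (IllegalAccessException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(5)   // flush every few rows; tune for real load
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(CLICKHOUSE_URL)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .build()
        );
    }
}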
2.7. End-to-End Test
➢ Start ZK, Kafka, ClickHouse, Redis, HDFS, HBase, and Maxwell
➢ Run BaseDBApp
➢ Run OrderWideApp
➢ Run ProvinceStatsSqlApp
➢ Run the jar under the rt_dblog directory
➢ Check the console output
➢ Check the data in the province_stats table in ClickHouse
Note: because the job uses event time, the first run of rt_dblog will not trigger the watermark; only when the rt_dblog jar is run a second time will the windows from the first run be triggered.