与访客的 dws 层的宽表类似,也是把多个事实表的明细数据汇总起来组合成宽表
➢ 从 Kafka 主题中获得数据流 ➢ 把 Json 字符串数据流转换为统一数据对象的数据流 ➢ 把统一的数据结构流合并为一个流 ➢ 设定事件时间与水位线 ➢ 分组、开窗、聚合 ➢ 写入 ClickHouse
二、功能实现 2.1、封装商品统计实体类 ProductStats 2.2、ProductStatsApp 2.2.1、从 Kafka 主题中获得数据流 2.2.2、把 JSON 字符串数据流转换为统一数据对象的数据流 2.2.3、把统一的数据结构流合并为一个流 2.2.4、设定事件时间与水位线 2.2.5、分组、开窗、聚合 2.2.6、补充商品维度信息 2.3、写入 ClickHouse 2.3.1、在 ClickHouse 中创建商品主题宽表create table product_stats (
stt DateTime,
edt DateTime,
sku_id UInt64,
sku_name String,
sku_price Decimal64(2),
spu_id UInt64,
spu_name String ,
tm_id UInt64,
tm_name String,
category3_id UInt64,
category3_name String ,
display_ct UInt64,
click_ct UInt64,
favor_ct UInt64,
cart_ct UInt64,
order_sku_num UInt64,
order_amount Decimal64(2),
order_ct UInt64 ,
payment_amount Decimal64(2),
paid_order_ct UInt64,
refund_order_ct UInt64,
refund_amount Decimal64(2),
comment_ct UInt64,
good_comment_ct UInt64 ,
ts UInt64
)engine =ReplacingMergeTree( ts)
partition by toYYYYMMDD(stt)
order by (stt,edt,sku_id );
2.3.2、为主程序增加写入 ClickHouse 的 Sink
三、整体测试
➢ 启动 ZK、Kafka、logger.sh、ClickHouse、Redis、HDFS、Hbase、Maxwell ➢ 运行 BaseLogApp ➢ 运行 BaseDBApp ➢ 运行 OrderWideApp ➢ 运行 PaymentWideApp ➢ 运行 ProductsStatsApp ➢ 运行 rt_applog 目录下的 jar 包 ➢ 运行 rt_dblog 目录下的 jar 包 ➢ 查看控制台输出 ➢ 查看 ClickHouse 中 products_stats 表数据
注意:一定要匹配两个数据生成模拟器的日期,否则窗口无法匹配上