DWM Layer Business Implementation: Order Wide Table
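1.3 Joining Orders with Order Details (Two-Stream Join)

Stream joins in Flink's DataStream API fall roughly into two kinds.

- The window join joins elements of the two streams that fall into the same time window, using join or coGroup.
- The interval join (intervalJoin) buffers elements of both streams in keyed state. It matches each element of the first stream with elements of the second stream whose timestamps fall within a time interval relative to it.

We choose intervalJoin here for two reasons. Compared with a window join it is simpler to use, and it avoids the problem of two records that should match landing in different windows. Its one current limitation is that it does not support a left join; it only produces inner-join results. We are joining the order header table with the order detail table, and that join does not require a left join, so intervalJoin is the better choice.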
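To make the window-boundary problem concrete, here is a minimal plain-Java sketch with no Flink dependency. The class and method names are hypothetical. It takes two hypothetical timestamps only 2 ms apart, 4999 and 5001.

- With 5-second tumbling windows they fall into different windows, so a window join would never pair them.
- An interval of [-5 ms, +5 ms] around the first timestamp does match them.

The window-start formula assumes non-negative timestamps and a zero window offset.

```java
// Hypothetical demo class: compares window assignment with an interval bound.
// This is not Flink code; it only reproduces the two matching rules on raw timestamps.
final class WindowVsIntervalDemo {

    // Start of the tumbling window containing ts (assumes ts >= 0 and zero offset).
    static long tumblingWindowStart(long ts, long windowSize) {
        return ts - (ts % windowSize);
    }

    // Window join rule: two events can only meet if they land in the same window.
    static boolean sameWindow(long leftTs, long rightTs, long windowSize) {
        return tumblingWindowStart(leftTs, windowSize) == tumblingWindowStart(rightTs, windowSize);
    }

    // Interval join rule: rightTs must lie in [leftTs + lower, leftTs + upper], bounds inclusive.
    static boolean withinInterval(long leftTs, long rightTs, long lower, long upper) {
        return rightTs >= leftTs + lower && rightTs <= leftTs + upper;
    }

    public static void main(String[] args) {
        long orderTs = 4_999L;  // hypothetical order event time (ms)
        long detailTs = 5_001L; // hypothetical detail event time (ms), 2 ms later
        System.out.println("same 5 s window: " + sameWindow(orderTs, detailTs, 5_000L));
        System.out.println("within [-5 ms, +5 ms]: " + withinInterval(orderTs, detailTs, -5L, 5L));
    }
}
```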
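1.3.1 Assign the event-time field
// Imports used by this step
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

//TODO 4. Assign the event-time field
//4.1 Orders: use create_ts as event time, allowing up to 3 s of out-of-orderness
SingleOutputStreamOperator<OrderInfo> orderInfoWithTsDS = orderInfoDS.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
            @Override
            public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                // Event time must be in epoch milliseconds
                return orderInfo.getCreate_ts();
            }
        })
);
//4.2 Order details: same strategy, using the detail's create_ts
SingleOutputStreamOperator<OrderDetail> orderDetailWithTsDS = orderDetailDS.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
            @Override
            public long extractTimestamp(OrderDetail orderDetail, long recordTimestamp) {
                return orderDetail.getCreate_ts();
            }
        })
);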
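Note: the type parameter in front of forBoundedOutOfOrderness, as in WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(...), is the element type of the stream. Without it, Java infers WatermarkStrategy<Object>, and the typed timestamp assigner no longer compiles.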
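forBoundedOutOfOrderness(Duration.ofSeconds(3)) makes the watermark trail the largest event time seen so far by 3 seconds. The plain-Java sketch below approximates that rule as we understand Flink's built-in BoundedOutOfOrdernessWatermarks:

watermark = max timestamp seen - out-of-orderness - 1 ms

It is illustrative only. The class name and the sample create_ts values are hypothetical.

```java
// Hypothetical plain-Java sketch of the bounded-out-of-orderness rule (not the Flink class).
final class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the initial watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every element with the timestamp returned by extractTimestamp (create_ts here).
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark: events at or before this time are no longer expected.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(3_000L); // 3 s
        long[] createTs = {10_000L, 12_000L, 11_000L}; // hypothetical; 11_000 arrives out of order
        for (long ts : createTs) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

With these inputs the watermark goes 6999, 8999, 8999. The out-of-order event at 11000 is still ahead of the watermark, so it is not treated as late.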
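1.3.2 Key both streams by order id
import org.apache.flink.streaming.api.datastream.KeyedStream;

//TODO 5. Key both streams by order id, which is the join key
// Both key selectors must return the same key type (assumed Long, the type of the id fields)
KeyedStream<OrderInfo, Long> orderInfoKeyedDS = orderInfoWithTsDS.keyBy(OrderInfo::getId);
KeyedStream<OrderDetail, Long> orderDetailKeyedDS = orderDetailWithTsDS.keyBy(OrderDetail::getOrder_id);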
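1.3.3 Join orders and order details with intervalJoin
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

//TODO 6. Join orders and order details with intervalJoin
SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoKeyedDS
    .intervalJoin(orderDetailKeyedDS)
    // Match details whose event time lies in [order ts - 5 ms, order ts + 5 ms], bounds inclusive
    .between(Time.milliseconds(-5), Time.milliseconds(5))
    .process(
        new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
            @Override
            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail,
                                       Context ctx, Collector<OrderWide> out) throws Exception {
                // Build one order-wide record per matched (order, detail) pair
                out.collect(new OrderWide(orderInfo, orderDetail));
            }
        }
    );
orderWideDS.print("orderWide>>>>");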
Printed output:
orderWide>>>>:2> OrderWide(detail_id=116020, order_id=38142, sku_id=33, order_price=488.00, sku_num=3, sku_name=香奈儿(Chanel)女士香水5号香水 粉邂逅柔情淡香水EDT 粉邂逅淡香水35ml, province_id=21, order_status=1001, user_id=8031, total_amount=47065.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=47052.00, feight_fee=13.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=1464.00, expire_time=null, create_time=2021-03-21 16:42:31, operate_time=null, create_date=null, create_hour=null, province_name=null, province_area_code=null, province_iso_code=null, province_3166_2_code=null, user_age=null, user_gender=null, spu_id=null, tm_id=null, category3_id=null, spu_name=null, tm_name=null, category3_name=null)
orderWide>>>>:2> OrderWide(detail_id=116021, order_id=38142, sku_id=12, order_price=9197.00, sku_num=3, sku_name=Apple iPhone 12 (A2404) 128GB 黑色 支持移动联通电信5G 双卡双待手机, province_id=21, order_status=1001, user_id=8031, total_amount=47065.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=47052.00, feight_fee=13.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=27591.00, expire_time=null, create_time=2021-03-21 16:42:31, operate_time=null, create_date=null, create_hour=null, province_name=null, province_area_code=null, province_iso_code=null, province_3166_2_code=null, user_age=null, user_gender=null, spu_id=null, tm_id=null, category3_id=null, spu_name=null, tm_name=null, category3_name=null)
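The output shows one order (order_id=38142) joined with two detail rows, producing two OrderWide records. The in-memory sketch below mimics that inner interval-join behavior on plain lists. It pairs records that have the same key (the two keyBy selectors) and whose timestamps satisfy the between(-5 ms, +5 ms) bound. Both bounds are inclusive, which matches Flink's default.

It is not Flink's implementation, which buffers both sides in keyed state and cleans up as watermarks advance. The trimmed-down record types and all values are hypothetical stand-ins for the article's POJOs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch of inner interval-join semantics (not Flink's implementation).
final class IntervalJoinSketch {

    // Trimmed-down, hypothetical stand-ins for the article's POJOs.
    record OrderInfo(long id, long createTs) {}
    record OrderDetail(long detailId, long orderId, long createTs) {}
    record OrderWide(long orderId, long detailId) {}

    // Emits one OrderWide per (order, detail) pair with the same key
    // (OrderInfo.id == OrderDetail.orderId) and
    // order.createTs + lower <= detail.createTs <= order.createTs + upper.
    // Nested loops are used for clarity, not efficiency.
    static List<OrderWide> intervalJoin(List<OrderInfo> orders, List<OrderDetail> details,
                                        long lowerMillis, long upperMillis) {
        List<OrderWide> out = new ArrayList<>();
        for (OrderInfo o : orders) {
            for (OrderDetail d : details) {
                boolean sameKey = o.id() == d.orderId();
                boolean inInterval = d.createTs() >= o.createTs() + lowerMillis
                        && d.createTs() <= o.createTs() + upperMillis;
                if (sameKey && inInterval) {
                    out.add(new OrderWide(o.id(), d.detailId())); // like out.collect(...)
                }
            }
        }
        return out; // unmatched orders produce nothing: inner join only
    }

    public static void main(String[] args) {
        long ts = 10_000L; // hypothetical shared create_ts
        List<OrderInfo> orders = List.of(new OrderInfo(1L, ts), new OrderInfo(2L, ts));
        List<OrderDetail> details = List.of(
                new OrderDetail(11L, 1L, ts),
                new OrderDetail(12L, 1L, ts + 3),
                new OrderDetail(13L, 1L, ts + 10)); // outside [-5 ms, +5 ms], so dropped
        intervalJoin(orders, details, -5L, 5L).forEach(System.out::println);
    }
}
```

Order 2 has no matching detail and simply produces no output. This is why intervalJoin cannot express a left join.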
Reference: https://ci.apache.org/projects/flink/flink-docs-release1.12/dev/stream/operators/joining.html#interval-join