您当前的位置: 首页 >  flink

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink实现双流join

宝哥大数据 发布时间:2021-03-22 14:38:42 ,浏览量:1

返回上级 DWM层业务实现–订单宽表

1.3、订单和订单明细关联(双流 join)

  在 flink 中的流 join 大体分为两种,一种是基于时间窗口的 join(Time Windowed Join),比如 join、coGroup 等。另一种是基于状态缓存的 join(Temporal Table Join),比如 intervalJoin。   这里选用 intervalJoin,因为相比较窗口 join,intervalJoin 使用更简单,而且避免了应匹配的数据处于不同窗口的问题。intervalJoin 目前只有一个问题,就是还不支持 left join。   但是我们这里是订单主表与订单从表之间的关联不需要 left join,所以 intervalJoin 是较好的选择

1.3.1、指定事件时间字段
        //TODO 4. 指定事件时间字段
        //4.1 订单指定事件时间字段
        SingleOutputStreamOperator orderInfoWithTsDS = orderInfoDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner() {
                            @Override
                            public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                                return orderInfo.getCreate_ts();
                            }
                        })
        );
        //4.2 订单明细指定事件时间字段
        SingleOutputStreamOperator orderDetailWithTsDS = orderDetailDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner() {
                                    @Override
                                    public long extractTimestamp(OrderDetail orderDetail, long recordTimestamp) {
                                        return orderDetail.getCreate_ts();
                                    }
                                }
                        )
        );

注意 下面的forBoundedOutOfOrderness的前面的泛型, 指的是数据流中的数据类型 在这里插入图片描述

1.3.2、按照订单id进行分组 指定关联的key
        //TODO 5.按照订单id进行分组  指定关联的key
        KeyedStream orderInfoKeyedDS = orderInfoWithTsDS.keyBy(OrderInfo::getId);
        KeyedStream orderDetailKeyedDS = orderDetailWithTsDS.keyBy(OrderDetail::getOrder_id);

1.3.3、订单和订单明细关联 intervalJoin
        //TODO 6.使用intervalJoin对订单和订单明细进行关联
        SingleOutputStreamOperator orderWideDS = orderInfoKeyedDS
                .intervalJoin(orderDetailKeyedDS)
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(
                        new ProcessJoinFunction() {
                            @Override
                            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector out) throws Exception {
                                // 生成订单宽表
                                out.collect(new OrderWide(orderInfo, orderDetail));
                            }
                        }
                );

        orderWideDS.print("orderWide>>>>");

打印日志

orderWide>>>>:2> OrderWide(detail_id=116020, order_id=38142, sku_id=33, order_price=488.00, sku_num=3, sku_name=香奈儿(Chanel)女士香水5号香水 粉邂逅柔情淡香水EDT 粉邂逅淡香水35ml, province_id=21, order_status=1001, user_id=8031, total_amount=47065.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=47052.00, feight_fee=13.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=1464.00, expire_time=null, create_time=2021-03-21 16:42:31, operate_time=null, create_date=null, create_hour=null, province_name=null, province_area_code=null, province_iso_code=null, province_3166_2_code=null, user_age=null, user_gender=null, spu_id=null, tm_id=null, category3_id=null, spu_name=null, tm_name=null, category3_name=null)
orderWide>>>>:2> OrderWide(detail_id=116021, order_id=38142, sku_id=12, order_price=9197.00, sku_num=3, sku_name=Apple iPhone 12 (A2404) 128GB 黑色 支持移动联通电信5G 双卡双待手机, province_id=21, order_status=1001, user_id=8031, total_amount=47065.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=47052.00, feight_fee=13.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=27591.00, expire_time=null, create_time=2021-03-21 16:42:31, operate_time=null, create_date=null, create_hour=null, province_name=null, province_area_code=null, province_iso_code=null, province_3166_2_code=null, user_age=null, user_gender=null, spu_id=null, tm_id=null, category3_id=null, spu_name=null, tm_name=null, category3_name=null)

参考: https://ci.apache.org/projects/flink/flink-docs-release1.12/dev/stream/operators/joining.html#interval-join

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0432s