订单是统计分析的重要的对象,围绕订单有很多的维度统计需求,比如用户、地区、商品、品类、品牌等等。 为了之后统计计算更加方便,减少大表之间的关联,所以在实时计算过程中将围绕订单的相关数据整合成为一张订单的宽表。
那究竟哪些数据需要和订单整合在一起?
如上图,由于在之前的操作我们已经把数据分拆成了事实数据和维度数据,事实数据(绿色)进入 kafka 数据流(DWD 层)中,维度数据(蓝色)进入 hbase 中长期保存。那么我们在 DWM 层中要把实时和维度数据进行整合关联在一起,形成宽表。那么这里就要处理有两种关联,事实数据和事实数据关联、事实数据和维度数据关联。 ➢ 事实数据和事实数据关联,其实就是流与流之间的关联。 ➢ 事实数据与维度数据关联,其实就是流计算中查询外部数据源。
二、逻辑实现 2.1、从 Kafka 的 dwd 层接收订单和订单明细数 //TODO 1.基本环境准备
//1.1 准备本地测试流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 设置并行度
env.setParallelism(4);
//1.3 设置Checkpoint
//env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(60000);
//env.setStateBackend(new FsStateBackend("hdfs://chb1:8020/chb/checkpoint/OrderWideApp"))
//TODO 2.从Kafka的DWD层读取订单和订单明细数据
//2.1 声明相关的主题以及消费者组
String orderInfoSourceTopic = "dwd_order_info";
String orderDetailSourceTopic = "dwd_order_detail";
String orderWideSinkTopic = "dwm_order_wide";
String groupId = "order_wide_group";
//2.2 读取订单主题数据
FlinkKafkaConsumer orderInfoSource = MyKafkaUtil.getKafkaSource(orderInfoSourceTopic, groupId);
DataStreamSource orderInfoJsonStrDS = env.addSource(orderInfoSource);
//2.3 读取订单明细数据
FlinkKafkaConsumer orderDetailSource = MyKafkaUtil.getKafkaSource(orderDetailSourceTopic, groupId);
DataStreamSource orderDetailJsonStrDS = env.addSource(orderDetailSource);
//TODO 3.对读取的数据进行结构的转换 jsonString -->OrderInfo|OrderDetail
//3.1 转换订单数据结构
SingleOutputStreamOperator orderInfoDS = orderInfoJsonStrDS.map(
new RichMapFunction() {
SimpleDateFormat sdf = null;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
@Override
public OrderInfo map(String jsonStr) throws Exception {
OrderInfo orderInfo = JSON.parseObject(jsonStr, OrderInfo.class);
orderInfo.setCreate_ts(sdf.parse(orderInfo.getCreate_time()).getTime());
return orderInfo;
}
}
);
//3.2 转换订单明细数据结构
SingleOutputStreamOperator orderDetailDS = orderDetailJsonStrDS.map(
new RichMapFunction() {
SimpleDateFormat sdf = null;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
@Override
public OrderDetail map(String jsonStr) throws Exception {
OrderDetail orderDetail = JSON.parseObject(jsonStr, OrderDetail.class);
orderDetail.setCreate_ts(sdf.parse(orderDetail.getCreate_time()).getTime());
return orderDetail;
}
}
);
//orderInfoDS.print("orderInfo>>>");
//orderDetailDS.print("orderDetail>>>");
1.2、测试数据接入
➢ 启动 Maxwell、zk、kafka、hdfs、hbase ➢ 运行 Idea 中的 BaseDBApp ➢ 运行 Idea 中的 OrderWideApp ➢ 在数据库 chb_realtime 的配置表中配置订单和订单明细 ➢ 执行 rt_dblog 下的 jar,生成模拟数据 ➢ 查看控制台输出 ➢ 执行流程 业务数据生成->Maxwell 同步->Kafka 的 ods_base_db_m 主题 -> BaseDBApp 分流写回 kafka->dwd_order_info 和 dwd_order_detail->OrderWideApp 从 kafka 的 dwd层读数据,打印输出
维度关联实际上就是在流中查询存储在 hbase 中的数据表。但是即使通过主键的方式查询,hbase 速度的查询也是不及流之间的 join。外部数据源的查询常常是流式计算的性能瓶颈,所以咱们再这个基础上还有进行一定的优化。