- 1.4、维表关联代码实现
- 1.4.1、基本的维度查询功能
- 1.4.1.1、[封装 Phoenix 查询的工具类 PhoenixUtil](https://gitee.com/chbHome/chb-realtime/blob/master/chb-realtime-parent/chb-realtime/src/main/java/com/chb/realtime/utils/PhoenixUtil.java)
- 1.4.1.2、[封装查询维度的工具类 DimUtil(直接查询 Phoenix)](https://gitee.com/chbHome/chb-realtime/blob/master/chb-realtime-parent/chb-realtime/src/main/java/com/chb/realtime/utils/DimUtil.java)
- 1.4.1.2.1、从Phoenix中查询数据,没有使用缓存
- 1.4.2、优化 1:加入旁路缓存模式 (cache-aside-pattern)
- 1.4.2.1、这种缓存策略有几个注意点
- 1.4.2.2、缓存的选型
- 1.4.2.3、代码实现
- 在 DimUtil 中加入缓存,如果缓存没有再从的 Phoenix 查询
- 在 DimUtil 中增加失效缓存的方法, 维表数据变化时要失效缓存
- 修改 DimSink 的 invoke 方法
- 1.4.2.4、思考:[应该先失效缓存还是先写入数据库,为什么?](https://chbxw.blog.csdn.net/article/details/115109331)
- 1.4.2.5、测试
- 1.4.3、优化 2:[异步查询](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html#collapse-21)
- 1.4.3.1、先决条件
- 1.4.3.2、异步 I/O API
- 1.4.3.2.1、创建线程池工具类`com.chb.realtime.utils.ThreadPoolUtil`
- 1.4.3.2.2、自定义维度查询接口
- 1.4.3.2.3、封装维度异步查询的函数类 DimAsyncFunction
- 1.4.3.2.4、如何使用这个 DimAsyncFunction
- 1.4.3.2.4.1、测试用户维度关联
- 1.4.3.2.4.2、测试省市维度关联
- 1.4.3.2.4.3、测试SKU维度关联
- 1.4.3.2.4.4、测试SPU维度关联
- 1.4.3.2.4.5、测试品类维度关联
- 1.4.3.2.4.6、测试品牌维度关联
- 1.4.4、将关联后的订单宽表数据写回到kafka的**DWM**层
维度关联实际上就是在流中查询存储在 hbase 中的数据表。但是即使通过主键的方式查询,hbase 速度的查询也是不及流之间的 join。外部数据源的查询常常是流式计算的性能瓶颈,所以咱们再这个基础上还有进行一定的优化。
1.4.1、基本的维度查询功能 1.4.1.1、封装 Phoenix 查询的工具类 PhoenixUtil通过反射创建结果集
/**
* 从Phoenix中查询数据,没有使用缓存
* select * from DIM_PERSON where id=10 and name=zs;
* @param tableName 表名
* @param cloNameAndValue 查询哪些字段,条件s
* @return
*/
public static JSONObject getDimInfoNoCache(String tableName, Tuple2... cloNameAndValue) {
//拼接查询条件
String whereSql = " where 1=1 "; // 1=1 是为了后面拼接条件方便
for (int i = 0; i 0) {
dimJsonObj = dimList.get(0);
} else {
System.out.println("维度数据没有找到:" + sql);
System.out.println();
}
return dimJsonObj;
}
1.4.2、优化 1:加入旁路缓存模式 (cache-aside-pattern)
在上面实现的功能中,直接查询的 Hbase。外部数据源的查询常常是流式计算的性能瓶颈,所以我们需要在上面实现的基础上进行一定的优化。我们这里使用旁路缓存。 旁路缓存模式是一种非常常见的按需分配缓存的模式。如下图,任何请求优先访问缓存,缓存命中,直接获得数据返回请求。如果未命中则,查询数据库,同时把结果写入缓存以备后续请求使用。
缓存要设过期时间,不然冷数据会常驻缓存浪费资源。 要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存。
1.4.2.2、缓存的选型一般两种:堆缓存或者独立缓存服务(redis,memcache)
- 堆缓存,从性能角度看更好,毕竟访问数据路径更短,减少过程消耗。但是管理性差,其他进程无法维护缓存中的数据。
- 独立缓存服务(redis,memcache)本事性能也不错,不过会有创建连接、网络 IO 等消耗。但是考虑到数据如果会发生变化,那还是独立缓存服务管理性更强,而且如果数据量特别大,独立缓存更容易扩展。
因为咱们的维度数据都是可变数据,所以这里还是采用 Redis 管理缓存。
1.4.2.3、代码实现 在 DimUtil 中加入缓存,如果缓存没有再从的 Phoenix 查询 //在做维度关联的时候,大部分场景都是通过id进行关联,所以提供一个方法,只需要将id的值作为参数传进来即可
public static JSONObject getDimInfo(String tableName, String id) {
return getDimInfo(tableName, Tuple2.of("id", id));
}
/**
* 优化:从Phoenix中查询数据,加入了旁路缓存
先从缓存查询,如果缓存没有查到数据,再到Phoenix查询,并将查询结果放到缓存中
redis
类型: string
Key: dim:表名:值 例如:dim:DIM_PERSON:1_xxx 1_xxx对应where字段
value: 通过PhoenixUtil到维度表中查询数据,取出第一条并将其转换为json字符串
失效时间: 24*3600
//"DIM_PERSON", Tuple2.of("id", "1"),Tuple2.of("name","zhangsan"))
redisKey= "dim:DIM_PERSON:"
where id='2' and name='chb'
dim:DIM_PERSON:2_chb ----->Json
dim:DIM_PERSON:2_chb
*/
public static JSONObject getDimInfo(String tableName, Tuple2... cloNameAndValue) {
//拼接查询条件
String whereSql = " where 1=1";
String redisKey = "dim:" + tableName.toLowerCase() + ":";
for (int i = 0; i 0) {
redisKey += "_";
}
whereSql += " and " + filedName + "='" + fieldValue + "'";
redisKey += fieldValue;
}
//从Redis中获取数据
Jedis jedis = null;
//维度数据的json字符串形式
String dimJsonStr = null;
//维度数据的json对象形式
JSONObject dimJsonObj = null;
try {
//获取jedis客户端
jedis = RedisUtil.getJedis();
//根据key到Redis中查询
dimJsonStr = jedis.get(redisKey);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("从redis中查询维度失败");
}
//判断是否从Redis中查询到了数据
if (dimJsonStr != null && dimJsonStr.length() > 0) {
dimJsonObj = JSON.parseObject(dimJsonStr);
} else {
//如果在Redis中没有查到数据,需要到Phoenix中查询
String sql = "select * from " + tableName + whereSql;
System.out.println("查询维度的SQL:" + sql);
List dimList = PhoenixUtil.queryList(sql, JSONObject.class);
//对于维度查询来讲,一般都是根据主键进行查询,不可能返回多条记录,只会有一条
if (dimList != null && dimList.size() > 0) {
dimJsonObj = dimList.get(0);
//将查询出来的数据放到Redis中缓存起来
if (jedis != null) {
jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString());
}
} else {
System.out.println("维度数据没有找到:" + sql);
}
}
//关闭Jedis
if (jedis != null) {
jedis.close();
}
return dimJsonObj;
}
在 DimUtil 中增加失效缓存的方法, 维表数据变化时要失效缓存
/**
* 根据key让Redis中的缓存失效
* @param tableName
* @param id
*/
public static void deleteCached(String tableName, String id) {
String key = "dim:" + tableName.toLowerCase() + ":" + id;
try {
Jedis jedis = RedisUtil.getJedis();
// 通过key清除缓存
jedis.del(key);
jedis.close();
} catch (Exception e) {
System.out.println("缓存异常!");
e.printStackTrace();
}
}
修改 DimSink 的 invoke 方法
如果维度数据发生了变化,同时失效该数据对应的 Redis 中的缓存
/**
* 对流中的数据进行处理
*
* @param jsonObj
* @param context
* @throws Exception
*/
@Override
public void invoke(JSONObject jsonObj, Context context) throws Exception {
//获取目标表的名称
String tableName = jsonObj.getString("sink_table");
//获取json中data数据 data数据就是经过过滤之后 保留的业务表中字段
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
if (dataJsonObj != null && dataJsonObj.size() > 0) {
//根据data中属性名和属性值 生成upsert语句
String upsertSql = genUpsertSql(tableName.toUpperCase(), dataJsonObj);
System.out.println("向Phoenix插入数据的SQL:" + upsertSql);
//执行SQL
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(upsertSql);
ps.execute();
//注意:执行完Phoenix插入操作之后,需要手动提交事务
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("向Phoenix插入数据失败");
} finally {
if (ps != null) {
ps.close();
}
}
//如果当前做的是更新操作,需要将Redis中缓存的数据清除掉
if(jsonObj.getString("type").equals("update")){
DimUtil.deleteCached(tableName,dataJsonObj.getString("id"));
}
}
}
1.4.2.4、思考:应该先失效缓存还是先写入数据库,为什么?
先写入数据库,再失效缓存
1.4.2.5、测试➢ 启动 Maxwell、ZK、Kafka、HDFS、Hbase、Redis ➢ 确定在 Redis 中存在某一个维度数据的缓存,如果没有运行 DimUtil 的 main 方法生成 ➢ 运行 Idea 中的 BaseDBApp ➢ 修改数据库 chb_realtime 中的维度表和 Redis 缓存对应的数据,该数据会通过 Maxwell 同步到 Kafka,然后 BaseDBApp 同步到 Hbase 的维度表中 ➢ 查看 Redis 中的缓存是否被删除了
1.4.3、优化 2:异步查询在 Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。例如:在电商场景中,需要一个商品的 skuid 去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些情况;在物流场景中,知道包裹 id,需要去关联包裹的行业属性、发货信息、收货信息等等。
默认情况下,在 Flink 的 MapFunction 中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO 阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加 MapFunction的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。
Flink 在 1.2 中引入了 Async I/O,在异步模式下,将 IO 操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。 Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。
异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,单个并行可以连续发送多个请求,提高并发效率。
这种方式特别针对涉及网络 IO 的操作,减少因为请求等待带来的消耗。
1.4.3.1、先决条件如上节所述,正确地实现数据库(或键/值存储)的异步 I/O 交互需要支持异步请求的数据库客户端。许多主流数据库都提供了这样的客户端。
如果没有这样的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端。然而,这种方法通常比正规的异步客户端效率低。
1.4.3.2、异步 I/O APIFlink 的异步 I/O API 允许用户在流处理中使用异步请求客户端。API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。
在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:
- 实现分发请求的 AsyncFunction
- 获取数据库交互的结果并发送给 ResultFuture 的 回调 函数
- 将异步 I/O 操作应用于 DataStream 作为 DataStream 的一次转换操作。
com.chb.realtime.utils.ThreadPoolUtil
1.4.3.2.2、自定义维度查询接口
这个异步维表查询的方法适用于各种维表的查询,用什么条件查,查出来的结果如何合并到数据流对象中,需要使用者自己定义。 这就是自己定义了一个接口 DimJoinFunction包括两个方法。
public interface DimJoinFunction {
//需要提供一个获取key的方法,但是这个方法如何实现不知道
String getKey(T obj);
//流中的事实数据和查询出来的维度数据进行关联
void join(T obj, JSONObject dimInfoJsonObj) throws Exception;
}
1.4.3.2.3、封装维度异步查询的函数类 DimAsyncFunction
该类继承异步方法类 RichAsyncFunction,实现自定义维度查询接口 其中 RichAsyncFunction
是 Flink 提供的异步方法类,此处因为是查询操作输入类和返回类一致,所以是。 RichAsyncFunction 这个类要实现两个方法:
- open 用于初始化异步连接池。
- asyncInvoke 方法是核心方法,里面的操作必须是异步的,如果你查询的数据库有异步 api 也可以用线程的异步方法,如果没有异步方法,就要自己利用线程池等方式实现异步查询。
核心的类是 AsyncDataStream,这个类有两个方法一个是有序等待(orderedWait),一个是无序等待(unorderedWait)。 ➢ 无序等待(unorderedWait) 后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会有乱序出现。 ➢ 有序等待(orderedWait) 严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会差一些。 ➢ 注意 ◼ 这里实现了用户维表的查询,那么必须重写装配结果 join 方法和获取查询 rowkey的 getKey 方法。 ◼ 方法的最后两个参数 10, TimeUnit.SECONDS ,标识次异步查询最多执行 10 秒,否则会报超时异常。
//TODO 7.关联用户维度
SingleOutputStreamOperator orderWideWithUserDS = AsyncDataStream.unorderedWait(
orderWideDS,
new DimAsyncFunction("DIM_USER_INFO") {
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getUser_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) throws Exception {
// 转换维度数据,此处省略
//将维度中的年龄赋值给订单宽表中的属性
orderWide.setUser_age(age);
//将维度中的性别赋值给订单宽表中的属性
orderWide.setUser_gender(dimInfoJsonObj.getString("GENDER"));
}
},
60, TimeUnit.SECONDS);
1.4.3.2.4.1、测试用户维度关联
◼ 将 table_process 表中的数据删除掉,执行 table_process 初始配置.sql ◼ 启动 Maxwell、ZK、Kafka、HDFS、Hbase、Redis ◼ 运行运行 Idea 中的 BaseDBApp ◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table user_info
◼ 运行 Idea 中的 OrderWideApp ◼ 执行模拟生成业务数据的 jar 包 ◼ 查看控制台输出可以看到用户的年龄以及性别
◼ 在table_process 中添加
◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_province
1.4.3.2.4.3、测试SKU维度关联
◼ 在table_process 中添加
◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table sku_info
1.4.3.2.4.4、测试SPU维度关联
◼ 在table_process 中添加
◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table spu_info
1.4.3.2.4.5、测试品类维度关联
◼ 在table_process 中添加
◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_category3
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_category2
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_category1
1.4.3.2.4.6、测试品牌维度关联
◼ 在table_process 中添加
◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_trademark
1.4.4、将关联后的订单宽表数据写回到kafka的DWM层
//TODO 11.将关联后的订单宽表数据写回到kafka的DWM层
orderWideWithTmDS.map(orderWide -> JSON.toJSONString(orderWide)).addSink(MyKafkaUtil.getKafkaSink(orderWideSinkTopic));