您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

维表关联代码实现

宝哥大数据 发布时间:2021-03-22 21:28:14 ,浏览量:1

文章目录
    • 1.4、维表关联代码实现
      • 1.4.1、基本的维度查询功能
        • 1.4.1.1、[封装 Phoenix 查询的工具类 PhoenixUtil](https://gitee.com/chbHome/chb-realtime/blob/master/chb-realtime-parent/chb-realtime/src/main/java/com/chb/realtime/utils/PhoenixUtil.java)
        • 1.4.1.2、[封装查询维度的工具类 DimUtil(直接查询 Phoenix)](https://gitee.com/chbHome/chb-realtime/blob/master/chb-realtime-parent/chb-realtime/src/main/java/com/chb/realtime/utils/DimUtil.java)
          • 1.4.1.2.1、从Phoenix中查询数据,没有使用缓存
      • 1.4.2、优化 1:加入旁路缓存模式 (cache-aside-pattern)
        • 1.4.2.1、这种缓存策略有几个注意点
        • 1.4.2.2、缓存的选型
        • 1.4.2.3、代码实现
          • 在 DimUtil 中加入缓存,如果缓存没有再从的 Phoenix 查询
          • 在 DimUtil 中增加失效缓存的方法, 维表数据变化时要失效缓存
            • 修改 DimSink 的 invoke 方法
        • 1.4.2.4、思考:[应该先失效缓存还是先写入数据库,为什么?](https://chbxw.blog.csdn.net/article/details/115109331)
        • 1.4.2.5、测试
      • 1.4.3、优化 2:[异步查询](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html#collapse-21)
        • 1.4.3.1、先决条件
        • 1.4.3.2、异步 I/O API
          • 1.4.3.2.1、创建线程池工具类`com.chb.realtime.utils.ThreadPoolUtil`
          • 1.4.3.2.2、自定义维度查询接口
          • 1.4.3.2.3、封装维度异步查询的函数类 DimAsyncFunction
          • 1.4.3.2.4、如何使用这个 DimAsyncFunction
            • 1.4.3.2.4.1、测试用户维度关联
            • 1.4.3.2.4.2、测试省市维度关联
            • 1.4.3.2.4.3、测试SKU维度关联
            • 1.4.3.2.4.4、测试SPU维度关联
            • 1.4.3.2.4.5、测试品类维度关联
            • 1.4.3.2.4.6、测试品牌维度关联
      • 1.4.4、将关联后的订单宽表数据写回到kafka的**DWM**层

1.4、维表关联代码实现

维度关联实际上就是在流中查询存储在 hbase 中的数据表。但是即使通过主键的方式查询,hbase 速度的查询也是不及流之间的 join。外部数据源的查询常常是流式计算的性能瓶颈,所以咱们再这个基础上还有进行一定的优化。

1.4.1、基本的维度查询功能 1.4.1.1、封装 Phoenix 查询的工具类 PhoenixUtil

在这里插入图片描述

通过反射创建结果集 在这里插入图片描述

1.4.1.2、封装查询维度的工具类 DimUtil(直接查询 Phoenix) 1.4.1.2.1、从Phoenix中查询数据,没有使用缓存
    /**
     * 从Phoenix中查询数据,没有使用缓存
     * select * from DIM_PERSON where id=10 and name=zs;
     * @param tableName         表名
     * @param cloNameAndValue   查询哪些字段,条件s
     * @return
     */
    public static JSONObject getDimInfoNoCache(String tableName, Tuple2... cloNameAndValue) {
        //拼接查询条件
        String whereSql = " where 1=1 "; // 1=1 是为了后面拼接条件方便
        for (int i = 0; i  0) {
            dimJsonObj = dimList.get(0);
        } else {
            System.out.println("维度数据没有找到:" + sql);
            System.out.println();
        }
        return dimJsonObj;
    }
1.4.2、优化 1:加入旁路缓存模式 (cache-aside-pattern)

  在上面实现的功能中,直接查询的 Hbase。外部数据源的查询常常是流式计算的性能瓶颈,所以我们需要在上面实现的基础上进行一定的优化。我们这里使用旁路缓存。   旁路缓存模式是一种非常常见的按需分配缓存的模式。如下图,任何请求优先访问缓存,缓存命中,直接获得数据返回请求。如果未命中则,查询数据库,同时把结果写入缓存以备后续请求使用。 在这里插入图片描述

1.4.2.1、这种缓存策略有几个注意点

缓存要设过期时间,不然冷数据会常驻缓存浪费资源。 要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存。

1.4.2.2、缓存的选型

一般两种:堆缓存或者独立缓存服务(redis,memcache)

  • 堆缓存,从性能角度看更好,毕竟访问数据路径更短,减少过程消耗。但是管理性差,其他进程无法维护缓存中的数据。
  • 独立缓存服务(redis,memcache)本事性能也不错,不过会有创建连接、网络 IO 等消耗。但是考虑到数据如果会发生变化,那还是独立缓存服务管理性更强,而且如果数据量特别大,独立缓存更容易扩展。

因为咱们的维度数据都是可变数据,所以这里还是采用 Redis 管理缓存。

1.4.2.3、代码实现 在 DimUtil 中加入缓存,如果缓存没有再从的 Phoenix 查询
  //在做维度关联的时候,大部分场景都是通过id进行关联,所以提供一个方法,只需要将id的值作为参数传进来即可
    public static JSONObject getDimInfo(String tableName, String id) {
        return getDimInfo(tableName, Tuple2.of("id", id));
    }

    /**
     * 优化:从Phoenix中查询数据,加入了旁路缓存
            先从缓存查询,如果缓存没有查到数据,再到Phoenix查询,并将查询结果放到缓存中
    

redis 类型: string Key: dim:表名:值 例如:dim:DIM_PERSON:1_xxx 1_xxx对应where字段 value: 通过PhoenixUtil到维度表中查询数据,取出第一条并将其转换为json字符串 失效时间: 24*3600 //"DIM_PERSON", Tuple2.of("id", "1"),Tuple2.of("name","zhangsan")) redisKey= "dim:DIM_PERSON:" where id='2' and name='chb' dim:DIM_PERSON:2_chb ----->Json dim:DIM_PERSON:2_chb

*/ public static JSONObject getDimInfo(String tableName, Tuple2... cloNameAndValue) { //拼接查询条件 String whereSql = " where 1=1"; String redisKey = "dim:" + tableName.toLowerCase() + ":"; for (int i = 0; i 0) { redisKey += "_"; } whereSql += " and " + filedName + "='" + fieldValue + "'"; redisKey += fieldValue; } //从Redis中获取数据 Jedis jedis = null; //维度数据的json字符串形式 String dimJsonStr = null; //维度数据的json对象形式 JSONObject dimJsonObj = null; try { //获取jedis客户端 jedis = RedisUtil.getJedis(); //根据key到Redis中查询 dimJsonStr = jedis.get(redisKey); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("从redis中查询维度失败"); } //判断是否从Redis中查询到了数据 if (dimJsonStr != null && dimJsonStr.length() > 0) { dimJsonObj = JSON.parseObject(dimJsonStr); } else { //如果在Redis中没有查到数据,需要到Phoenix中查询 String sql = "select * from " + tableName + whereSql; System.out.println("查询维度的SQL:" + sql); List dimList = PhoenixUtil.queryList(sql, JSONObject.class); //对于维度查询来讲,一般都是根据主键进行查询,不可能返回多条记录,只会有一条 if (dimList != null && dimList.size() > 0) { dimJsonObj = dimList.get(0); //将查询出来的数据放到Redis中缓存起来 if (jedis != null) { jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString()); } } else { System.out.println("维度数据没有找到:" + sql); } } //关闭Jedis if (jedis != null) { jedis.close(); } return dimJsonObj; }
在 DimUtil 中增加失效缓存的方法, 维表数据变化时要失效缓存
    /**
     * 根据key让Redis中的缓存失效
     * @param tableName
     * @param id
     */
    public static void deleteCached(String tableName, String id) {
        String key = "dim:" + tableName.toLowerCase() + ":" + id;
        try {
            Jedis jedis = RedisUtil.getJedis();
            // 通过key清除缓存
            jedis.del(key);
            jedis.close();
        } catch (Exception e) {
            System.out.println("缓存异常!");
            e.printStackTrace();
        }
    }
修改 DimSink 的 invoke 方法

如果维度数据发生了变化,同时失效该数据对应的 Redis 中的缓存

    /**
     * 对流中的数据进行处理
     *
     * @param jsonObj
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        //获取目标表的名称
        String tableName = jsonObj.getString("sink_table");
        //获取json中data数据   data数据就是经过过滤之后  保留的业务表中字段
        JSONObject dataJsonObj = jsonObj.getJSONObject("data");

        if (dataJsonObj != null && dataJsonObj.size() > 0) {
            //根据data中属性名和属性值  生成upsert语句
            String upsertSql = genUpsertSql(tableName.toUpperCase(), dataJsonObj);
            System.out.println("向Phoenix插入数据的SQL:" + upsertSql);

            //执行SQL
            PreparedStatement ps = null;
            try {
                ps = conn.prepareStatement(upsertSql);
                ps.execute();
                //注意:执行完Phoenix插入操作之后,需要手动提交事务
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException("向Phoenix插入数据失败");
            } finally {
                if (ps != null) {
                    ps.close();
                }
            }

            //如果当前做的是更新操作,需要将Redis中缓存的数据清除掉
            if(jsonObj.getString("type").equals("update")){
                DimUtil.deleteCached(tableName,dataJsonObj.getString("id"));
            }
        }
    }
1.4.2.4、思考:应该先失效缓存还是先写入数据库,为什么?

先写入数据库,再失效缓存

1.4.2.5、测试

➢ 启动 Maxwell、ZK、Kafka、HDFS、Hbase、Redis ➢ 确定在 Redis 中存在某一个维度数据的缓存,如果没有运行 DimUtil 的 main 方法生成 ➢ 运行 Idea 中的 BaseDBApp ➢ 修改数据库 chb_realtime 中的维度表和 Redis 缓存对应的数据,该数据会通过 Maxwell 同步到 Kafka,然后 BaseDBApp 同步到 Hbase 的维度表中 ➢ 查看 Redis 中的缓存是否被删除了

1.4.3、优化 2:异步查询

  在 Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。例如:在电商场景中,需要一个商品的 skuid 去关联商品的一些属性,例如商品所属行业、商品的生产厂家、生产厂家的一些情况;在物流场景中,知道包裹 id,需要去关联包裹的行业属性、发货信息、收货信息等等。

  默认情况下,在 Flink 的 MapFunction 中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO 阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加 MapFunction的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。

  Flink 在 1.2 中引入了 Async I/O,在异步模式下,将 IO 操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。   Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

在这里插入图片描述

  异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,单个并行可以连续发送多个请求,提高并发效率。

  这种方式特别针对涉及网络 IO 的操作,减少因为请求等待带来的消耗。

1.4.3.1、先决条件

如上节所述,正确地实现数据库(或键/值存储)的异步 I/O 交互需要支持异步请求的数据库客户端。许多主流数据库都提供了这样的客户端。

如果没有这样的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端。然而,这种方法通常比正规的异步客户端效率低。

1.4.3.2、异步 I/O API

Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端。API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。

在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:

  • 实现分发请求的 AsyncFunction
  • 获取数据库交互的结果并发送给 ResultFuture 的 回调 函数
  • 将异步 I/O 操作应用于 DataStream 作为 DataStream 的一次转换操作。
1.4.3.2.1、创建线程池工具类com.chb.realtime.utils.ThreadPoolUtil 1.4.3.2.2、自定义维度查询接口

这个异步维表查询的方法适用于各种维表的查询,用什么条件查,查出来的结果如何合并到数据流对象中,需要使用者自己定义。 这就是自己定义了一个接口 DimJoinFunction包括两个方法。

public interface DimJoinFunction {
    //需要提供一个获取key的方法,但是这个方法如何实现不知道
    String getKey(T obj);

    //流中的事实数据和查询出来的维度数据进行关联
    void join(T obj, JSONObject dimInfoJsonObj) throws Exception;
}

1.4.3.2.3、封装维度异步查询的函数类 DimAsyncFunction

该类继承异步方法类 RichAsyncFunction,实现自定义维度查询接口 其中 RichAsyncFunction 是 Flink 提供的异步方法类,此处因为是查询操作输入类和返回类一致,所以是。 RichAsyncFunction 这个类要实现两个方法:

  • open 用于初始化异步连接池。
  • asyncInvoke 方法是核心方法,里面的操作必须是异步的,如果你查询的数据库有异步 api 也可以用线程的异步方法,如果没有异步方法,就要自己利用线程池等方式实现异步查询。 在这里插入图片描述
1.4.3.2.4、如何使用这个 DimAsyncFunction

核心的类是 AsyncDataStream,这个类有两个方法一个是有序等待(orderedWait),一个是无序等待(unorderedWait)。 ➢ 无序等待(unorderedWait) 后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会有乱序出现。 ➢ 有序等待(orderedWait) 严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会差一些。 ➢ 注意 ◼ 这里实现了用户维表的查询,那么必须重写装配结果 join 方法和获取查询 rowkey的 getKey 方法。 ◼ 方法的最后两个参数 10, TimeUnit.SECONDS ,标识次异步查询最多执行 10 秒,否则会报超时异常。

        //TODO 7.关联用户维度
        SingleOutputStreamOperator orderWideWithUserDS = AsyncDataStream.unorderedWait(
                orderWideDS,
                new DimAsyncFunction("DIM_USER_INFO") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return orderWide.getUser_id().toString();
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) throws Exception {
                        // 转换维度数据,此处省略

                        //将维度中的年龄赋值给订单宽表中的属性
                        orderWide.setUser_age(age);

                        //将维度中的性别赋值给订单宽表中的属性
                        orderWide.setUser_gender(dimInfoJsonObj.getString("GENDER"));
                    }
                },
                60, TimeUnit.SECONDS);
1.4.3.2.4.1、测试用户维度关联

◼ 将 table_process 表中的数据删除掉,执行 table_process 初始配置.sql ◼ 启动 Maxwell、ZK、Kafka、HDFS、Hbase、Redis ◼ 运行运行 Idea 中的 BaseDBApp ◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)

./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table user_info 

◼ 运行 Idea 中的 OrderWideApp ◼ 执行模拟生成业务数据的 jar 包 ◼ 查看控制台输出可以看到用户的年龄以及性别 在这里插入图片描述

1.4.3.2.4.2、测试省市维度关联

◼ 在table_process 中添加

在这里插入图片描述 ◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)

./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_province
1.4.3.2.4.3、测试SKU维度关联

◼ 在table_process 中添加

在这里插入图片描述

◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)

./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table sku_info
1.4.3.2.4.4、测试SPU维度关联

◼ 在table_process 中添加 在这里插入图片描述

◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)

./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table spu_info
1.4.3.2.4.5、测试品类维度关联

◼ 在table_process 中添加

在这里插入图片描述

◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)

./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_category3
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_category2
./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_category1
1.4.3.2.4.6、测试品牌维度关联

◼ 在table_process 中添加 在这里插入图片描述

◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap)

./bin/maxwell-bootstrap --config config.properties -database chb_realtime -table base_trademark
1.4.4、将关联后的订单宽表数据写回到kafka的DWM层
        //TODO 11.将关联后的订单宽表数据写回到kafka的DWM层
        orderWideWithTmDS.map(orderWide -> JSON.toJSONString(orderWide)).addSink(MyKafkaUtil.getKafkaSink(orderWideSinkTopic));

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0444s