您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

2.3、计算业务数据 DWD 层

宝哥大数据 发布时间:2021-03-09 09:07:37 ,浏览量:0

2.3.1、准备业务数据 DWD 层

  业务数据的变化,我们可以通过 Canal/maxwell 采集到,但是 Canal 是把全部数据统一写入一个 Topic 中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 Hbase,将事实数据写回 Kafka 作为业务数据的 DWD 层。

2.3.2、主要流程 2.3.2.1、 接收 Kafka 数据,过滤空值数据

对 Canal 抓取数据进行 ETL,有用的部分保留,没用的过滤掉

        //TODO 2.从Kafka的ODS层读取数据
        String topic = "ods_base_db_m";
        String groupId = "base_db_app_group";

        //2.1 通过工具类获取Kafka的消费者
        FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        DataStreamSource jsonStrDS = env.addSource(kafkaSource);

        //TODO 3.对DS中数据进行结构的转换      String-->Json
        //jsonStrDS.map(JSON::parseObject);
        SingleOutputStreamOperator jsonObjDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
        //jsonStrDS.print("json>>>>");

        //TODO 4.对数据进行ETL   如果table为空 或者 data为空,或者长度 {
                boolean flag = jsonObj.getString("table") != null
                    && jsonObj.getJSONArray("data") != null
                    && jsonObj.getString("data").length() > 3;
                return flag;
            }
        );
2.3.2.2、实现动态分流功能

  由于 Canal 是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。   在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为 Flink 实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?   我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

这种可以有两个方案实现 ➢ 一种是用 Zookeeper 存储,通过 Watch 感知数据变化。 ➢ 另一种是用 mysql 数据库存储,周期性的同步。 这里选择第二种方案,主要是 mysql 对于配置数据初始化和维护管理,用 sql 都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。 所以就有了如下图: 在这里插入图片描述

2.3.3、根据 MySQL 的配置表,动态进行分流 2.3.3.1、创建配置表 table_process
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
2.3.3.2、程序流程分析

在这里插入图片描述 TableProcessFunction 是一个自定义Process算子,主要包括三条时间线任务 ➢ 图中紫线,这个时间线与数据流入无关,只要任务启动就会执行。主要的任务方法是 open() 这个方法在任务启动时就会执行。他的主要工作就是初始化一些连接,开启周期调度。 ➢ 图中绿线,这个时间线也与数据流入无关,只要周期调度启动,会自动周期性执行。主要的任务是同步配置表(tableProcessMap)。通过在 open()方法中加入 timer 实现。同时还有个附带任务就是如果发现不存在数据表,要根据配置自动创建数据库表。 ➢ 图中黑线,这个时间线就是随着数据的流入持续发生,这部分的任务就是根据同步到内存的 tableProcessMap,来为流入的数据进行标识,同时清理掉没用的字段。

2.3.3.3、自定义函数 TableProcessFunction 2.3.3.3.1、基本信息定义

/**
 * Author: chb
 * Date: 2021/2/1
 * Desc:  配置表处理函数
 */
public class TableProcessFunction extends ProcessFunction {
    //因为要将维度数据通过侧输出流输出,所以我们在这里定义一个侧输出流标记
    private OutputTag outputTag;

    //用于在内存中存放配置表信息的Map 
    private Map tableProcessMap = new HashMap();

    //用于在内存中存放已经处理过的表(在phoenix中已经建过的表)
    private Set existsTables = new HashSet();

    //声明Phoenix的连接对象
    Connection conn = null;

    //实例化函数对象的时候,将侧输出流标签也进行赋值
    public TableProcessFunction(OutputTag outputTag) {
        this.outputTag = outputTag;
    }
}
2.3.3.3.2、open()

生命周期方法,初始化连接,初始化配置表信息并开启定时任务,用于不断读取配置表信息

    //在函数被调用的时候执行的方法,执行一次
    @Override
    public void open(Configuration parameters) throws Exception {
        //初始化Phoenix连接
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        conn = DriverManager.getConnection(ChbConfig.PHOENIX_SERVER);

        //初始化配置表信息
        refreshMeta(); // 读取 MySQL 中配置表信息,存入到内存 Map 中


        //开启一个定时任务
        // 因为配置表的数据可能会发生变化,每隔一段时间就从配置表中查询一次数据,更新到map,并检查建表
        //从现在起过delay毫秒后,每隔period执行一次
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                refreshMeta();
            }
        }, 5000, 5000);

    }
2.3.3.3.3、processElement

核心处理方法,根据 MySQL 配置表的信息为每条数据打标签,走 kafka 还是 hbase

   //每过来一个元素,方法执行一次,主要任务是根据内存中配置表Map对当前进来的元素进行分流处理
    @Override
    public void processElement(JSONObject jsonObj, Context ctx, Collector out) throws Exception {
        //获取表名
        String table = jsonObj.getString("table");
        //获取操作类型
        String type = jsonObj.getString("type");
        //注意:问题修复  如果使用Maxwell的Bootstrap同步历史数据  ,这个时候它的操作类型叫bootstrap-insert
        if ("bootstrap-insert".equals(type)) {
            type = "insert";
            jsonObj.put("type", type);
        }

        if (tableProcessMap != null && tableProcessMap.size() > 0) {
            //根据表名和操作类型拼接key
            String key = table + ":" + type;
            //从内存的配置Map中获取当前key对象的配置信息
            TableProcess tableProcess = tableProcessMap.get(key);
            //如果获取到了该元素对应的配置信息
            if (tableProcess != null) {
                //获取sinkTable,指明当前这条数据应该发往何处  如果是维度数据,那么对应的是phoenix中的表名;如果是事实数据,对应的是kafka的主题名
                jsonObj.put("sink_table", tableProcess.getSinkTable());
                String sinkColumns = tableProcess.getSinkColumns();
                //如果指定了sinkColumn,需要对保留的字段进行过滤处理
                if (sinkColumns != null && sinkColumns.length() > 0) {
                    filterColumn(jsonObj.getJSONObject("data"), sinkColumns);
                }
            } else {
                System.out.println("NO this Key:" + key + "in MySQL");
            }

            //根据sinkType,将数据输出到不同的流
            if(tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)){
                //如果sinkType = hbase ,说明是维度数据,通过侧输出流输出
                ctx.output(outputTag,jsonObj);
            }else if(tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)){
                //如果sinkType = kafka ,说明是事实数据,通过主流输出
                out.collect(jsonObj);
            }
        }
    }

2.3.3.3.4、主程序 BaseDBApp 中调用 TableProcessFunction 进行分流
        //TODO 5. 动态分流  事实表放到主流,写回到kafka的DWD层;如果维度表,通过侧输出流,写入到Hbase
        //5.1定义输出到Hbase的侧输出流标签
        OutputTag hbaseTag = new OutputTag(TableProcess.SINK_TYPE_HBASE){};

        //5.2 主流 写回到Kafka的数据
        SingleOutputStreamOperator kafkaDS = filteredDS.process(
            new TableProcessFunction(hbaseTag)
        );

        //5.3获取侧输出流    写到Hbase的数据
        DataStream hbaseDS = kafkaDS.getSideOutput(hbaseTag);

        kafkaDS.print("事实>>>>");
        hbaseDS.print("维度>>>>");
2.3.4、把分好的流保存到对应表、主题中 2.3.4.1、维度数据保存到 Hbase 的表中
  1. 分流 Sink 之保存维度到 HBase(Phoenix) 在这里插入图片描述 DimSink 继承了 RickSinkFunction,这个 function 得分两条时间线。 ◼ 一条是任务启动时执行 open 操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。 ◼ 另一条是随着每条数据的到达反复执行 invoke()(图中黑线),在这里面我们要实 现数据的保存,主要策略就是根据数据组合成 sql 提交给 hbase。
2.3.4.2、业务数据保存到 Kafka 的主题中
      //TODO 7.将事实数据写回到kafka的dwd层
        FlinkKafkaProducer kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(
            new KafkaSerializationSchema() {
                @Override
                public void open(SerializationSchema.InitializationContext context) throws Exception {
                    System.out.println("kafka序列化");
                }
                @Override
                public ProducerRecord serialize(JSONObject jsonObj, @Nullable Long timestamp) {
                    String sinkTopic = jsonObj.getString("sink_table");
                    JSONObject dataJsonObj = jsonObj.getJSONObject("data");
                    return new ProducerRecord(sinkTopic,dataJsonObj.toString().getBytes());
                }
            }
        );

        kafkaDS.addSink(kafkaSink);
关注我的公众号【宝哥大数据】,更多干货

在这里插入图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0426s