业务数据的变化,我们可以通过 Canal/maxwell 采集到,但是 Canal 是把全部数据统一写入一个 Topic 中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 Hbase,将事实数据写回 Kafka 作为业务数据的 DWD 层。
2.3.2、主要流程 2.3.2.1、 接收 Kafka 数据,过滤空值数据对 Canal 抓取数据进行 ETL,有用的部分保留,没用的过滤掉
//TODO 2.从Kafka的ODS层读取数据
String topic = "ods_base_db_m";
String groupId = "base_db_app_group";
//2.1 通过工具类获取Kafka的消费者
FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
DataStreamSource jsonStrDS = env.addSource(kafkaSource);
//TODO 3.对DS中数据进行结构的转换 String-->Json
//jsonStrDS.map(JSON::parseObject);
SingleOutputStreamOperator jsonObjDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
//jsonStrDS.print("json>>>>");
//TODO 4.对数据进行ETL 如果table为空 或者 data为空,或者长度 {
boolean flag = jsonObj.getString("table") != null
&& jsonObj.getJSONArray("data") != null
&& jsonObj.getString("data").length() > 3;
return flag;
}
);
2.3.2.2、实现动态分流功能
由于 Canal 是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。 在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为 Flink 实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢? 我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现 ➢ 一种是用 Zookeeper 存储,通过 Watch 感知数据变化。 ➢ 另一种是用 mysql 数据库存储,周期性的同步。 这里选择第二种方案,主要是 mysql 对于配置数据初始化和维护管理,用 sql 都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。 所以就有了如下图:
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
2.3.3.2、程序流程分析
TableProcessFunction 是一个自定义Process算子,主要包括三条时间线任务 ➢ 图中紫线,这个时间线与数据流入无关,只要任务启动就会执行。主要的任务方法是
open()
这个方法在任务启动时就会执行。他的主要工作就是初始化一些连接,开启周期调度。 ➢ 图中绿线,这个时间线也与数据流入无关,只要周期调度启动,会自动周期性执行。主要的任务是同步配置表(tableProcessMap)。通过在 open()方法中加入 timer 实现。同时还有个附带任务就是如果发现不存在数据表,要根据配置自动创建数据库表。 ➢ 图中黑线,这个时间线就是随着数据的流入持续发生,这部分的任务就是根据同步到内存的 tableProcessMap,来为流入的数据进行标识,同时清理掉没用的字段。
/**
* Author: chb
* Date: 2021/2/1
* Desc: 配置表处理函数
*/
public class TableProcessFunction extends ProcessFunction {
//因为要将维度数据通过侧输出流输出,所以我们在这里定义一个侧输出流标记
private OutputTag outputTag;
//用于在内存中存放配置表信息的Map
private Map tableProcessMap = new HashMap();
//用于在内存中存放已经处理过的表(在phoenix中已经建过的表)
private Set existsTables = new HashSet();
//声明Phoenix的连接对象
Connection conn = null;
//实例化函数对象的时候,将侧输出流标签也进行赋值
public TableProcessFunction(OutputTag outputTag) {
this.outputTag = outputTag;
}
}
2.3.3.3.2、open()
生命周期方法,初始化连接,初始化配置表信息并开启定时任务,用于不断读取配置表信息
//在函数被调用的时候执行的方法,执行一次
@Override
public void open(Configuration parameters) throws Exception {
//初始化Phoenix连接
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection(ChbConfig.PHOENIX_SERVER);
//初始化配置表信息
refreshMeta(); // 读取 MySQL 中配置表信息,存入到内存 Map 中
//开启一个定时任务
// 因为配置表的数据可能会发生变化,每隔一段时间就从配置表中查询一次数据,更新到map,并检查建表
//从现在起过delay毫秒后,每隔period执行一次
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
refreshMeta();
}
}, 5000, 5000);
}
2.3.3.3.3、processElement
核心处理方法,根据 MySQL 配置表的信息为每条数据打标签,走 kafka 还是 hbase
//每过来一个元素,方法执行一次,主要任务是根据内存中配置表Map对当前进来的元素进行分流处理
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector out) throws Exception {
//获取表名
String table = jsonObj.getString("table");
//获取操作类型
String type = jsonObj.getString("type");
//注意:问题修复 如果使用Maxwell的Bootstrap同步历史数据 ,这个时候它的操作类型叫bootstrap-insert
if ("bootstrap-insert".equals(type)) {
type = "insert";
jsonObj.put("type", type);
}
if (tableProcessMap != null && tableProcessMap.size() > 0) {
//根据表名和操作类型拼接key
String key = table + ":" + type;
//从内存的配置Map中获取当前key对象的配置信息
TableProcess tableProcess = tableProcessMap.get(key);
//如果获取到了该元素对应的配置信息
if (tableProcess != null) {
//获取sinkTable,指明当前这条数据应该发往何处 如果是维度数据,那么对应的是phoenix中的表名;如果是事实数据,对应的是kafka的主题名
jsonObj.put("sink_table", tableProcess.getSinkTable());
String sinkColumns = tableProcess.getSinkColumns();
//如果指定了sinkColumn,需要对保留的字段进行过滤处理
if (sinkColumns != null && sinkColumns.length() > 0) {
filterColumn(jsonObj.getJSONObject("data"), sinkColumns);
}
} else {
System.out.println("NO this Key:" + key + "in MySQL");
}
//根据sinkType,将数据输出到不同的流
if(tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)){
//如果sinkType = hbase ,说明是维度数据,通过侧输出流输出
ctx.output(outputTag,jsonObj);
}else if(tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)){
//如果sinkType = kafka ,说明是事实数据,通过主流输出
out.collect(jsonObj);
}
}
}
2.3.3.3.4、主程序 BaseDBApp 中调用 TableProcessFunction 进行分流
//TODO 5. 动态分流 事实表放到主流,写回到kafka的DWD层;如果维度表,通过侧输出流,写入到Hbase
//5.1定义输出到Hbase的侧输出流标签
OutputTag hbaseTag = new OutputTag(TableProcess.SINK_TYPE_HBASE){};
//5.2 主流 写回到Kafka的数据
SingleOutputStreamOperator kafkaDS = filteredDS.process(
new TableProcessFunction(hbaseTag)
);
//5.3获取侧输出流 写到Hbase的数据
DataStream hbaseDS = kafkaDS.getSideOutput(hbaseTag);
kafkaDS.print("事实>>>>");
hbaseDS.print("维度>>>>");
2.3.4、把分好的流保存到对应表、主题中
2.3.4.1、维度数据保存到 Hbase 的表中
- 分流 Sink 之保存维度到 HBase(Phoenix)
DimSink 继承了 RickSinkFunction,这个 function 得分两条时间线。 ◼ 一条是任务启动时执行 open 操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。 ◼ 另一条是随着每条数据的到达反复执行 invoke()(图中黑线),在这里面我们要实 现数据的保存,主要策略就是根据数据组合成 sql 提交给 hbase。
//TODO 7.将事实数据写回到kafka的dwd层
FlinkKafkaProducer kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(
new KafkaSerializationSchema() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("kafka序列化");
}
@Override
public ProducerRecord serialize(JSONObject jsonObj, @Nullable Long timestamp) {
String sinkTopic = jsonObj.getString("sink_table");
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
return new ProducerRecord(sinkTopic,dataJsonObj.toString().getBytes());
}
}
);
kafkaDS.addSink(kafkaSink);
关注我的公众号【宝哥大数据】,更多干货