Learning objectives
- Be able to set up the development environment for a Flink real-time ETL project
- Be able to encapsulate common interfaces for ETL business processing
Create the config.properties configuration file:
# Canal configuration
canal.server.ip=node1
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=chb_shop.*
# ZooKeeper configuration
zookeeper.server.ip=node1:2181,node2:2181,node3:2181
# Kafka configuration
kafka.bootstrap_servers_config=node1:9092,node2:9092,node3:9092
kafka.batch_size_config=1024
kafka.acks=all
kafka.retries=0
kafka.client_id_config=chb_shop_canal_click
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
kafka.value_serializer_class_config=cn.chb.canal.protobuf.ProtoBufSerializer
kafka.topic=ods_chb_shop_mysql
Write a utility class that reads the configuration file
/**
* Reads the config.properties configuration file
*/
public class ConfigUtil {
private static Properties properties;
static {
try {
properties = new Properties();
properties.load(ConfigUtil.class.getClassLoader().getResourceAsStream("config.properties"));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static String canalServerIp() {
return properties.getProperty("canal.server.ip");
}
public static int canalServerPort() {
return Integer.parseInt(properties.getProperty("canal.server.port"));
}
public static String canalServerDestination() {
return properties.getProperty("canal.server.destination");
}
public static String canalServerUsername() {
return properties.getProperty("canal.server.username");
}
public static String canalServerPassword() {
return properties.getProperty("canal.server.password");
}
public static String canalSubscribeFilter() {
return properties.getProperty("canal.subscribe.filter");
}
public static String zookeeperServerIp() {
return properties.getProperty("zookeeper.server.ip");
}
public static String kafkaBootstrap_servers_config() {
return properties.getProperty("kafka.bootstrap_servers_config");
}
public static String kafkaBatch_size_config() {
return properties.getProperty("kafka.batch_size_config");
}
public static String kafkaAcks() {
return properties.getProperty("kafka.acks");
}
public static String kafkaRetries() {
return properties.getProperty("kafka.retries");
}
public static String kafkaClient_id_config() {
return properties.getProperty("kafka.client_id_config");
}
public static String kafkaKey_serializer_class_config() {
return properties.getProperty("kafka.key_serializer_class_config");
}
public static String kafkaValue_serializer_class_config() {
return properties.getProperty("kafka.value_serializer_class_config");
}
public static String kafkaTopic() {
return properties.getProperty("kafka.topic");
}
public static void main(String[] args) {
System.out.println(canalServerIp());
System.out.println(canalServerPort());
System.out.println(canalServerDestination());
System.out.println(canalServerUsername());
System.out.println(canalServerPassword());
}
}
Write the core CanalClient implementation class
/**
* Canal client
*/
public class CanalClient {
// Number of binlog entries to read in one batch
private static final int BATCH_SIZE = 5 * 1024;
// Canal client connector
private CanalConnector canalConnector;
// Canal configuration
private Properties properties;
private KafkaSender kafkaSender;
public CanalClient() {
// Initialize the connection
canalConnector = CanalConnectors.newClusterConnector(ConfigUtil.zookeeperServerIp(),
ConfigUtil.canalServerDestination(),
ConfigUtil.canalServerUsername(),
ConfigUtil.canalServerPassword());
kafkaSender = new KafkaSender();
}
// Start listening
public void start() {
try {
while(true) {
// Establish the connection
canalConnector.connect();
// Roll back the previous get request and re-fetch the data
canalConnector.rollback();
// Subscribe using the configured filter
canalConnector.subscribe(ConfigUtil.canalSubscribeFilter());
while(true) {
// Pull a batch of binlog entries in a single request
Message message = canalConnector.getWithoutAck(BATCH_SIZE);
// Get the batch id
long batchId = message.getId();
// Get the number of binlog entries
int size = message.getEntries().size();
if(batchId == -1 || size == 0) {
}
else {
Map binlogMsgMap = binlogMessageToMap(message);
RowData rowData = new RowData(binlogMsgMap);
System.out.println(rowData.toString());
if (binlogMsgMap.size() > 0) {
kafkaSender.send(rowData);
}
}
// Acknowledge that the given batchId has been consumed successfully
canalConnector.ack(batchId);
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
} finally {
// Disconnect
canalConnector.disconnect();
}
}
/**
* Convert a binlog message into a Map structure
* @param message
* @return
*/
private Map binlogMessageToMap(Message message) throws InvalidProtocolBufferException {
Map rowDataMap = new HashMap();
// 1. Iterate over all binlog entries in the message
for (CanalEntry.Entry entry : message.getEntries()) {
// Skip transaction begin/end entries; only row-change entries carry data
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// Get the binlog file name
String logfileName = entry.getHeader().getLogfileName();
// Get the offset within the binlog file
long logfileOffset = entry.getHeader().getLogfileOffset();
// Get the SQL execution timestamp
long executeTime = entry.getHeader().getExecuteTime();
// Get the database (schema) name
String schemaName = entry.getHeader().getSchemaName();
// Get the table name
String tableName = entry.getHeader().getTableName();
// Get the event type: insert/update/delete
String eventType = entry.getHeader().getEventType().toString().toLowerCase();
rowDataMap.put("logfileName", logfileName);
rowDataMap.put("logfileOffset", logfileOffset);
rowDataMap.put("executeTime", executeTime);
rowDataMap.put("schemaName", schemaName);
rowDataMap.put("tableName", tableName);
rowDataMap.put("eventType", eventType);
// Collect the column changes for each affected row
Map columnDataMap = new HashMap();
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> columnDataList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : columnDataList) {
if(eventType.equals("insert") || eventType.equals("update")) {
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
columnDataMap.put(column.getName(), column.getValue().toString());
}
}
else if(eventType.equals("delete")) {
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
columnDataMap.put(column.getName(), column.getValue().toString());
}
}
}
rowDataMap.put("columns", columnDataMap);
}
return rowDataMap;
}
}
Write the Entrance entry point
/**
* Entry point
*/
public class Entrance {
public static void main(String[] args) {
CanalClient canalClient = new CanalClient();
canalClient.start();
}
}
Serialize binlog messages with ProtoBuf
Create the package structure in the chb_shop_common module:
- cn.chb.canal.bean: common JavaBeans
- cn.chb.canal.protobuf: ProtoBuf-related interfaces and implementations
Create the ProtoBufable interface
/**
* ProtoBuf serialization interface
* Every bean that can be serialized with ProtoBuf should implement this interface
*/
public interface ProtoBufable {
/**
* Convert the object into a byte array
* @return byte array
*/
byte[] toByte();
}
Create the ProtoBufSerializer implementation class
/**
* Serializer used for Kafka message values
*/
public class ProtoBufSerializer implements Serializer<ProtoBufable> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, ProtoBufable data) {
return data.toByte();
}
@Override
public void close() {}
}
Create the proto descriptor file
syntax = "proto3";
option java_package = "cn.chb.canal.protobuf";
option java_outer_classname = "CanalModel";
/* Row data */
message RowData {
string logfileName = 15;
uint64 logfileOffset = 14;
uint64 executeTime = 1;
string schemaName = 2;
string tableName = 3;
string eventType = 4;
/* Column data */
map<string, string> columns = 5;
}
Compile with the Maven protobuf plugin

- The RowData entity class must implement the ProtoBufable interface
- Implement the toByte method as well as a constructor that builds a RowData entity from a byte array
public class RowData implements ProtoBufable {
private String logfilename;
private Long logfileoffset;
private Long executeTime;
private String schemaName;
private String tableName;
private String eventType;
private Map<String, String> columns;
public RowData() {
}
public RowData(Map map) {
this.logfilename = map.get("logfileName").toString();
this.logfileoffset = Long.parseLong(map.get("logfileOffset").toString());
this.executeTime = Long.parseLong(map.get("executeTime").toString());
this.schemaName = map.get("schemaName").toString();
this.tableName = map.get("tableName").toString();
this.eventType = map.get("eventType").toString();
this.columns = (Map)map.get("columns");
}
public RowData(String logfilename,
Long logfileoffset,
Long executeTime,
String schemaName,
String tableName,
String eventType,
Map<String, String> columns) {
this.logfilename = logfilename;
this.logfileoffset = logfileoffset;
this.executeTime = executeTime;
this.schemaName = schemaName;
this.tableName = tableName;
this.eventType = eventType;
this.columns = columns;
}
public RowData(byte[] bytes) {
try {
CanalModel.RowData rowData = CanalModel.RowData.parseFrom(bytes);
this.logfilename = rowData.getLogfileName();
this.logfileoffset = rowData.getLogfileOffset();
this.executeTime = rowData.getExecuteTime();
this.schemaName = rowData.getSchemaName();
this.tableName = rowData.getTableName();
this.eventType = rowData.getEventType();
// Copy all column values into a mutable HashMap
this.columns = new HashMap<>();
columns.putAll(rowData.getColumnsMap());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Override
public byte[] toByte() {
CanalModel.RowData.Builder builder = CanalModel.RowData.newBuilder();
builder.setLogfileName(this.logfilename);
builder.setLogfileOffset(this.logfileoffset);
builder.setExecuteTime(this.executeTime);
builder.setSchemaName(this.schemaName);
builder.setTableName(this.tableName);
builder.setEventType(this.eventType);
for (String key : this.columns.keySet()) {
builder.putColumns(key, this.columns.get(key));
}
return builder.build().toByteArray();
}
// Generate getter/setter methods here (omitted)
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
Convert the message to a RowData entity in CanalClient and print it
Find the start method in CanalClient.java
RowData rowData = new RowData(binlogMsgMap);
System.out.println(rowData.toString());
Run the test
Produce ProtoBuf messages to Kafka: implement KafkaSender, the class that sends data to Kafka
/**
* Kafka producer
*/
public class KafkaSender {
private Properties kafkaProps = new Properties();
private KafkaProducer<String, RowData> kafkaProducer;
public KafkaSender() {
kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config());
kafkaProps.put("acks", ConfigUtil.kafkaAcks());
kafkaProps.put("retries", ConfigUtil.kafkaRetries());
kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config());
kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config());
kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config());
kafkaProducer = new KafkaProducer<>(kafkaProps);
}
public void send(RowData rowData) {
kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), null, rowData));
}
}
Call KafkaSender from CanalClient to produce the data
public class CanalClient {
// ...
private KafkaSender kafkaSender;
// Start listening
public void start() {
...
RowData rowData = new RowData(binlogMsgMap);
kafkaSender.send(rowData);
}
Create the Kafka topic and run the test
# Create the topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_chb_shop_mysql --replication-factor 3 --partitions 3
# Start a console consumer for testing
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ods_chb_shop_mysql --from-beginning
Flink real-time ETL project initialization
Configuration files: create the following under resources:
- application.conf
- log4j.properties
- hbase-site.xml
Create the package structure
Create the following packages in the scala directory:
- cn.chb.shop.realtime.etl.app: program entry point
- cn.chb.shop.realtime.etl.async: async IO
- cn.chb.shop.realtime.etl.bean: entity classes
- cn.chb.shop.realtime.etl.utils: utility classes
- cn.chb.shop.realtime.etl.process: real-time ETL processing
- cn.chb.shop.realtime.etl.dataloader: offline synchronization of dimension data
Write a utility class to load the configuration file
- Create a GlobalConfigUtil singleton object in the util package
- Write the code:
- Use ConfigFactory.load to obtain the config object
- Call config.getString to read the settings in application.conf
- Add a main method to verify that the utility reads the configuration correctly (see the sketch below)
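A minimal sketch of GlobalConfigUtil, assuming the Typesafe Config library (ConfigFactory) is on the classpath; the configuration key names below are placeholders and should be replaced with the keys defined in your application.conf:
import com.typesafe.config.{Config, ConfigFactory}

object GlobalConfigUtil {
  // Load application.conf from the classpath
  private val config: Config = ConfigFactory.load()

  // Example accessors; the key names are placeholders
  def bootstrapServers: String = config.getString("bootstrap.servers")
  def inputTopicCanal: String = config.getString("input.topic.canal")

  // Quick test that the configuration is read correctly
  def main(args: Array[String]): Unit = {
    println(bootstrapServers)
    println(inputTopicCanal)
  }
}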
Redis connection utility class RedisUtil.scala
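RedisUtil is only named here; below is a minimal sketch assuming the Jedis client is used, with host and port as placeholder values:
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  // Connection pool settings; tune for your environment
  private val poolConfig = new JedisPoolConfig()
  poolConfig.setMaxTotal(20)
  poolConfig.setMaxIdle(10)

  // Host and port are placeholders; in the real project read them from application.conf
  private lazy val pool = new JedisPool(poolConfig, "node2", 6379)

  // Callers are responsible for closing the returned Jedis instance
  def getJedis: Jedis = pool.getResource
}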
HBase connection pool utility class HbaseUtil.scala
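A minimal sketch of the HBase connection utility, assuming the HBase client dependency is available and the hbase-site.xml from the resources list above is on the classpath:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HbaseUtil {
  // HBaseConfiguration.create() picks up hbase-site.xml from the classpath
  private val conf: Configuration = HBaseConfiguration.create()

  // A Connection is heavyweight and thread-safe: share one instance and create Table objects per use
  lazy val getConnection: Connection = ConnectionFactory.createConnection(conf)
}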
Initialize the Flink streaming program
- Create an App singleton object and initialize the Flink runtime environment
- Create a main method
- Write the code:
- Obtain the StreamExecutionEnvironment
- Set the default development parallelism to 1
- Enable checkpointing
- Write test code to verify that the Flink program runs correctly
Reference code
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object App {
def main(args: Array[String]): Unit = {
// I. Initialize the Flink runtime environment
// 1. Obtain the StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Set the default development parallelism to 1
env.setParallelism(1)
// 3. Configure checkpointing
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// HDFS location for checkpoint state
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/checkpoint/"))
// Minimum pause between two checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
// Maximum number of concurrent checkpoints
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// Checkpoint timeout
env.getCheckpointConfig.setCheckpointTimeout(60000)
// Retain externalized checkpoints when the job is cancelled
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000))
// 4. Test code to verify that the Flink program runs correctly
env.fromCollection(
List("hadoop", "hive", "spark")
).print()
env.execute("runtime_etl")
}
}
Notes
- Be sure to import the implicit conversions (import org.apache.flink.streaming.api.scala._, as in the code above); otherwise the Flink program will not compile
This trait defines the unified ETL workflow; a single process method covers data ingestion, ETL, and writing the results out.
Create the BaseETL trait in the etl package:
/**
* Common ETL interface
*/
trait BaseETL[T] {
/**
* Build the Kafka producer object
* @param topic
*/
def kafkaProducer(topic:String)={
// All ETL results are written to Kafka as JSON strings
new FlinkKafkaProducer011[String](
topic,
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // the wrapper also handles key serialization
KafkaProps.getKafkaProperties()
)
}
/**
* Read data from Kafka; T is the element type of the returned stream
* @param topic
* @return
*/
def getKafkaDataStream(topic:String):DataStream[T];
/**
* Entry point that runs the ETL processing
*/
def process():Unit;
}
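The trait above calls KafkaProps.getKafkaProperties(), which is not shown in this section; a minimal sketch, with the broker list taken from config.properties above and the consumer group id as a placeholder:
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig

object KafkaProps {
  def getKafkaProperties(): Properties = {
    val props = new Properties()
    // Broker list from config.properties; in the real project read it from application.conf
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092")
    // Placeholder consumer group id
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "chb_shop_realtime_etl")
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props
  }
}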
Extract abstract ETL base classes according to the data source
For data coming from MySQL binlog, extract a MySQL base class:
/**
* Base class for processing MySQL data; the records handled here are of type RowData
*/
abstract class MysqlBaseETL(env:StreamExecutionEnvironment) extends BaseETL[RowData] {
/**
* Read data from Kafka and return a stream of RowData
* @param topic
* @return
*/
override def getKafkaDataStream(topic: String = "ods_chb_shop_mysql"): DataStream[RowData] = {
// We consume binlog data from Kafka; the Canal client wrote it as RowData objects
val canalKafkaConsumer: FlinkKafkaConsumer011[RowData] = new FlinkKafkaConsumer011[RowData](
topic,
// new SimpleStringSchema() cannot be used here, because Kafka now stores RowData objects,
// and since RowData is our own type we need a custom deserialization schema
new CanalRowDataDeserizationSchema(),
// Kafka Properties object
KafkaProps.getKafkaProperties()
)
// Add the consumer as a source to the environment
val canalRowDataDS: DataStream[RowData] = env.addSource(canalKafkaConsumer)
// Return the resulting stream
canalRowDataDS
}
}
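The consumer above uses CanalRowDataDeserizationSchema, which is not listed in this section; a minimal sketch, assuming the RowData(byte[]) constructor defined earlier and the package layout from the table above:
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import cn.chb.canal.bean.RowData // assumed location of the RowData bean

class CanalRowDataDeserizationSchema extends AbstractDeserializationSchema[RowData] {
  // Rebuild the RowData bean from the ProtoBuf bytes that the Canal client wrote to Kafka
  override def deserialize(message: Array[Byte]): RowData = new RowData(message)
}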
For log data, encapsulate a base class for message-queue sources:
/**
* Base class for ETL processing of click-stream logs, comments, cart data, etc.; it extends BaseETL
*/
abstract class MQBaseETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
/**
* Read data from Kafka and return a stream of raw strings
*
* @param topic
* @return
*/
override def getKafkaDataStream(topic: String): DataStream[String] = {
// Create the Kafka consumer instance
val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](
topic,
new SimpleStringSchema(),
KafkaProps.getKafkaProperties()
)
// Add the consumer instance as a source
val logDataStream: DataStream[String] = env.addSource(kafkaConsumer)
logDataStream
}
}
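To show how these base classes are meant to be used, here is a hypothetical concrete ETL job; the class name and topic are illustrative only and not part of the project:
import org.apache.flink.streaming.api.scala._

// Hypothetical subclass of MQBaseETL; class and topic names are examples
class ClickLogETL(env: StreamExecutionEnvironment) extends MQBaseETL(env) {
  override def process(): Unit = {
    // 1. Read the raw click-log strings from Kafka
    val logDS: DataStream[String] = getKafkaDataStream("ods_chb_shop_click_log")
    // 2. The actual ETL logic would go here; for now just print the records
    logDS.print()
  }
}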