您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 3浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

实时数仓(二)

宝哥大数据 发布时间:2021-02-04 08:46:33 ,浏览量:3

学习目标

  • 能够搭建flink实时ETL项目开发环境
  • 能够针对etl的业务处理进行封装公共接口
项目开发 搭建工程 groupidartifact模块cn.chbshop_parent父工程cn.chbshop_common公共模块cn.chbshop_canal_clientcanal客户端采集模块cn.chbshop_realtime_etlFlink实时计算模块cn.chbshop_data_simulator数据模拟器模块cn.chbflinkcepflinkcep模块cn.chbshop_realtime_riskctlFlink实时风控计算模块 导入依赖 具体见项目源码 开发Canal客户端订阅binlog消息 在canal_client模块创建包结构 包名说明com.chb.canal_client存放入口、Canal客户端核心实现com.chb.canal_client.util存放工具类com.chb.canal_client.kafka存放Kafka生产者实现 添加配置文件config.properties
# canal配置
canal.server.ip=node1
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=chb_shop.*

# zookeeper配置
zookeeper.server.ip=node1:2181,node2:2181,node3:2181

# kafka配置
kafka.bootstrap_servers_config=node1:9092,node2:9092,node3:9092
kafka.batch_size_config=1024
kafka.acks=all
kafka.retries=0
kafka.client_id_config=chb_shop_canal_click
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
kafka.value_serializer_class_config=cn.chb.canal.protobuf.ProtoBufSerializer
kafka.topic=ods_chb_shop_mysql
编写读取配置文件工具类
/**
 * 读取 config.properties配置文件
 */
public class ConfigUtil {
    private static Properties properties;

    static {
        try {
            properties = new Properties();
            properties.load(ConfigUtil.class.getClassLoader().getResourceAsStream("config.properties"));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static String canalServerIp() {
        return properties.getProperty("canal.server.ip");
    }

    public static int canalServerPort() {
        return Integer.parseInt(properties.getProperty("canal.server.port"));
    }

    public static String canalServerDestination() {
        return properties.getProperty("canal.server.destination");
    }

    public static String canalServerUsername() {
        return properties.getProperty("canal.server.username");
    }

    public static String canalServerPassword() {
        return properties.getProperty("canal.server.password");
    }

    public static String canalSubscribeFilter() {
        return properties.getProperty("canal.subscribe.filter");
    }

    public static String zookeeperServerIp() {
        return properties.getProperty("zookeeper.server.ip");
    }

    public static String kafkaBootstrap_servers_config() {
        return properties.getProperty("kafka.bootstrap_servers_config");
    }

    public static String kafkaBatch_size_config() {
        return properties.getProperty("kafka.batch_size_config");
    }

    public static String kafkaAcks() {
        return properties.getProperty("kafka.acks");
    }

    public static String kafkaRetries() {
        return properties.getProperty("kafka.retries");
    }

    public static String kafkaBatch() {
        return properties.getProperty("kafka.batch");
    }

    public static String kafkaClient_id_config() {
        return properties.getProperty("kafka.client_id_config");
    }

    public static String kafkaKey_serializer_class_config() {
        return properties.getProperty("kafka.key_serializer_class_config");
    }

    public static String kafkaValue_serializer_class_config() {
        return properties.getProperty("kafka.value_serializer_class_config");
    }

    public static String kafkaTopic() {
        return properties.getProperty("kafka.topic");
    }

    public static void main(String[] args) {
        System.out.println(canalServerIp());
        System.out.println(canalServerPort());
        System.out.println(canalServerDestination());
        System.out.println(canalServerUsername());
        System.out.println(canalServerPassword());
    }
}
编写CanalClient客户端核心实现类
/**
 * Canal客户端
 */
public class CanalClient {

    // 一次性读取BINLOG数据条数
    private static final int BATCH_SIZE = 5 * 1024;
    // Canal客户端连接器
    private CanalConnector canalConnector;
    // Canal配置项
    private Properties properties;
    private KafkaSender kafkaSender;

    public CanalClient() {
        // 初始化连接
        canalConnector = CanalConnectors.newClusterConnector(ConfigUtil.zookeeperServerIp(),
                ConfigUtil.canalServerDestination(),
                ConfigUtil.canalServerIp(),
                ConfigUtil.canalServerPassword());

        kafkaSender = new KafkaSender();
    }

    // 开始监听
    public void start() {
        try {
            while(true) {
                // 建立连接
                canalConnector.connect();
                // 回滚上次的get请求,重新获取数据
                canalConnector.rollback();
                // 订阅匹配日志
                canalConnector.subscribe(ConfigUtil.canalSubscribeFilter());
                while(true) {
                    // 批量拉取binlog日志,一次性获取多条数据
                    Message message = canalConnector.getWithoutAck(BATCH_SIZE);
                    // 获取batchId
                    long batchId = message.getId();
                    // 获取binlog数据的条数
                    int size = message.getEntries().size();
                    if(batchId == -1 || size == 0) {

                    }
                    else {
                        Map binlogMsgMap = binlogMessageToMap(message);
                        RowData rowData = new RowData(binlogMsgMap);
                        System.out.println(rowData.toString());
                        if (binlogMsgMap.size() > 0) {
                            kafkaSender.send(rowData);
                        }
                    }
                    // 确认指定的batchId已经消费成功
                    canalConnector.ack(batchId);
                }
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            // 断开连接
            canalConnector.disconnect();
        }
    }

    /**
     * 将binlog日志转换为Map结构
     * @param message
     * @return
     */
    private Map binlogMessageToMap(Message message) throws InvalidProtocolBufferException {
        Map rowDataMap = new HashMap();

        // 1. 遍历message中的所有binlog实体
        for (CanalEntry.Entry entry : message.getEntries()) {
            // 只处理事务型binlog
            if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // 获取binlog文件名
            String logfileName = entry.getHeader().getLogfileName();
            // 获取logfile的偏移量
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // 获取sql语句执行时间戳
            long executeTime = entry.getHeader().getExecuteTime();
            // 获取数据库名
            String schemaName = entry.getHeader().getSchemaName();
            // 获取表名
            String tableName = entry.getHeader().getTableName();
            // 获取事件类型 insert/update/delete
            String eventType = entry.getHeader().getEventType().toString().toLowerCase();

            rowDataMap.put("logfileName", logfileName);
            rowDataMap.put("logfileOffset", logfileOffset);
            rowDataMap.put("executeTime", executeTime);
            rowDataMap.put("schemaName", schemaName);
            rowDataMap.put("tableName", tableName);
            rowDataMap.put("eventType", eventType);

            // 获取所有行上的变更
            Map columnDataMap = new HashMap();
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List columnDataList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : columnDataList) {
                if(eventType.equals("insert") || eventType.equals("update")) {
                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                        columnDataMap.put(column.getName(), column.getValue().toString());
                    }
                }
                else if(eventType.equals("delete")) {
                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                        columnDataMap.put(column.getName(), column.getValue().toString());
                    }
                }
            }

            rowDataMap.put("columns", columnDataMap);
        }

        return rowDataMap;
    }
}
编写Entrance入口
/**
 * 入口
 */
public class Entrance {
    public static void main(String[] args) {
        CanalClient canalClient = new CanalClient();
        canalClient.start();
    }
}
使用ProtoBuf序列化binlog消息 在chb_shop_common模块创建包结构 包名说明cn.chb.canal.bean存放通用的JavaBeancn.chb.canal.protobuf实现Protobuf相关接口、实现 创建ProtoBufable接口
/**
 * ProtoBuf序列化接口
 * 所有能够使用ProtoBuf序列化的bean都应该实现该接口
 */
public interface ProtoBufable {
    /**
     * 将对象转换为字节数组
     * @return 字节数组
     */
    byte[] toByte();
}
创建ProtoBufSerializer序列化实现类
/**
 * 用于Kafka消息序列化
 */
public class ProtoBufSerializer implements Serializer {
    @Override
    public void configure(Map configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, ProtoBufable data) {
        return data.toByte();
    }

    @Override
    public void close() {}
}

创建 proto 描述文件
syntax = "proto3";
option java_package = "cn.chb.canal.protobuf";
option java_outer_classname = "CanalModel";

/* 行数据 */
message RowData {
    string logfileName = 15;
    uint64 logfileOffset = 14;
    uint64 executeTime = 1;
    string schemaName = 2;
    string tableName = 3;
    string eventType = 4;

    /* 列数据 */
    map columns = 5;
}

使用Maven protobuf插件编译

创建RowData实体类
  • RowData实体类要实现 ProtoBufable 接口
  • 实现能够将toBytes方法以及使用Bytes构建RowData实体类
public class RowData implements ProtoBufable {

    private String logfilename;
    private Long logfileoffset;
    private Long executeTime;
    private String schemaName;
    private String tableName;
    private String eventType;
    private Map columns;

    public RowData() {
    }

    public RowData(Map map) {
        this.logfilename = map.get("logfileName").toString();
        this.logfileoffset = Long.parseLong(map.get("logfileOffset").toString());
        this.executeTime = Long.parseLong(map.get("executeTime").toString());
        this.schemaName = map.get("schemaName").toString();
        this.tableName = map.get("tableName").toString();
        this.eventType = map.get("eventType").toString();
        this.columns = (Map)map.get("columns");
    }

    public RowData(String logfilename,
                   Long logfileoffset,
                   Long executeTime,
                   String schemaName,
                   String tableName,
                   String eventType,
                   Map columns) {
        this.logfilename = logfilename;
        this.logfileoffset = logfileoffset;
        this.executeTime = executeTime;
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.eventType = eventType;
        this.columns = columns;
    }

    public RowData(byte[] bytes) {
        try {
            CanalModel.RowData rowData = CanalModel.RowData.parseFrom(bytes);
            this.logfilename = rowData.getLogfileName();
            this.logfileoffset = rowData.getLogfileOffset();
            this.executeTime = rowData.getExecuteTime();
            this.tableName = rowData.getTableName();
            this.eventType = rowData.getEventType();
            // 将所有map列值添加到可变HashMap中
            this.columns = new HashMap();
            columns.putAll(rowData.getColumnsMap());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public byte[] toByte() {
        CanalModel.RowData.Builder builder = CanalModel.RowData.newBuilder();
        builder.setLogfileName(this.logfilename);
        builder.setLogfileOffset(this.logfileoffset);
        builder.setExecuteTime(this.executeTime);
        builder.setTableName(this.tableName);
        builder.setEventType(this.eventType);
        for (String key : this.columns.keySet()) {
            builder.putColumns(key, this.columns.get(key));
        }
        return builder.build().toByteArray();
    }

	## 生成getter/setter方法

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}
CanalClient转换消息为RowData实体类,并打印

找到CanalClient.java的start方法

RowData rowData = new RowData(binlogMsgMap);
System.out.println(rowData.toString());

运行测试

生产ProtoBuf消息到Kafka中 实现KafkaSender

该类用于生成数据到Kafka

/**
 * Kafka生产者
 */
public class KafkaSender {
    private Properties kafkaProps = new Properties();
    private KafkaProducer kafkaProducer;

    public KafkaSender() {
        kafkaProps.put("bootstrap.servers", ConfigUtil.kafkaBootstrap_servers_config());
        kafkaProps.put("acks", ConfigUtil.kafkaAcks());
        kafkaProps.put("retries", ConfigUtil.kafkaRetries());
        kafkaProps.put("batch.size", ConfigUtil.kafkaBatch_size_config());
        kafkaProps.put("key.serializer", ConfigUtil.kafkaKey_serializer_class_config());
        kafkaProps.put("value.serializer", ConfigUtil.kafkaValue_serializer_class_config());

        kafkaProducer = new KafkaProducer(kafkaProps);
    }

    public void send(RowData rowData) {
        kafkaProducer.send(new ProducerRecord(ConfigUtil.kafkaTopic(), null, rowData));
    }
}
CanalClient调用KafkaSender生产数据
public class CanalClient {
	// ...
    private KafkaSender kafkaSender;
    
        // 开始监听
    public void start() {
        	...
            RowData rowData = new RowData(binlogMsgMap);
            kafkaSender.send(rowData);
    }

创建Kafka topic并执行测试
# 创建topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --topic ods_chb_shop_mysql --replication-factor 3 --partitions 3
# 创建控制台消费者测试
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic ods_chb_shop_mysql --from-beginning
Flink实时ETL项目初始化 配置文件

resources
	application.conf
	log4j.properties
	hbase-site.xml
创建包结构

在scala目录中创建以下包结构:

包名说明cn.chb.shop.realtime.etl.app程序入口cn.chb.shop.realtime.etl.async异步IO相关cn.chb.shop.realtime.etl.bean实体类cn.chb.shop.realtime.etl.utils工具类cn.chb.shop.realtime.etl.process实时ETL处理cn.chb.shop.realtime.etl.dataloader维度数据离线同步 编写工具类加载配置文件
  • 在 util 包下创建 GlobalConfigUtil 单例对象
  • 编写代码
    • 使用 ConfigFactory.load 获取配置对象
    • 调用config.getString方法加载 application.conf 配置
    • 添加一个main方法测试,工具类是否能够正确读取出配置项
Redis连接池工具类

RedisUtil.scala

Hbase连接池工具类

HbaseUtil.scala

初始化Flink流式计算程序
  • 创建App单例对象,初始化Flink运行环境
  • 创建main方法
  • 编写代码
    • 获取StreamExecutionEnvironment运行环境
    • 将Flink默认的开发环境并行度设置为1
    • 开启checkpoint
    • 编写测试代码,测试Flink程序是否能够正确执行

参考代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object App {
  def main(args: Array[String]): Unit = {
    // 一、初始化Flink运行环境
    // 1. 获取StreamExecutionEnvironment运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 将Flink默认的开发环境并行度设置为1
    env.setParallelism(1)
    // 3. 配置Checkpoint
    env.enableCheckpointing(5000)
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // checkpoint的HDFS保存位置
    env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink/checkpoint/"))
    // 配置两次checkpoint的最小时间间隔
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
    // 配置最大checkpoint的并行度
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // 配置checkpoint的超时时长
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 当程序关闭,触发额外的checkpoint
  env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000))
    // 4. 编写测试代码,测试Flink程序是否能够正确执行
    env.fromCollection(
      List("hadoop", "hive", "spark")
    ).print()

    env.execute("runtime_etl")
  }
}

注意事项

  • 一定要导入 import org.apache.flink.api.scala._ 隐式转换,否则Flink程序无法执行
实时etl特质抽取 定义特质

该特质主要定义统一执行ETL处理,只有一个process方法,用于数据的接入、etl、落地。

在etl包下创建BaseETL特质:

/**
 * 封装公共的ETL接口
 */
trait BaseETL[T] {

  /**
   * 构建kafka的生产者对象
   * @param topic
   */
  def  kafkaProducer(topic:String)={
    //将所有的etl后的数据写入到kafka中,写入的数据类型都是Json-》String
    new FlinkKafkaProducer011[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),//对key也进行序列化
      KafkaProps.getKafkaProperties()
    )
  }

  /**
   * 从kafka中读取数据,传递返回的数据类型
   * @param topic
   * @return
   */
  def getKafkaDataStream(topic:String):DataStream[T];

  /**
   * 处理数据的接口
   */
  def process():Unit;
}
根据数据来源抽取etl的抽象类

对于来自于mysql的binlog日志的数据,抽取出来mysql的基类

/**
 * 编写mysql数据处理的基类,该类中处理的数据是RowData类型的数据
 */
abstract class MysqlBaseETL(env:StreamExecutionEnvironment) extends BaseETL[RowData] {
  /**
   * 从kafka中读取数据,传递返回的数据类型
   * @param topic
   * @return
   */
  override def getKafkaDataStream(topic: String = "ods_chb_shop_mysql"): DataStream[RowData] = {
    //现在消费的是kafka中的binlog数据,而在canalclient写入到kafka的数据是:RowData
    val canalKafkaConsumer: FlinkKafkaConsumer011[RowData] = new FlinkKafkaConsumer011[RowData](
      topic,
      //new SimpleStringSchema(),不可以这样写,因为现在kafka存储的是RowData对象,
      //而这个对象是我们自己定义的,所以说我们需要自己写一个反序列化类
      new CanalRowDataDeserizationSchema(),
      //kafka的properties对象
      KafkaProps.getKafkaProperties()
    )

    //将消费者添加到env环境中
    val canalRowDataDS: DataStream[RowData] = env.addSource(canalKafkaConsumer)
    //将获取到的数据返回
    canalRowDataDS
  }
}

对于日志数据,封装来自消息队列的基类

/**
 * 编写点击流日志、评论、购物车等等数据的ETL处理基类,需要继承自BaseETL
 */
abstract  class MQBaseETL(env:StreamExecutionEnvironment) extends BaseETL[String] {
  /**
   * 从kafka中读取数据,传递返回的数据类型
   *
   * @param topic
   * @return
   */
  override def getKafkaDataStream(topic: String): DataStream[String] = {
    //编写kafka消费者对象实例
    val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](
      topic,
      new SimpleStringSchema(),
      KafkaProps.getKafkaProperties()
    )

    //将消费者对象实例添加到数据源
    val logDataStream: DataStream[String] = env.addSource(kafkaConsumer)
    logDataStream
  }
}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.2040s