您当前的位置: 首页 >  慌途L spring

Canal | 工作原理、安装部署、使用第三方插件与SpringBoot完美整合

慌途L 发布时间:2022-07-03 16:00:00 ,浏览量:4

Canal | 工作原理、安装部署、使用第三方插件与SpringBoot完美整合
  • 一、简介
    • 1.1.工作原理
      • 1.1.1.MySQL主备复制原理
      • 1.1.2.canal 工作原理
    • 1.2.使用场景
      • 1.2.1.抓取业务数据新增变化表
      • 1.2.2.更新缓存
    • 1.3.重要版本更新说明
    • 1.4.多语言
  • 二、安装和配置Canal
    • 1.开启MySQL主从
      • 1.1.开启binlog
      • 1.2.设置用户权限
    • 2.安装Canal
      • 2.1.创建网络
      • 2.2.安装Canal
  • 三、监听Canal(使用第三方插件)
    • 5.3.1.引入POM依赖:
    • 5.3.2.编写配置:
      • 5.3.3.编写Item实体类
      • 5.3.4.编写Controller
      • 5.3.5.编写监听器
      • 5.3.6.测试
        • 5.3.6.1.测试insert方法的监听
        • 5.3.6.2.测试update方法的监听
        • 5.3.6.3.测试delete方法的监听

一、简介

GitHub的地址:https://github.com/alibaba/canal

阿里巴巴 MySQL binlog 增量订阅&消费组件

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.1.工作原理 1.1.1.MySQL主备复制原理

在这里插入图片描述

Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slavemasterbinary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

在这里插入图片描述

1.1.2.canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
1.2.使用场景 1.2.1.抓取业务数据新增变化表 1.2.2.更新缓存 1.3.重要版本更新说明

1)canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:

  • 整体性能测试&优化,提升了150%. #726 参考: Performance
  • 原生支持prometheus监控 #765 Prometheus QuickStart
  • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
  • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
  • 原生支持docker镜像 #801 参考: Docker QuickStart

2)canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide

1.4.多语言

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,欢迎大家提交 pull request

  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal php客户端: https://github.com/xingwenge/canal-php
  • canal Python客户端:https://github.com/haozi3156666/canal-python
  • canal Rust客户端:https://github.com/laohanlinux/canal-rs

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力

二、安装和配置Canal 1.开启MySQL主从

Canal是基于MySQL主从同步功能,因此必须先开启MySQL的主从功能才可以。

这里以下面的用Docker运行的mysql为例:

# 进入/tmp目录
cd /tmp
# 创建文件夹
mkdir mysql
# 进入mysql目录
cd mysql
docker run \
 -p 3306:3306 \
 --name mysql \
 -v $PWD/conf:/etc/mysql/conf.d \
 -v $PWD/logs:/logs \
 -v $PWD/data:/var/lib/mysql \
 -e MYSQL_ROOT_PASSWORD=root \
 --privileged \
 -d \
 mysql:5.7.25
1.1.开启binlog

创建mysql配置文件:

touch /tmp/mysql/conf/my.cnf

打开mysql容器挂载的文件,我的在/tmp/mysql/conf目录:

vi /tmp/mysql/conf/my.cnf

添加以下内容:

[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=huangtu

配置解读:

  • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=huangtu:指定对哪个database记录binary log events,这里记录huangtu这个库
1.2.设置用户权限

接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对huangtu这个库的操作权限。

Navicat中,输入以下命令:

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;

在这里插入图片描述 在这里插入图片描述

重启·mysql·容器即可

docker restart mysql

测试设置是否成功:在mysql控制台,或者Navicat中,输入以下命令可查看:

show master status;

在这里插入图片描述

2.安装Canal 2.1.创建网络

我们需要创建一个网络,可以将MySQLCanalMQ放到同一个Docker网络中:

docker network create network-huangtu

mysql加入这个网络:

docker network connect network-huangtu mysql
2.2.安装Canal

拉取canal镜像:

docker pull canal/canal-server:v1.1.5

然后运行命令创建Canal容器,这里我设置的容器名称namecanal(下面的第二行),监听的是huangtu这个数据库下面的所有的表(下面的倒数第三行设置),大家需自行修改:

docker run -p 11111:11111 \
--name canal \
-e canal.destinations=cluster-huangtu \
-e canal.instance.master.address=mysql:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=huangtu\\..* \
--network network-huangtu \
-d canal/canal-server:v1.1.5

说明:

  • -p 11111:11111:这是canal的默认监听端口
  • -e canal.destinations=cluster-huangtucanal集群的名称
  • -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道·mysql·容器地址,可以通过docker inspect 容器id来查看,这里是通过容器互联,直接写的容器名称进行连接
  • -e canal.instance.dbUsername=canal:数据库用户名
  • -e canal.instance.dbPassword=canal :数据库密码
  • -e canal.instance.filter.regex=:要监听的表名称,这里我的huangtu\\..*表示huangtu这个库下面的所有的表
  • --network network-huangtu:连接到·docker·的·network-huangtu·的这个网络

表名称监听支持的语法:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2 

启动好之后我们可以看一下canal容器的日志:

docker logs -f canal

在这里插入图片描述

再进入容器内部查看日志:

进入容器

docker exec -it canal bash

查看canal的运行日志:

tail -f canal-server/logs/canal/canal.log

在这里插入图片描述

这里处理canal的运行日志,还有连接huangtu网络的日志:

tail -f canal-server/logs/huangtu/huangtu.log

在这里插入图片描述

三、监听Canal(使用第三方插件)

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

在这里插入图片描述

我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。当然这只是其中的一个用途,也可以用来做日志记录和其他的功能实现。

不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。

5.3.1.引入POM依赖:

    top.javatool
    canal-spring-boot-starter
    1.2.1-RELEASE

5.3.2.编写配置:
canal:
  destination: huangtu # canal的集群名字,要与安装canal时设置的名称一致,即跟canal-server运行时设置的destinations一致
  server: 192.168.188.128:11111 # canal服务地址
5.3.3.编写Item实体类

通过@Id@Column@Transient等注解完成Item与数据库表字段的映射(如果字段跟数据库名称一致,则不用更改):

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import javax.persistence.Column;
import java.util.Date;

@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名称
    private String title;//商品标题
    private Long price;//价格(分)
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
    @TableField(exist = false)
    @Transient
    private Integer stock;
}
5.3.4.编写Controller
import com.huangtu.item.pojo.Item;
import com.huangtu.item.service.IItemService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("item")
public class ItemController {

    @Autowired
    private IItemService itemService;

    @PostMapping
    public void saveItem(@RequestBody Item item){
        itemService.saveItem(item);
    }

    @PutMapping
    public void updateItem(@RequestBody Item item) {
        itemService.updateById(item);
    }

    @DeleteMapping("/{id}")
    public void deleteItemById(@PathVariable("id") Long id){
        itemService.removeById(id);
    }
}

5.3.5.编写监听器

通过实现EntryHandler接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("tb_item")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类
import com.huangtu.item.pojo.Item;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@Slf4j
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler {

    @Override
    public void insert(Item item) {
        log.info("insert = {}", item);
        // 写数据到JVM进程缓存
        // 写数据到redis
    }

    @Override
    public void update(Item before, Item after) {
        log.info("before = {}", before);
        log.info("after = {}", after);
        // 写数据到JVM进程缓存
        // 写数据到redis
    }

    @Override
    public void delete(Item item) {
        log.info("delete = {}", item);
        // 删除数据到JVM进程缓存
        // 删除数据到redis
    }
}
5.3.6.测试

insertupdatedelete三个方法打上断点,方便我们看

在这里插入图片描述

启动项目后会一直监听binary log文件,即mysql-bin文件:

在这里插入图片描述

5.3.6.1.测试insert方法的监听

使用postman进行接口调用,可以看到新增方法已经监听到了

在这里插入图片描述

5.3.6.2.测试update方法的监听

使用postman进行接口调用,可以看到修改方法也已经监听到了

在这里插入图片描述

5.3.6.3.测试delete方法的监听

使用postman进行接口调用,可以看到删除方法已经监听到了

在这里插入图片描述

end

至此,已经完成了测试,大家可以根据这样的特性去结合自己的业务需求进行延伸。

关注
打赏
1688896170
查看更多评论

慌途L

暂无认证

  • 4浏览

    0关注

    118博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0521s