- 一、简介
- 1.1.工作原理
- 1.1.1.MySQL主备复制原理
- 1.1.2.canal 工作原理
- 1.2.使用场景
- 1.2.1.抓取业务数据新增变化表
- 1.2.2.更新缓存
- 1.3.重要版本更新说明
- 1.4.多语言
- 二、安装和配置Canal
- 1.开启MySQL主从
- 1.1.开启binlog
- 1.2.设置用户权限
- 2.安装Canal
- 2.1.创建网络
- 2.2.安装Canal
- 三、监听Canal(使用第三方插件)
- 5.3.1.引入POM依赖:
- 5.3.2.编写配置:
- 5.3.3.编写Item实体类
- 5.3.4.编写Controller
- 5.3.5.编写监听器
- 5.3.6.测试
- 5.3.6.1.测试insert方法的监听
- 5.3.6.2.测试update方法的监听
- 5.3.6.3.测试delete方法的监听
GitHub的地址:https://github.com/alibaba/canal
阿里巴巴 MySQL binlog 增量订阅&消费组件
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务
cache刷新 - 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:
MySQL master将数据变更写入二进制日志(binary log, 其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看)MySQL slave将master的binary log events拷贝到它的中继日志(relay log)MySQL slave重放relay log中事件,将数据变更反映它自己的数据
而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
canal模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump协议- MySQL master 收到
dump请求,开始推送binary log给 slave (即canal) canal解析binary log对象(原始为 byte 流)
1)canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
- 整体性能测试&优化,提升了150%. #726 参考: Performance
- 原生支持prometheus监控 #765 Prometheus QuickStart
- 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
- 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
- 原生支持docker镜像 #801 参考: Docker QuickStart
2)canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide
1.4.多语言canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,欢迎大家提交 pull request
- canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
- canal c# 客户端: https://github.com/dotnetcore/CanalSharp
- canal go客户端: https://github.com/CanalClient/canal-go
- canal php客户端: https://github.com/xingwenge/canal-php
- canal Python客户端:https://github.com/haozi3156666/canal-python
- canal Rust客户端:https://github.com/laohanlinux/canal-rs
canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力
二、安装和配置Canal 1.开启MySQL主从Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。
这里以下面的用Docker运行的mysql为例:
# 进入/tmp目录
cd /tmp
# 创建文件夹
mkdir mysql
# 进入mysql目录
cd mysql
docker run \
-p 3306:3306 \
--name mysql \
-v $PWD/conf:/etc/mysql/conf.d \
-v $PWD/logs:/logs \
-v $PWD/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=root \
--privileged \
-d \
mysql:5.7.25
1.1.开启binlog
创建mysql配置文件:
touch /tmp/mysql/conf/my.cnf
打开mysql容器挂载的文件,我的在/tmp/mysql/conf目录:
vi /tmp/mysql/conf/my.cnf
添加以下内容:
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=huangtu
配置解读:
log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-binbinlog-do-db=huangtu:指定对哪个database记录binary log events,这里记录huangtu这个库
接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对huangtu这个库的操作权限。
在Navicat中,输入以下命令:
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;
重启·mysql·容器即可
docker restart mysql
测试设置是否成功:在mysql控制台,或者Navicat中,输入以下命令可查看:
show master status;
我们需要创建一个网络,可以将MySQL、Canal、MQ放到同一个Docker网络中:
docker network create network-huangtu
让mysql加入这个网络:
docker network connect network-huangtu mysql
2.2.安装Canal
拉取canal镜像:
docker pull canal/canal-server:v1.1.5
然后运行命令创建Canal容器,这里我设置的容器名称name为canal(下面的第二行),监听的是huangtu这个数据库下面的所有的表(下面的倒数第三行设置),大家需自行修改:
docker run -p 11111:11111 \
--name canal \
-e canal.destinations=cluster-huangtu \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=huangtu\\..* \
--network network-huangtu \
-d canal/canal-server:v1.1.5
说明:
-p 11111:11111:这是canal的默认监听端口-e canal.destinations=cluster-huangtu:canal集群的名称-e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道·mysql·容器地址,可以通过docker inspect 容器id来查看,这里是通过容器互联,直接写的容器名称进行连接-e canal.instance.dbUsername=canal:数据库用户名-e canal.instance.dbPassword=canal:数据库密码-e canal.instance.filter.regex=:要监听的表名称,这里我的huangtu\\..*表示huangtu这个库下面的所有的表--network network-huangtu:连接到·docker·的·network-huangtu·的这个网络
表名称监听支持的语法:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2
启动好之后我们可以看一下canal容器的日志:
docker logs -f canal
再进入容器内部查看日志:
进入容器
docker exec -it canal bash
查看canal的运行日志:
tail -f canal-server/logs/canal/canal.log
这里处理canal的运行日志,还有连接huangtu网络的日志:
tail -f canal-server/logs/huangtu/huangtu.log
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。当然这只是其中的一个用途,也可以用来做日志记录和其他的功能实现。
不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client
与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。
5.3.1.引入POM依赖:
top.javatool
canal-spring-boot-starter
1.2.1-RELEASE
5.3.2.编写配置:
canal:
destination: huangtu # canal的集群名字,要与安装canal时设置的名称一致,即跟canal-server运行时设置的destinations一致
server: 192.168.188.128:11111 # canal服务地址
5.3.3.编写Item实体类
通过@Id、@Column、@Transient等注解完成Item与数据库表字段的映射(如果字段跟数据库名称一致,则不用更改):
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import javax.persistence.Column;
import java.util.Date;
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
}
5.3.4.编写Controller
import com.huangtu.item.pojo.Item;
import com.huangtu.item.service.IItemService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("item")
public class ItemController {
@Autowired
private IItemService itemService;
@PostMapping
public void saveItem(@RequestBody Item item){
itemService.saveItem(item);
}
@PutMapping
public void updateItem(@RequestBody Item item) {
itemService.updateById(item);
}
@DeleteMapping("/{id}")
public void deleteItemById(@PathVariable("id") Long id){
itemService.removeById(id);
}
}
5.3.5.编写监听器
通过实现EntryHandler接口编写监听器,监听Canal消息。注意两点:
- 实现类通过
@CanalTable("tb_item")指定监听的表信息 EntryHandler的泛型是与表对应的实体类
import com.huangtu.item.pojo.Item;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@Slf4j
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler {
@Override
public void insert(Item item) {
log.info("insert = {}", item);
// 写数据到JVM进程缓存
// 写数据到redis
}
@Override
public void update(Item before, Item after) {
log.info("before = {}", before);
log.info("after = {}", after);
// 写数据到JVM进程缓存
// 写数据到redis
}
@Override
public void delete(Item item) {
log.info("delete = {}", item);
// 删除数据到JVM进程缓存
// 删除数据到redis
}
}
5.3.6.测试
给insert、update、delete三个方法打上断点,方便我们看
启动项目后会一直监听binary log文件,即mysql-bin文件:
使用postman进行接口调用,可以看到新增方法已经监听到了
使用postman进行接口调用,可以看到修改方法也已经监听到了
使用postman进行接口调用,可以看到删除方法已经监听到了
end
至此,已经完成了测试,大家可以根据这样的特性去结合自己的业务需求进行延伸。
