我们之前讲解了利用canal实现无代码入侵的同步mysql数据到elasticsearch,并且讲解了主子表数据如何同步。
通过canal1.1.5实现mysql8.0数据增量/全量同步到elasticsearch7.x canal同步mysql到es之父子表数据同步|对象型数组同步|nested数组同步
但具体生产中,仍然有更加复杂的同步需求,之前也有几位同学咨询过我,因为canal只支持2张表的数据同步,并不支持3张表及以上的同步,当不少的业务需要3表以上的同步,这就需要我们自定义canal客户端来实现了,那么今天我们就来实操演示下自定义canal客户端,实现多表同步
1. canal简介anal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等
canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin,同时我们还可以自定义客户端,我们讲自定义的客户端称为client
canal的数据同步流程如下图所示
canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。
canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错NoSuchMethodError
1、截止本文,canal的稳定版已更新到1.1.6了, 所以本文也以这个版本为例。
这里因为我们要自定义客户端,所以只用下载服务端deployer即可
官方下载地址
当然也可以通过wget指令直接下载到服务器
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
详细的安装步骤不再累述了,还不清楚的同学可以参考上一篇文章
通过canal来实现mysql数据同步到elasticsearch
2.3 mysql配置1、因为同步是基于binlog实现的,所以要现在mysql中开启binlog
修改mysql配置文件
vim /etc/my.cnf
修改内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
2、源数据库创建一个canal账号,并且设置slave
,dump
权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3、因为mysql8.0.3后身份检验方式为caching_sha2_password
,但canal使用的是mysql_native_password
,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;
3. 实操
3.1 服务端deployer配置
1、查询源mysql服务器的binlog位置
# 源mysql服务器中登陆mysql执行
show binary logs;
2、进入deployer安装目录
cd deployer
3、我们新建一个实例es
专门用于本次演示
cd conf
# 复制example实例配置
cp -R example es
4、修改实例es配置文件instance.properties
cd es
vim instance.properties
修改内容
# position info
# 源数据库地址及端口
canal.instance.master.address=192.168.244.17:3306
# 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
canal.instance.master.journal.name=mysql-bin.000001
# 开始同步的binlog文件位置
canal.instance.master.position=0
# 开始同步时间点 时间戳形式
canal.instance.master.timestamp=1546272000000
# 数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 配置不同步mysql库
canal.instance.filter.black.regex=mysql\..*
mysql数据同步起点说明:
- canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
- canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
- 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
5、启动服务端
./bin/start.sh
6、查看示例日志,无报错则说明启动成功
cat logs/es/es.log
针对服务端的详细配置项解释,可以参考官方文档:
配置项解释
1、新建一个springboot项目,我们结合之前讲解的spring-data-elasticsearch
来作为es客户端,这里就不单独说明其配置了,还不知道的同学可以参考之前的文章
从零搭建springboot整合spring data elasticsearch4.2.x环境
引入依赖spring-data-elasticsearch
、canal-spring-boot-starter
、mybatis-plus
top.javatool
canal-spring-boot-starter
1.2.1-RELEASE
org.springframework.data
spring-data-elasticsearch
4.2.10
com.baomidou
mybatis-plus-boot-starter
3.4.2
mysql
mysql-connector-java
runtime
2、修改配置文件application.yml
# 应用名称
spring:
application:
name: canal_client_es
elasticsearch:
rest:
# es 地址
uris: http://192.168.244.11:9200
username: elastic
password: elastic
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
name: defaultDataSource
url: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8
username: root
password: 123456
server:
port: 8080
# canal服务端地址
canal:
server: 192.168.244.22:11111
# 实例名,与deployer中配置的保持统一
destination: es
# 设置canal消息日志打印级别
logging:
level:
top.javatool.canal.client: warn
3、创建es客户端配置
/**
* @author benjamin
* @date 2022/10/1
*/
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.canal_client_es")
public class ElasticRestClientConfig extends AbstractElasticsearchConfiguration {
@Value("${spring.elasticsearch.rest.uris}")
private String url;
@Value("${spring.elasticsearch.rest.username}")
private String username;
@Value("${spring.elasticsearch.rest.password}")
private String password;
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
url = url.replace("http://","");
String[] urlArr = url.split(",");
HttpHost[] httpPostArr = new HttpHost[urlArr.length];
for (int i = 0; i {
// 账号密码登录
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// httpclient连接数配置
httpClientBuilder.setMaxConnTotal(30);
httpClientBuilder.setMaxConnPerRoute(10);
// httpclient保活策略
httpClientBuilder.setKeepAliveStrategy(((response, context) -> Duration.ofMinutes(5).toMillis()));
return httpClientBuilder;
});
return new RestHighLevelClient(builder);
}
@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate(RestHighLevelClient elasticsearchClient,ElasticsearchConverter elasticsearchConverter){
return new ElasticsearchRestTemplate(elasticsearchClient,elasticsearchConverter);
}
}
4、实现根据实体类自动创建es索引的配置类,不需要可跳过这步
@Configuration
@Slf4j
@AllArgsConstructor
public class ElasticCreateIndexStartUp implements ApplicationListener {
private final ElasticsearchRestTemplate restTemplate;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent){
log.info("[elastic]索引初始化...");
Reflections f = new Reflections("com.example.canal_client_es.entity");
Set classSet = f.getTypesAnnotatedWith(Document.class);
for (Class clazz : classSet) {
IndexOperations indexOperations = restTemplate.indexOps(clazz);
if(!indexOperations.exists()){
indexOperations.create();
indexOperations.putMapping();
log.info(String.format("[elastic]索引%s数据结构创建成功",clazz.getSimpleName()));
}
}
log.info("[elastic]索引初始化完毕");
}
}
4、创建订单、商品、收货人实体,其中一个订单下有多个商品、多个收货人,我们希望同步订单表时,将商品、收货人两张表的信息同步更新。
同时因为我们需要与数据库做映射,同时也需要与es做映射,所以需要创建面向mysql和es的实体类,当然你也可以将两种整合到一起(如下所示的商品实体、收货人实体),这里为了让大家清晰的认识,我将其分开(如下所示的订单实体)
es实体类
// 订单实体
@Data
@Document(indexName = "my_order")
@Setting(replicas = 0,shards = 1)
public class Order implements Serializable {
/**
* 主键
*/
@Id
private Long id;
/**
* 订单号
*/
@Field(type = FieldType.Keyword, name="seqNo")
private String seqNo;
/**
* 总价
*/
@Field(type = FieldType.Double, name="totalPrice")
private BigDecimal totalPrice;
/**
* 数量
*/
@Field(type = FieldType.Integer, name="quantity")
private Integer quantity;
/**
* 商品清单
*/
@Field(type = FieldType.Nested, name="productList")
private List productList;
/**
* 收货人清单
*/
@Field(type = FieldType.Nested, name="userList")
private List userList;
}
// 商品实体
@Data
@Table(name = "product")
public class Product implements Serializable {
@Field(type = FieldType.Long, name="id")
private Long id;
@Field(type = FieldType.Keyword, name="seqNo")
@Column(name = "seq_no")
private String seqNo;
@Field(type = FieldType.Double, name="price")
private BigDecimal price;
@Field(type = FieldType.Text, name="name", analyzer = "ik_smart")
private String name;
}
// 收货人实体
@Data
@Table(name = "user")
public class User implements Serializable {
@Field(type = FieldType.Long, name="id")
private Long id;
@Field(type = FieldType.Keyword, name="seqNo")
@Column(name = "seq_no")
private String seqNo;
@Field(type = FieldType.Keyword, name="name")
private String name;
@Field(type = FieldType.Integer, name="age")
private Integer age;
@Field(type = FieldType.Text, name="address", analyzer = "ik_smart")
private String address;
}
数据库实体,并用jpa的注解@Column
来映射字段名。商品、收货人的数据库实体则整合到es实体中了,如上
@Data
@Table(name = "my_order")
public class OrderPO implements Serializable {
/**
* 主键
*/
@Column(name = "id")
private Long id;
/**
* 订单号
*/
@Column(name = "seq_no")
private String seqNo;
/**
* 总价
*/
@Column(name = "total_price")
private BigDecimal totalPrice;
/**
* 数量
*/
@Column(name = "quantity")
private Integer quantity;
}
5、我们基于mybatis-plus来操作数据库,因此需要创建实体的mapper、service。详细的代码大家按照mybatis-plus的用法创建即可,或者通过本文最后下载源码查看。这里不再累叙。
6、操作到这里,最好把你的项目启动一下,如果正常则继续往下操作,如果不正常也好提前排错,不要压到最后发现一堆错,也不知道错在哪里。
7、接下来我们基于canal-client提供的EntryHandler
类来实现对于数据表的监控,从而达到数据的增删改同步
@CanalTable("my_order")
@Component
@AllArgsConstructor
@Slf4j
public class OrderHandler implements EntryHandler {
private final ElasticsearchRestTemplate elasticsearchRestTemplate;
private final IProductService productService;
private final IUserService userService;
@Override
public void insert(OrderPO orderPO) {
Order order = new Order();
BeanUtils.copyProperties(orderPO,order);
List productList = productService.list(Wrappers.lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));
order.setProductList(productList);
List userList = userService.list(Wrappers.lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));
order.setUserList(userList);
elasticsearchRestTemplate.save(order);
}
@Override
public void update(OrderPO before, OrderPO after) {
Order order = new Order();
BeanUtils.copyProperties(after,order);
List productList = productService.list(Wrappers.lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));
order.setProductList(productList);
List userList = userService.list(Wrappers.lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));
order.setUserList(userList);
elasticsearchRestTemplate.save(order);
}
@Override
public void delete(OrderPO orderPO) {
elasticsearchRestTemplate.delete(orderPO.getId().toString(),Order.class);
}
}
3.3 测试
1、新增一条订单数据
2、kibana中查看索引数据
GET my_order/_search
结果显示新增的订单表同步成功,并且两张子表的数据也成功同步了。
3、再修改一下订单数据
kibana查看索引,显示同步成功
4、我们将刚刚新增的订单数据在数据库中删除
同时kibana中也删除成功,说明我们删除的同步也生效了。
上述我们演示了主表数据修改时,同步主表以及两张子表的数据;有时我们需要修改子表数据,但也需要实现数据同步。
这就需要我们实现一个子表的EntryHandler
,用于监听子表的数据变化,其逻辑是子表数据更新时,查询主子表的数据,再同步更新到索引中即可。
注意要监听的是子表,每张子表一个监听器,如果需要监听两张子表,那么就需要分别创建两个监听器
@CanalTable("product")
@Component
@AllArgsConstructor
@Slf4j
public class ProductHandler implements EntryHandler {
private final ElasticsearchRestTemplate elasticsearchRestTemplate;
@Override
public void insert(Product product) {
// TODO
}
@Override
public void update(Product before, Product after) {
// TODO
}
@Override
public void delete(Product product) {
// TODO
}
}
演示源码
文中演示源码可在如下地址下载:
git源码地址
总结自此我们的数据同步就演示完成了,如果有更加复杂的同步逻辑,也可以在代码中自定义实现,并且第三方组件canal-spring-boot-starter
极大的简化了我们自定义canal客户端的难度。
不过遗憾的是canal-spring-boot-starter的作者目前已经停止了对其的维护,其最新版对应的canal实际是1.1.3
版本的,不过实测还不影响我们对接canal1.1.6。如果大家对canal客户端又更高性能的需求,可以研究源码,高度二开。
后续我们将给大家讲解如何实现类canal-spring-boot-starter
这样的第三方依赖组件。感兴趣的同学可以关注专栏。