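- 1. Full data export and sync
- 1.1 Checking the active MySQL
- 1.2 Full export of the MySQL data on canal2
- 1.2.1 Looking at the MySQL data first
- 1.2.2 Exporting the full MySQL data
- 1.2.3 Importing the full dump db.sql into MySQL on canal3
- 2. Configuring the example instances
- 2.1 Modifying conf/canal.properties
- 2.2 Deleting conf/example and creating new example instances
- 3. Developing the client code
- 3.1 Modifying data in db1 and db2
- 3.2 Dependencies added to pom.xml
- 3.3 Script for repeated testing
- 3.4 Developing db_sync.java
- 3.5 Printed messages and the resulting databases on canal3
- 3.5.1 Syncing the db1 database
- 3.5.2 Syncing the db2 database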
For deploying canal high availability together with automatic MySQL node switching, see my earlier article on that topic.
1. Full data export and sync

1.1 Checking the active MySQL

Because automatic MySQL node switching is deployed, we first need to find out which MySQL server canal is currently reading from.

1. First, check which server the example instance is running on:

    [zk: canal1:2181,canal2:2181,canal3:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
    {"active":true,"address":"192.168.23.31:11111"}
    [zk: canal1:2181,canal2:2181,canal3:2181(CONNECTED) 1]

The example instance is running on canal1.

2. Then check the example log on canal1:

    [root@canal1 example]# pwd
    /root/canal.deployer-1.1.5/logs/example
    [root@canal1 example]# tail example.log
    ......(partially omitted)......
    2021-06-25 09:29:07.624 [destination = example , address = canal2/192.168.23.32:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position by switch ::1624526107000
    2021-06-25 09:29:08.770 [destination = example , address = canal2/192.168.23.32:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=3348,serverId=2,gtid=,timestamp=1624459434000] cost : 1104ms , the next step is binlog dump
    [root@canal1 example]#

The address field in the log shows that the currently active MySQL is the one on canal2.
1.2 Full export of the MySQL data on canal2

1.2.1 Looking at the MySQL data first

    [root@canal2 ~]# mysql -u root -pRoot_123
    mysql: [Warning] Using a password on the command line interface can be insecure.
    Welcome to the MySQL monitor.  Commands end with ; or \g.
    Your MySQL connection id is 22
    Server version: 8.0.25 MySQL Community Server - GPL

    Copyright (c) 2000, 2021, Oracle and/or its affiliates.

    Oracle is a registered trademark of Oracle Corporation and/or its
    affiliates. Other names may be trademarks of their respective
    owners.

    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

    mysql> show databases;
    +--------------------+
    | Database           |
    +--------------------+
    | db1                |
    | db2                |
    | information_schema |
    | mysql              |
    | performance_schema |
    | sys                |
    +--------------------+
    6 rows in set (0.01 sec)

    mysql> use db1;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A

    Database changed
    mysql> show tables;
    +---------------+
    | Tables_in_db1 |
    +---------------+
    | tb1_1         |
    | tb1_2         |
    +---------------+
    2 rows in set (0.00 sec)

    mysql> select * from tb1_1;
    +------+-------+--------+
    | idA  | nameA | scoreA |
    +------+-------+--------+
    |    1 | 1     |   1.10 |
    |    2 | 2     |   2.20 |
    |    3 | 3     |   3.30 |
    +------+-------+--------+
    3 rows in set (0.00 sec)

    mysql> select * from tb1_2;
    +------+-------+------+
    | idA  | nameA | ageA |
    +------+-------+------+
    |    1 | 1     |    1 |
    |    2 | 2     |    2 |
    |    3 | 3     |    3 |
    +------+-------+------+
    3 rows in set (0.00 sec)

    mysql> use db2;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A

    Database changed
    mysql> show tables;
    +---------------+
    | Tables_in_db2 |
    +---------------+
    | tb2_1         |
    | tb2_2         |
    +---------------+
    2 rows in set (0.00 sec)

    mysql> select * from tb2_1;
    +------+-------+--------+
    | idB  | nameB | scoreB |
    +------+-------+--------+
    |    1 | 1     |   1.10 |
    |    2 | 2     |   2.20 |
    |    3 | 3     |   3.30 |
    +------+-------+--------+
    3 rows in set (0.00 sec)

    mysql> select * from tb2_2;
    +------+-------+------+
    | idB  | nameB | ageB |
    +------+-------+------+
    |    1 | 1     |    1 |
    |    2 | 2     |    2 |
    |    3 | 3     |    3 |
    +------+-------+------+
    3 rows in set (0.00 sec)

    mysql>

1.2.2 Exporting the full MySQL data
    [root@canal2 ~]# mysqldump -u root -pRoot_123 --databases db1 db2 --flush-logs --lock-all-tables --master-data=1 --column_statistics=0 --ignore-table=db1.tb1 --ignore-table=db2.tb1 > /root/db.sql
    mysqldump: [Warning] Using a password on the command line interface can be insecure.
    [root@canal2 ~]#
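Open db.sql and note the binlog file name and position recorded after the log flush. With --master-data=1 mysqldump writes them as an executable CHANGE MASTER TO statement (the line boxed in red in the original screenshot); comment that line out before importing. These coordinates are the ones entered later as canal.instance.master.journal.name and canal.instance.master.position (mysql-bin.000007 and 156 in this walkthrough).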
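Reading the coordinates by eye works for a single dump; for repeated runs they can also be pulled out programmatically. The following is a minimal sketch, assuming the dump contains the usual `CHANGE MASTER TO MASTER_LOG_FILE='...', MASTER_LOG_POS=...;` line that mysqldump emits for --master-data. The class name `DumpBinlogPosition` and the sample line are illustrative, not part of the original setup.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DumpBinlogPosition {
    // Matches the coordinates statement written by mysqldump --master-data, e.g.
    // CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000007', MASTER_LOG_POS=156;
    // find() is used so a leading "-- " (commented-out line) does not matter.
    private static final Pattern CHANGE_MASTER = Pattern.compile(
            "CHANGE MASTER TO MASTER_LOG_FILE='([^']+)',\\s*MASTER_LOG_POS=(\\d+)");

    /** Returns {fileName, position} if the line carries binlog coordinates. */
    static Optional<String[]> parse(String line) {
        Matcher m = CHANGE_MASTER.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new String[] {m.group(1), m.group(2)});
    }

    public static void main(String[] args) {
        // Sample line (assumed); in practice, feed each line of db.sql to parse().
        String line = "CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000007', MASTER_LOG_POS=156;";
        parse(line).ifPresent(p -> {
            System.out.println("canal.instance.master.journal.name=" + p[0]);
            System.out.println("canal.instance.master.position=" + p[1]);
        });
    }
}
```

The two printed lines have the same shape as the position settings used in instance.properties in section 2.2.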
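1.2.3 Importing the full dump db.sql into MySQL on canal3

We use MySQL as the target here for practice, because MySQL can import the dump directly with the command source xxx.sql. For any other target database, you can first import into an intermediate MySQL database and then load the data into the target with an ETL tool such as Kettle, or with code of your own.

For installing MySQL on canal3, see my article on installing MySQL 8.0.25.

Upload db.sql to canal3 and run the full import: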
    [root@canal3 ~]# ls
    anaconda-ks.cfg  apache-zookeeper-3.6.3-bin  canal.deployer-1.1.5  db.sql  jdk1.8.0_291  mysql-8.0.25
    [root@canal3 ~]# pwd
    /root
    [root@canal3 ~]# mysql -u root -pRoot_123
    mysql: [Warning] Using a password on the command line interface can be insecure.
    Welcome to the MySQL monitor.  Commands end with ; or \g.
    Your MySQL connection id is 15
    Server version: 8.0.25 MySQL Community Server - GPL

    Copyright (c) 2000, 2021, Oracle and/or its affiliates.

    Oracle is a registered trademark of Oracle Corporation and/or its
    affiliates. Other names may be trademarks of their respective
    owners.

    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

    mysql> source db.sql;
    Query OK, 0 rows affected (0.00 sec)

    Query OK, 0 rows affected (0.01 sec)

    ......(partially omitted)......

    Query OK, 0 rows affected (0.00 sec)

    Query OK, 0 rows affected (0.00 sec)

    mysql> show databases;
    +--------------------+
    | Database           |
    +--------------------+
    | db1                |
    | db2                |
    | information_schema |
    | mysql              |
    | performance_schema |
    | sys                |
    +--------------------+
    6 rows in set (0.00 sec)

    mysql>

2. Configuring the example instances

2.1 Modifying conf/canal.properties
Make this change on all three servers. The setting becomes:
canal.destinations =
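Here we drop the default example destination and rely on auto scan instead (the canal.auto.scan setting in canal.properties). Canal scans the folder names under the conf directory, skipping metrics and spring, and treats each one as an instance. Typically one instance handles the sync of one database.

Then restart the canal service.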
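To make the scanning rule above concrete, here is a minimal sketch of the idea: list the subdirectories of conf and skip metrics and spring. This is not canal's actual implementation, only an illustration of the rule as described; the class name `DestinationScan` and the temporary directory layout are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DestinationScan {
    // Folders under conf/ that are not treated as instances, per the text above.
    private static final Set<String> EXCLUDED =
            new HashSet<>(Arrays.asList("metrics", "spring"));

    /** Returns the sorted names of all candidate instance folders under confDir. */
    static List<String> scan(Path confDir) throws IOException {
        try (Stream<Path> children = Files.list(confDir)) {
            return children
                    .filter(Files::isDirectory)               // plain files such as canal.properties are ignored
                    .map(p -> p.getFileName().toString())
                    .filter(name -> !EXCLUDED.contains(name))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway conf/ layout resembling the one used in section 2.2.
        Path conf = Files.createTempDirectory("conf");
        for (String dir : new String[] {"example_db1", "example_db2", "metrics", "spring"}) {
            Files.createDirectory(conf.resolve(dir));
        }
        Files.createFile(conf.resolve("canal.properties"));
        System.out.println(scan(conf)); // prints [example_db1, example_db2]
    }
}
```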
2.2 Deleting conf/example and creating new example instances

Prepare two instance directories under /root on canal1 as follows:
    [root@canal1 conf]# ls
    canal_local.properties  canal.properties  example  logback.xml  metrics  spring
    [root@canal1 conf]# pwd
    /root/canal.deployer-1.1.5/conf
    [root@canal1 conf]# cp -r example/ ~
    [root@canal1 conf]# cd ~
    [root@canal1 ~]# mv example/ example_db1
    [root@canal1 ~]# cp -r example_db1/ example_db2
    [root@canal1 ~]# ll example_db1/
    total 164
    -rw-r--r--. 1 root root 163840 Jun 25 11:01 h2.mv.db
    -rwxr-xr-x. 1 root root   2621 Jun 25 11:01 instance.properties
    [root@canal1 ~]# rm example_db1/h2.mv.db -rf
    [root@canal1 ~]# ll example_db2/
    total 164
    -rw-r--r--. 1 root root 163840 Jun 25 11:02 h2.mv.db
    -rwxr-xr-x. 1 root root   2621 Jun 25 11:02 instance.properties
    [root@canal1 ~]# rm example_db2/h2.mv.db -rf
    [root@canal1 ~]#
example_db1/instance.properties sets the binlog starting position and restricts the sync to the db1 database. Its content is:

    [root@canal1 ~]# cat example_db1/instance.properties
    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    # use 1232 on canal2 and 1233 on canal3
    canal.instance.mysql.slaveId=1231

    # enable gtid use true/false
    canal.instance.gtidon=false

    # position info
    canal.instance.master.address=canal2:3306
    canal.instance.master.journal.name=mysql-bin.000007
    canal.instance.master.position=156
    canal.instance.master.timestamp=
    canal.instance.master.gtid=

    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=

    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal

    canal.instance.standby.address = canal1:3306
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=

    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=Canal_123
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

    # table regex
    canal.instance.filter.regex=db1\\.tb\\d_\\d
    # table black regex
    canal.instance.filter.black.regex=mysql\\.slave_.*
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

    # mq config
    canal.mq.topic=example
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\\..*
    #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
    #################################################

    # heartbeat checking must be enabled
    canal.instance.detecting.enable = true
    # heartbeat check SQL
    canal.instance.detecting.sql = select 1
    # heartbeat check interval
    canal.instance.detecting.interval.time = 3
    # failure threshold for the heartbeat check; once exceeded, the MySQL connection is switched,
    # for example to the standby machine, which then continues consuming the binlog
    canal.instance.detecting.retry.threshold = 3
    # whether to switch between master and standby once the failure threshold is exceeded
    canal.instance.detecting.heartbeatHaEnable = true
    [root@canal1 ~]#
example_db2/instance.properties sets the same binlog starting position and restricts the sync to the db2 database. Apart from slaveId and the table regex it is identical to the db1 file:

    [root@canal1 ~]# cat example_db2/instance.properties
    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    # use 1242 on canal2 and 1243 on canal3
    canal.instance.mysql.slaveId=1241

    # enable gtid use true/false
    canal.instance.gtidon=false

    # position info
    canal.instance.master.address=canal2:3306
    canal.instance.master.journal.name=mysql-bin.000007
    canal.instance.master.position=156
    canal.instance.master.timestamp=
    canal.instance.master.gtid=

    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=

    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal

    canal.instance.standby.address = canal1:3306
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=

    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=Canal_123
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

    # table regex
    canal.instance.filter.regex=db2\\.tb\\d_\\d
    # table black regex
    canal.instance.filter.black.regex=mysql\\.slave_.*
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

    # mq config
    canal.mq.topic=example
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\\..*
    #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
    #################################################

    # heartbeat checking must be enabled
    canal.instance.detecting.enable = true
    # heartbeat check SQL
    canal.instance.detecting.sql = select 1
    # heartbeat check interval
    canal.instance.detecting.interval.time = 3
    # failure threshold for the heartbeat check; once exceeded, the MySQL connection is switched,
    # for example to the standby machine, which then continues consuming the binlog
    canal.instance.detecting.retry.threshold = 3
    # whether to switch between master and standby once the failure threshold is exceeded
    canal.instance.detecting.heartbeatHaEnable = true
    [root@canal1 ~]#
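The table regex is the part of these files most worth double-checking, so here is a small sketch that tests which schema.table names the db1 pattern accepts. It uses plain java.util.regex and assumes a full match against the schema.table string; canal's own filter implementation may differ in details such as case handling. The class name `FilterRegexDemo` and the extra sample names are illustrative.

```java
import java.util.regex.Pattern;

final class FilterRegexDemo {
    // In instance.properties the value is written as db1\\.tb\\d_\\d. A Java
    // properties loader turns each doubled backslash into one, so the effective
    // pattern is db1\.tb\d_\d, which is what this Java string literal denotes too.
    static final Pattern DB1_TABLES = Pattern.compile("db1\\.tb\\d_\\d");

    /** True if the whole schema.table name matches the db1 filter pattern. */
    static boolean accepted(String schemaDotTable) {
        return DB1_TABLES.matcher(schemaDotTable).matches();
    }

    public static void main(String[] args) {
        String[] names = {"db1.tb1_1", "db1.tb1_2", "db2.tb2_1", "db1.tb1", "db1.tb10_1"};
        for (String name : names) {
            System.out.println(name + " -> " + accepted(name));
        }
    }
}
```

Under these assumptions only db1.tb1_1 and db1.tb1_2 are accepted: \d matches exactly one digit, so a table such as tb10_1 would need a pattern like tb\d+_\d.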
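Delete the conf/example directory on all three servers, copy example_db1 and example_db2 into the conf directory of each server, and on canal2 and canal3 change the canal.instance.mysql.slaveId parameter in each instance.properties to the value noted in the file's comment.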
3. Developing the client code

3.1 Modifying data in db1 and db2

    [root@canal2 ~]# mysql -u root -pRoot_123
    mysql: [Warning] Using a password on the command line interface can be insecure.
    Welcome to the MySQL monitor.  Commands end with ; or \g.
    Your MySQL connection id is 22
    Server version: 8.0.25 MySQL Community Server - GPL

    Copyright (c) 2000, 2021, Oracle and/or its affiliates.

    Oracle is a registered trademark of Oracle Corporation and/or its
    affiliates. Other names may be trademarks of their respective
    owners.

    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

    mysql> insert into db1.tb1_1 values(4,'4',4.4),(5,'5',5.5);
    Query OK, 2 rows affected (0.00 sec)
    Records: 2  Duplicates: 0  Warnings: 0

    mysql> delete from db1.tb1_2 where idA in (2,3);
    Query OK, 2 rows affected (0.00 sec)

    mysql> update db2.tb2_1 set scoreB=2.22 where idB=2;
    Query OK, 1 row affected (0.00 sec)
    Rows matched: 1  Changed: 1  Warnings: 0

    mysql> update db2.tb2_1 set scoreB=3.33 where idB=3;
    Query OK, 1 row affected (0.00 sec)
    Rows matched: 1  Changed: 1  Warnings: 0

    mysql> alter table db2.tb2_2 drop column ageB;
    Query OK, 0 rows affected (0.04 sec)
    Records: 0  Duplicates: 0  Warnings: 0

    mysql>

3.2 Dependencies added to pom.xml
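    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.5</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>

3.3 Script for repeated testing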
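During testing we sometimes delete the client's consumer position in ZooKeeper so that the canal server starts consuming again from the position set in instance.properties. For that to take effect the canal server has to be restarted, so the script below does both.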
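The consumer position lives in one ZooKeeper node per destination; the script in this section deletes /otter/canal/destinations/example_db1/1001/cursor and the matching node for example_db2. A tiny sketch of how such a path is put together follows. It assumes the 1001 segment is the client id used by the canal client; the helper name `cursorPath` is hypothetical.

```java
final class CursorPath {
    // Assumption: 1001 is the client id segment seen in the paths of the test script.
    static final int CLIENT_ID = 1001;

    /** Builds the ZooKeeper node that stores a client's consumer position. */
    static String cursorPath(String destination, int clientId) {
        return "/otter/canal/destinations/" + destination + "/" + clientId + "/cursor";
    }

    public static void main(String[] args) {
        for (String destination : new String[] {"example_db1", "example_db2"}) {
            System.out.println(cursorPath(destination, CLIENT_ID));
        }
    }
}
```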
    [root@canal1 ~]# pwd
    /root
    [root@canal1 ~]# cat example_db1_repeat_test.sh
    #!/usr/bin/env bash

    # delete the canal client positions stored in zk
    /root/apache-zookeeper-3.6.3-bin/bin/zkCli.sh -server canal1:2181,canal2:2181,canal3:2181 delete /otter/canal/destinations/example_db1/1001/cursor
    /root/apache-zookeeper-3.6.3-bin/bin/zkCli.sh -server canal1:2181,canal2:2181,canal3:2181 delete /otter/canal/destinations/example_db2/1001/cursor

    canal_servers=('canal1' 'canal2' 'canal3')

    # restart the canal server on all three machines
    for canal_server in "${canal_servers[@]}"
    do
    ssh root@${canal_server} << begin2end
    source /root/.bashrc
    /root/canal.deployer-1.1.5/bin/restart.sh
    exit
    begin2end
    done
    [root@canal1 ~]#

3.4 Developing db_sync.java
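Here the client connects over TCP and parses the binlog entries itself. Kafka mode or the canal adapter could be used instead; the underlying principle is much the same.

To sync the db2 database, comment out the db1 parameters and uncomment the db2 parameters.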
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.InvalidProtocolBufferException;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;

    public class db_sync {

        public static void main(String[] args) {

            // parameters for db1
            String destination = "example_db1";
            String subscribe = "db1\\.tb\\d_\\d";
            String db = "db1";

            // parameters for db2
            // String destination = "example_db2";
            // String subscribe = "db2\\.tb\\d_\\d";
            // String db = "db2";

            CanalConnector connector = CanalConnectors.newClusterConnector(
                    "192.168.23.31:2181,192.168.23.32:2181,192.168.23.33:2181", destination, "", "");

            Connection mysql_conn = null;
            Statement st = null;

            try {
                // target database: MySQL on canal3
                mysql_conn = DriverManager.getConnection(
                        "jdbc:mysql://192.168.23.33:3306/" + db
                                + "?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8",
                        "root", "Root_123");
                st = mysql_conn.createStatement();

                connector.connect();
                connector.subscribe(subscribe);
                // roll back any batch that was fetched earlier but never acknowledged
                connector.rollback();

                while (true) {
                    Message message = connector.getWithoutAck(5000);
                    long batchId = message.getId();
                    int batchSize = message.getEntries().size();

                    if (!(batchId == -1 || batchSize == 0)) {
                        try {
                            List<CanalEntry.Entry> entries = message.getEntries();
                            dealEntrys(entries, st); // dealEntrys works on the same Statement object
                            connector.ack(batchId);
                        } catch (Exception e) {
                            // not acknowledged: the batch will be delivered again
                            connector.rollback(batchId);
                            e.printStackTrace();
                        }
                    }

                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                connector.disconnect();
                try {
                    // guard against a failed getConnection(), which leaves both references null
                    if (st != null) {
                        st.close();
                    }
                    if (mysql_conn != null) {
                        mysql_conn.close();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void dealEntrys(List<CanalEntry.Entry> entries, Statement st) {

            // process the entries one by one
            for (CanalEntry.Entry entry : entries) {

                // transaction begin/end markers carry no row data
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    // rethrow so the caller rolls the batch back instead of hitting a null rowChange below
                    throw new RuntimeException("failed to parse entry: " + entry, e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                String schemaName = entry.getHeader().getSchemaName();
                String tableName = entry.getHeader().getTableName();

                System.out.println(String.format("binlog[%s:%s], name[%s.%s], eventType: %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        schemaName, tableName, eventType
                ));

                // replicate table structure changes
                if (eventType == CanalEntry.EventType.ALTER) {
                    if (rowChange.getIsDdl()) {
                        try {
                            System.out.println(rowChange.getSql());
                            st.execute(rowChange.getSql());
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                }

                // process each row; a single insert statement, for example, can insert several rows
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                        insertData(rowData.getAfterColumnsList(), st, schemaName, tableName);
                    } else if (eventType == CanalEntry.EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                        deleteData(rowData.getBeforeColumnsList(), st, schemaName, tableName);
                    } else {
                        System.out.println("<<<<<<<<<
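The listing calls insertData and deleteData, whose bodies are not shown in this part of the article. As a rough idea of what such helpers have to do, the following sketch turns a list of column name/value pairs into INSERT and DELETE statements. It is not the author's implementation: `RowSql` and `Col` are hypothetical stand-ins (a real helper would read names and values from canal's column objects), and for anything beyond practice a PreparedStatement with a primary-key WHERE clause would be the safer choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class RowSql {
    /** Hypothetical stand-in for one column of a changed row. */
    static final class Col {
        final String name;
        final String value;
        final boolean isNull;

        Col(String name, String value, boolean isNull) {
            this.name = name;
            this.value = value;
            this.isNull = isNull;
        }
    }

    // Render a value as a MySQL string literal; backslashes and single quotes are escaped.
    static String literal(Col c) {
        if (c.isNull) {
            return "NULL";
        }
        return "'" + c.value.replace("\\", "\\\\").replace("'", "''") + "'";
    }

    /** Builds an INSERT from the columns of the row after the change. */
    static String insertSql(String schema, String table, List<Col> after) {
        String cols = after.stream().map(c -> "`" + c.name + "`").collect(Collectors.joining(", "));
        String vals = after.stream().map(RowSql::literal).collect(Collectors.joining(", "));
        return "INSERT INTO `" + schema + "`.`" + table + "` (" + cols + ") VALUES (" + vals + ")";
    }

    /** Builds a DELETE that matches every column of the row before the change. */
    static String deleteSql(String schema, String table, List<Col> before) {
        String where = before.stream()
                .map(c -> "`" + c.name + "`" + (c.isNull ? " IS NULL" : " = " + literal(c)))
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM `" + schema + "`.`" + table + "` WHERE " + where;
    }

    public static void main(String[] args) {
        List<Col> row = Arrays.asList(new Col("idA", "4", false), new Col("nameA", "4", false));
        System.out.println(insertSql("db1", "tb1_1", row));
        System.out.println(deleteSql("db1", "tb1_1", row));
    }
}
```

Matching on every column, as deleteSql does here, is only a fallback for tables without a key; comparing floating-point columns against string literals in particular can miss rows.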