- 1. 准备工作
- 1.1 active mysql查看
- 1.2 mysql数据情况
- 1.3 全量数据导出
- 1.4 数据库的数据更新
- 2. canal.deploy配置修改
- 2.2 删除conf/example,建立新的example实例
- 3. 同步代码的开发
- 3.1 重复测试的脚本
- 3.2 scala同步代码
- 3.2.1 db1_tables.txt
- 3.2.2 etl.properties
- 3.2.3 launch.properties
- 3.2.4 logback.xml
- 3.2.5 MysqlConnectPool
- 3.2.6 Postgres_etl.scala
- 3.2.7 Mysql_postgres_datatype
- 3.2.8 PostgresConnectPool
- 3.2.9 Sync_launch.scala
- 3.3 代码启动顺序
- 3.4 postgresql数据库结果
canal高可用和mysql节点自动切换的部署,可以参考我的这篇文章
1. 准备工作
1.1 active mysql查看
因为我们部署了mysql节点自动切换,所以需要先查看当前正在同步的是哪一台mysql服务器
- 先查看example_db1实例运行的服务器
[zk: canal1:2181,canal2:2181,canal3:2181(CONNECTED) 4]
[zk: canal1:2181,canal2:2181,canal3:2181(CONNECTED) 4] get /otter/canal/destinations/example_db1/running
{"active":true,"address":"192.168.23.31:11111"}
[zk: canal1:2181,canal2:2181,canal3:2181(CONNECTED) 5]
可以看到example_db1实例运行在canal1上
- 然后我们查看canal1的example_db1日志
[root@canal1 example_db1]#
[root@canal1 example_db1]# pwd
/root/canal.deployer-1.1.5/logs/example_db1
[root@canal1 example_db1]#
[root@canal1 example_db1]# tail example_db1.log
......省略部分......
2021-07-29 04:16:13.461 [destination = example_db1 , address = canal2/192.168.23.32:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position mysql-bin.000026:4:1627458457000
2021-07-29 04:16:14.278 [destination = example_db1 , address = canal2/192.168.23.32:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000026,position=4,serverId=2,gtid=,timestamp=1627458457000] cost : 877ms , the next step is binlog dump
[root@canal1 example_db1]#
可以看到当前active的mysql为canal2
1.2 mysql数据情况
[root@canal2 ~]# mysql -u root -pRoot_123
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 22
Server version: 8.0.25 MySQL Community Server - GPL
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| db1 |
| information_schema |
| mysql |
| performance_schema |
| sys |
+--------------------+
6 rows in set (0.01 sec)
mysql> use db1;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+---------------+
| Tables_in_db1 |
+---------------+
| tb1_1 |
| tb1_2 |
+---------------+
2 rows in set (0.00 sec)
mysql> select * from tb1_1;
+------+-------+--------+
| idA | nameA | scoreA |
+------+-------+--------+
| 1 | 1 | 1.10 |
| 2 | 2 | 2.20 |
| 3 | 3 | 3.30 |
+------+-------+--------+
3 rows in set (0.00 sec)
mysql> select * from tb1_2;
+------+-------+------+
| idA | nameA | ageA |
+------+-------+------+
| 1 | 1 | 1 |
| 2 | 2 | 2 |
| 3 | 3 | 3 |
+------+-------+------+
3 rows in set (0.00 sec)
mysql>
1.3 全量数据导出
这里我们找一台临时的数据库服务器canal3,用下面的脚本把db1的全量数据从当前active的mysql(canal2)导出,再导入到canal3中,脚本如下:
[root@canal3 ~]#
[root@canal3 ~]# pwd
/root
[root@canal3 ~]#
[root@canal3 ~]# cat mysql_dump_load.sh
#!/usr/bin/env bash
# mysql_dump_load.sh - full export of the source MySQL database and import of
# that dump into the temporary target MySQL server.
#
# Prints two lines: the binlog file name and the binlog position recorded in
# the dump. These are the values used as canal's starting point in
# instance.properties (canal.instance.master.journal.name /
# canal.instance.master.position).
#
# NOTE(review): passwords are passed on the command line (hence the mysql
# "Using a password on the command line" warnings); a --defaults-extra-file
# would keep them out of the process list.
set -euo pipefail

# Script parameters
mysql_source_host=192.168.23.32
mysql_source_port=3306
mysql_source_user=root
mysql_source_password=Root_123
mysql_source_db=db1
mysql_target_host=192.168.23.33
mysql_target_port=3306
mysql_target_user=root
mysql_target_password=Root_123
# NOTE(review): the dump is taken with --databases, so it creates and USEs
# ${mysql_source_db} itself; this name is only used by the DROP below and
# should therefore be the same as mysql_source_db.
mysql_target_db=db1
mysql_dump_path=/root/db1.sql

# Objects excluded from the export (per the original note, these are views)
ignore_tables=(
  "--ignore-table=${mysql_source_db}.tb1"
  "--ignore-table=${mysql_source_db}.tb2"
)

# Client invocation for the target server, shared by the drop and the load
mysql_target=(
  mysql -h "${mysql_target_host}" -P "${mysql_target_port}"
  -u "${mysql_target_user}" -p"${mysql_target_password}"
)

# Export the source database. --flush-logs rotates to a fresh binlog and
# --master-data=1 records the binlog coordinates as a CHANGE MASTER TO line.
mysqldump -h "${mysql_source_host}" -P "${mysql_source_port}" \
  -u "${mysql_source_user}" -p"${mysql_source_password}" \
  --databases "${mysql_source_db}" --flush-logs --lock-all-tables \
  --master-data=1 --column_statistics=0 "${ignore_tables[@]}" \
  > "${mysql_dump_path}"

# Extract the binlog file name and position from the CHANGE MASTER TO line
change_master_line=$(grep -m 1 'CHANGE MASTER TO MASTER_LOG_FILE' "${mysql_dump_path}") || {
  echo "no CHANGE MASTER TO line found in ${mysql_dump_path}" >&2
  exit 1
}
change_master_re="MASTER_LOG_FILE='([^']+)'.*MASTER_LOG_POS=([0-9]+)"
if [[ ! ${change_master_line} =~ ${change_master_re} ]]; then
  echo "cannot parse binlog coordinates from: ${change_master_line}" >&2
  exit 1
fi
mysql_binlog_filename=${BASH_REMATCH[1]}
mysql_binlog_position=${BASH_REMATCH[2]}

# Comment the statement out so that loading the dump does not execute
# CHANGE MASTER TO on the target server
sed -ri 's/^(CHANGE MASTER TO MASTER_LOG_FILE.*MASTER_LOG_POS.*;)/-- \1/' "${mysql_dump_path}"

# Recreate the database on the target from the dump
"${mysql_target[@]}" -e "drop database if exists ${mysql_target_db}"
"${mysql_target[@]}" < "${mysql_dump_path}"

printf '%s\n' "${mysql_binlog_filename}" "${mysql_binlog_position}"
[root@canal3 ~]#
运行数据导出脚本
[root@canal3 ~]#
[root@canal3 ~]# sh mysql_dump_load.sh
mysqldump: [Warning] Using a password on the command line interface can be insecure.
mysql: [Warning] Using a password on the command line interface can be insecure.
mysql: [Warning] Using a password on the command line interface can be insecure.
mysql-bin.000026
156
[root@canal3 ~]#
此时我们可以看到flush后的最新binlog文件是mysql-bin.000026,position是156
1.4 数据库的数据更新
然后我们再对db1数据库的数据做一些变更
[root@canal2 ~]# mysql -u root -pRoot_123
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 22
Server version: 8.0.25 MySQL Community Server - GPL
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
mysql> insert into db1.tb1_1 values(4,'4',4.4),(5,'5',5.5);
Query OK, 2 rows affected (0.00 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql> delete from db1.tb1_2 where idA = 2;
Query OK, 1 rows affected (0.00 sec)
mysql> delete from db1.tb1_2 where idA = 3;
Query OK, 1 rows affected (0.00 sec)
mysql>
2. canal.deploy配置修改
修改3台服务器的配置
- canal.properties
修改部分:
canal.destinations =
canal.serverMode = tcp
这里将canal.destinations置空,取消默认的example实例,改为自动扫描:canal会把conf目录下除metrics和spring之外的每个子目录作为一个instance实例加载。一般一个instance实例对应一个数据库的同步
然后重启canal服务
2.2 删除conf/example,建立新的example实例
在canal1的/root目录下准备一个example实例,步骤如下
[root@canal1 conf]# ls
canal_local.properties canal.properties example logback.xml metrics spring
[root@canal1 conf]#
[root@canal1 conf]# pwd
/root/canal.deployer-1.1.5/conf
[root@canal1 conf]#
[root@canal1 conf]# cp -r example/ ~
[root@canal1 conf]#
[root@canal1 conf]# cd ~
[root@canal1 ~]#
[root@canal1 ~]# mv example/ example_db1
[root@canal1 ~]#
[root@canal1 ~]# ll example_db1/
总用量 164
-rw-r--r--. 1 root root 163840 6月 25 11:01 h2.mv.db
-rwxr-xr-x. 1 root root 2621 6月 25 11:01 instance.properties
[root@canal1 ~]#
[root@canal1 ~]# rm example_db1/h2.mv.db -rf
[root@canal1 ~]#
example_db1/instance.properties中设置了binlog的读取起始点(即1.3中得到的mysql-bin.000026和156),并且只同步db1数据库中表名匹配tb\d_\d的表,内容如下:
[root@canal1 ~]#
[root@canal1 ~]# cat example_db1/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal2为1232,canal3为1233
canal.instance.mysql.slaveId=1231
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=canal2:3306
canal.instance.master.journal.name=mysql-bin.000026
canal.instance.master.position=156
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
canal.instance.standby.address = canal1:3306
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal_123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=db1\\.tb\\d_\\d
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
# 需要开启心跳检查
canal.instance.detecting.enable = true
# 心跳检查sql
canal.instance.detecting.sql = select 1
# 心跳检查频率
canal.instance.detecting.interval.time = 3
# 心跳检查失败次数阈值,超过该阈值后会触发mysql连接切换,比如切换到standby机器上继续消费binlog
canal.instance.detecting.retry.threshold = 3
# 心跳检查超过失败次数阈值后,是否开启master/standby的切换
canal.instance.detecting.heartbeatHaEnable = true
[root@canal1 ~]#
删除3台服务器的conf/example目录,将example_db1放到3台服务器的conf目录下,并把canal2和canal3上instance.properties中的canal.instance.mysql.slaveId参数分别改为1232和1233(见该文件中的注释)
3. 同步代码的开发
3.1 重复测试的脚本
测试时,有时需要删除client保存在zookeeper中的消费position,让canal server重新从instance.properties中设置的position开始消费,因此还需要重启canal server。脚本如下:
[root@canal1 ~]#
[root@canal1 ~]# pwd
/root
[root@canal1 ~]#
[root@canal1 ~]# cat example_db_repeat_test.sh
#!/usr/bin/env bash
# Reset helper for repeated sync tests: deletes the canal client's consumption
# cursor stored in ZooKeeper so that, after canal server is restarted on every
# node, the instance consumes again from the position configured in
# instance.properties.
# Script parameters
zk_cli=/root/apache-zookeeper-3.6.3-bin/bin/zkCli.sh
zk_url=canal1:2181,canal2:2181,canal3:2181
destinations=example_db1
canal_servers=('canal1' 'canal2' 'canal3')
canal_user=root
canal_restart_sh=/root/canal.deployer-1.1.5/bin/restart.sh
# Delete the canal client position in ZooKeeper
# NOTE(review): 1001 is presumably the canal client id - confirm against the client
"${zk_cli}" -server "${zk_url}" delete "/otter/canal/destinations/${destinations}/1001/cursor"
# Restart canal server on all three servers
for canal_server in ${canal_servers[@]}
do
ssh ${canal_user}@${canal_server}
> /root/sync_launch.log 2>&1 &
**/
object Sync_launch {
/**
 * Loads conf/launch.properties from the JVM working directory (user.dir).
 *
 * @return the loaded Properties
 * @throws java.io.FileNotFoundException if the file does not exist
 */
def load_launch_properties() = {
  val prop = new Properties()
  val input_stream = new FileInputStream(s"${System.getProperty("user.dir")}/conf/launch.properties")
  try {
    prop.load(input_stream)
  } finally {
    // Release the file handle even if parsing fails
    input_stream.close()
  }
  prop
}
/**
 * Connects to the canal cluster through ZooKeeper and consumes binlog batches
 * in an endless loop, applying every non-empty batch to PostgreSQL.
 *
 * Reads "zk_url", "destination" and "subscribe" from prop. Each batch is
 * fetched with getWithoutAck and acked only after deal_entrys (defined
 * elsewhere in this object) has processed it; on failure the batch is rolled
 * back and the exception is rethrown. If anything throws, the connector is
 * disconnected before the exception propagates.
 *
 * @param prop                properties loaded from conf/launch.properties
 * @param postgresConnectPool pool from which one connection is taken per batch
 * @return the connector; unreachable in practice, since the loop only ends by
 *         throwing
 */
def get_entries(prop: Properties, postgresConnectPool: PostgresConnectPool) = {
  val connector = CanalConnectors.newClusterConnector(prop.getProperty("zk_url"), prop.getProperty("destination"), "", "")
  try {
    connector.connect()
    connector.subscribe(prop.getProperty("subscribe"))
    connector.rollback() // roll back the batch that was fetched but not acked last time
    while (true) {
      val message = connector.getWithoutAck(1000)
      val batch_id = message.getId()
      val batch_size = message.getEntries().size()
      if (batch_id == -1 || batch_size == 0) {
        // Nothing to consume: back off for 1 second. Non-empty batches are
        // processed back-to-back instead of paying this delay after each one.
        Thread.sleep(1000L)
      } else {
        var postgres_conn: Connection = null
        var postgres_stmt: Statement = null
        try {
          val entries = message.getEntries()
          postgres_conn = postgresConnectPool.getConnection()
          postgres_stmt = postgres_conn.createStatement()
          // The Statement and Connection objects are shared with deal_entrys
          deal_entrys(entries, postgres_stmt, prop, postgres_conn)
          connector.ack(batch_id)
        } catch {
          case e: Exception =>
            connector.rollback(batch_id)
            e.printStackTrace()
            throw e
        } finally {
          // Either may still be null if getConnection/createStatement threw;
          // closing unconditionally would raise an NPE hiding the real error.
          if (postgres_stmt != null) postgres_stmt.close()
          if (postgres_conn != null) postgres_conn.close()
        }
      }
    }
  } catch {
    case e: Exception =>
      // The caller never receives the connector when we throw, so release it here
      connector.disconnect()
      throw e
  }
  connector
}
def print_column(columns: java.util.List[CanalEntry.Column]) {
// 循环处理列
for (column {
// SQL解析和数据插入的异常,直接忽略
e.printStackTrace()
}
}
}
}
}
/**
 * Entry point: loads conf/launch.properties, builds the PostgreSQL connection
 * pool from the same file and starts consuming canal batches via get_entries.
 * Any exception is printed and rethrown.
 */
def main(args: Array[String]): Unit = {
  var connector: CanalConnector = null
  try {
    val prop = load_launch_properties()
    val postgresConnectPool = new PostgresConnectPool(s"${System.getProperty("user.dir")}/conf/launch.properties")
    connector = get_entries(prop, postgresConnectPool)
  } catch {
    case e: Exception =>
      e.printStackTrace()
      throw e
  } finally {
    // connector is only assigned when get_entries returns normally; if it
    // threw, connector is still null and disconnect() would raise an NPE
    // that replaces the original exception.
    if (connector != null) {
      connector.disconnect()
    }
  }
}
}
3.3 代码启动顺序
- 执行Postgres_etl.scala
- 运行example_db_repeat_test.sh
- 执行Sync_launch.scala,输出如下
......省略部分......
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>一条SQL语句处理>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
canal_server_id: 2, binlog_file[mysql-bin.000026:370], db_tb[db1.tb1_1], event_type: INSERT, execute_time: 2021-07-28 15:50:28
>>>>>>>>>>>>>>>>>>>>>>>>>一个更新处理>>>>>>>>>>>>>>>>>>>
column[idA:int:4], is_key: false, is_null: false, update: true
column[nameA:varchar(32):4], is_key: false, is_null: false, update: true
column[scoreA:decimal(5,2):4.40], is_key: false, is_null: false, update: true
insert into db1.public.tb1_1 (idA, nameA, scoreA) values(4, '4', 4.40)
>>>>>>>>>>>>>>>>>>>>>>>>>一个更新处理>>>>>>>>>>>>>>>>>>>
column[idA:int:5], is_key: false, is_null: false, update: true
column[nameA:varchar(32):5], is_key: false, is_null: false, update: true
column[scoreA:decimal(5,2):5.50], is_key: false, is_null: false, update: true
insert into db1.public.tb1_1 (idA, nameA, scoreA) values(5, '5', 5.50)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>一条SQL语句处理>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
canal_server_id: 2, binlog_file[mysql-bin.000026:668], db_tb[db1.tb1_2], event_type: DELETE, execute_time: 2021-07-28 15:51:14
>>>>>>>>>>>>>>>>>>>>>>>>>一个更新处理>>>>>>>>>>>>>>>>>>>
column[idA:int:2], is_key: false, is_null: false, update: false
column[nameA:varchar(32):2], is_key: false, is_null: false, update: false
column[ageA:int:2], is_key: false, is_null: false, update: false
delete from db1.public.tb1_2 where idA = 2 and nameA = '2' and ageA = 2
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>一条SQL语句处理>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
canal_server_id: 2, binlog_file[mysql-bin.000026:957], db_tb[db1.tb1_2], event_type: DELETE, execute_time: 2021-07-28 15:51:19
>>>>>>>>>>>>>>>>>>>>>>>>>一个更新处理>>>>>>>>>>>>>>>>>>>
column[idA:int:3], is_key: false, is_null: false, update: false
column[nameA:varchar(32):3], is_key: false, is_null: false, update: false
column[ageA:int:3], is_key: false, is_null: false, update: false
delete from db1.public.tb1_2 where idA = 3 and nameA = '3' and ageA = 3
......省略部分......
3.4 postgresql数据库结果
postgres=#
postgres=# \c db1
You are now connected to database "db1" as user "postgres".
db1=#
db1=# select * from tb1_1;
ida | namea | scorea
-----+-------+--------
1 | 1 | 1.10
2 | 2 | 2.20
3 | 3 | 3.30
4 | 4 | 4.40
5 | 5 | 5.50
(5 rows)
db1=#
db1=# select * from tb1_2;
ida | namea | agea
-----+-------+------
1 | 1 | 1
(1 row)
db1=#