canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。
canal的ha分为两部分,canal server和canal client分别有对应的ha实现
- canal server: 为了减少对mysql dump的请求,不同server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态(standby是instance的状态)。
- canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
- 运行canal的机器: node1 , node2
- zookeeper地址为 node1:2181,node2:2181,node3:2181
- mysql地址:node1:3306
按照部署和配置,在单台机器上各自完成配置,演示时instance name为example
修改canal.properties,加上zookeeper配置
canal.zkServers=node1:2181,node2,node3
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
创建example目录,并修改instance.properties
canal.instance.mysql.slaveId = 1234 ##另外一台机器改成1235,保证slaveId不重复即可
canal.instance.master.address = node1:3306
注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置
启动两台机器的canal-------
ssh node1
sh bin/startup.sh
--------
ssh node2
sh bin/startup.sh
启动后,可以查看logs/example/example.log,只会看到一台机器上出现了启动成功的日志。
比如这里启动成功的是node1
2013-03-19 18:18:20.590 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-03-19 18:18:20.596 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-03-19 18:18:20.831 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-03-19 18:18:20.845 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful...
查看一下zookeeper中的节点信息,也可以知道当前工作的节点为node1:11111
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.88.120:11111","cid":1}
客户端链接, 消费数据
1、可以直接指定zookeeper地址和instance name,canal client会自动从zookeeper中的running节点,获取当前服务的工作节点,然后与其建立链接:
CanalConnector connector = CanalConnectors.newClusterConnector("node1:2181", "example", "canal", "canal");
2、链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端ip,链接的端口信息等
[zk: localhost:2181(CONNECTED) 17] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.88.1:60957","clientId":1001}
3、数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点. (下次重启client时,会从这最后一个位点继续进行消费)
[zk: localhost:2181(CONNECTED) 16] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node1","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000025","position":51944693,"serverId":1,"timestamp":1575424854000}}
重启一下canal server
停止正在工作的node1的canal server
ssh node1
sh bin/stop.sh
这时node2会立马启动example instance,提供新的数据服务
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.88.121:11111","cid":1}
与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成
Canal Server HA的流程图- canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
- 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
- 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
- canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.
Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制。
HA的实现,客户端是ClientRunningMonitor,服务端是ServerRunningMonitor。
直接取若干个Canal客户端,如果同时启动,只有一个客户端能从Canal服务器端获取到binlog消息,其他客户端不能拉取到binlog消息。
从运行配置中,复制一个同样的配置。然后启动运行。
关掉之前的客户端,然后稍微等待一会,查看另外一个客户端是否能够获取到binlog日志消息。
Client1的日志:
****************************************************
* Batch Id: [3] ,count : [3] , memsize : [198] , Time : 2017-10-12 17:59:59
* Start : [mysql-bin.000004:1656:1507802398000(2017-10-12 17:59:58)]
* End : [mysql-bin.000004:1831:1507802398000(2017-10-12 17:59:58)]
****************************************************
================> binlog[mysql-bin.000004:1656] , executeTime : 1507802398000 , delay : 1188ms
BEGIN ----> Thread id: 768
----------------> binlog[mysql-bin.000004:1782] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507802398000 , delay : 1199ms
uid : 1 type=int(4)
name : zqh type=varchar(10) update=true
----------------
END ----> transaction id: 0
================> binlog[mysql-bin.000004:1831] , executeTime : 1507802398000 , delay : 1236ms
## stop the canal client## canal client is down.
停止Client1后,Client2的日志:
****************************************************
* Batch Id: [4] ,count : [3] , memsize : [198] , Time : 2017-10-12 18:02:15
* Start : [mysql-bin.000004:1906:1507802534000(2017-10-12 18:02:14)]
* End : [mysql-bin.000004:2081:1507802534000(2017-10-12 18:02:14)]
****************************************************
================> binlog[mysql-bin.000004:1906] , executeTime : 1507802534000 , delay : 1807ms
BEGIN ----> Thread id: 768
----------------> binlog[mysql-bin.000004:2032] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507802534000 , delay : 1819ms
uid : 1 type=int(4)
name : zqhx type=varchar(10) update=true
----------------
END ----> transaction id: 0
================> binlog[mysql-bin.000004:2081] , executeTime : 1507802534000 , delay : 1855ms
观察ZK节点中instance对应的client节点,在Client切换时,会进行变更。 比如下面的客户端从56806端口切换到了56842端口。 把所有客户端都关闭后,1001下没有running。表示instance没有客户端消费binlog了。
启动两个客户端,第一个客户端(56806)正在运行
[zk: 192.168.6.52:2181(CONNECTED) 29] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:56806","clientId":1001}
停止第一个客户端,删除节点
[zk: 192.168.6.52:2181(CONNECTED) 30] get /otter/canal/destinations/example/1001/running
Node does not exist: /otter/canal/destinations/example/1001/running
第二个客户端(56842)成为主
[zk: 192.168.6.52:2181(CONNECTED) 31] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:56842","clientId":1001}
[zk: 192.168.6.52:2181(CONNECTED) 32] ls /otter/canal/destinations/example/1001
[cursor]
具体实现相关类有:ClientRunningMonitor/ClientRunningListener/ClientRunningData。
client running相关控制,主要为解决client自身的failover机制。 canal client允许同时启动多个canal client, 通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式, 这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.
下图左边是客户端的HA实现,右边是服务端的HA实现