Table of Contents
- 1. Overview
- 2. Configuration changes
- 2.1 Modify conf/canal.properties
- 2.2 Create a new example_kafka instance
- 2.3 Developing kafka_sync.java

1. Overview
Canal only needs a single example instance configured for Kafka to sync binlog directly into Kafka; Kafka then effectively acts as a client with id 1001.
The components involved are listed in the table below:

| Component | Version | Servers | Installation guide |
|---|---|---|---|
| canal | 1.1.5 | canal1, canal2, canal3 | canal HA installation + automatic MySQL multi-node switchover |
| kafka | 2.8.0 | canal1, canal2, canal3 | fully distributed kafka installation |
| mysql | 8.0.25 | canal1, canal2 | mysql master-master setup |
| zookeeper | 3.6.3 | canal1, canal2, canal3 | fully distributed zookeeper installation |

2. Configuration changes

2.1 Modify conf/canal.properties

# Modified section
# This parameter cannot be set in example/instance.properties; when this mode is enabled, canal.port = 11111 has no effect
canal.serverMode = kafka
kafka.bootstrap.servers = canal1:9092,canal2:9092,canal3:9092
canal.mq.flatMessage = false
# Added section
canal.mq.transaction = false
2.2 Create a new example_kafka instance
First copy an existing instance to use as a starting point:
[root@canal1 ~]#
[root@canal1 ~]# pwd
/root
[root@canal1 ~]#
[root@canal1 ~]# cp -r canal.deployer-1.1.5/conf/example_db1 example_kafka
[root@canal1 ~]#
[root@canal1 ~]# ls | grep example_kafka
example_kafka
[root@canal1 ~]#
[root@canal1 ~]# rm example_kafka/h2.mv.db -f
[root@canal1 ~]#
[root@canal1 ~]# ll example_kafka/
total 4
-rwxr-xr-x. 1 root root 2680 Jun 27 17:29 instance.properties
[root@canal1 ~]#
Modify example_kafka/instance.properties (the full modified file is shown below).
Since the Kafka topics below use 3 partitions, while topics auto-created by Canal default to a single partition, set num.partitions=3 in kafka_2.13-2.8.0/config/server.properties and then restart Kafka.
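As an alternative to changing the broker-wide num.partitions default, the topics Canal writes to can be pre-created with 3 partitions before Canal starts. Below is a minimal sketch using Kafka's Java AdminClient; the topic names (db1, example) and the replication factor of 3 are assumptions matching this 3-node cluster, not values required by Canal.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class create_topics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "canal1:9092,canal2:9092,canal3:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // pre-create the topics Canal will publish to: 3 partitions each,
            // replication factor 3 (an assumption for this 3-broker cluster)
            admin.createTopics(Arrays.asList(
                    new NewTopic("db1", 3, (short) 3),
                    new NewTopic("example", 3, (short) 3)
            )).all().get();
        }
    }
}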
[root@canal1 ~]# pwd
/root
[root@canal1 ~]#
[root@canal1 ~]# cat example_kafka/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# use 1252 on canal2 and 1253 on canal3
canal.instance.mysql.slaveId=1251
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=canal2:3306
canal.instance.master.journal.name=mysql-bin.000007
canal.instance.master.position=156
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
canal.instance.standby.address = canal1:3306
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal_123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=db1\\.tb\\d_\\d
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=db1:db1\\.tb\\d_\\d
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
canal.mq.partitionHash=db1\\.tb\\d_\\d:idA
canal.mq.dynamicTopicPartitionNum=db\\d:3,db1:3
#################################################
# heartbeat detection must be enabled
canal.instance.detecting.enable = true
# heartbeat detection SQL
canal.instance.detecting.sql = select 1
# heartbeat detection interval (seconds)
canal.instance.detecting.interval.time = 3
# threshold of consecutive heartbeat failures; once exceeded, the MySQL connection is switched, e.g. to the standby server to keep consuming binlog
canal.instance.detecting.retry.threshold = 3
# whether to switch between master/standby once the heartbeat failure threshold is exceeded
canal.instance.detecting.heartbeatHaEnable = true
[root@canal1 ~]#
Move example_kafka into canal.deployer-1.1.5/conf on all three servers; Canal will scan the directory and pick up example_kafka automatically.
[root@canal1 ~]# pwd
/root
[root@canal1 ~]#
[root@canal1 ~]# mv example_kafka canal.deployer-1.1.5/conf
[root@canal1 ~]#
[root@canal1 ~]# scp -r canal.deployer-1.1.5/conf/example_kafka root@canal2:/root/canal.deployer-1.1.5/conf/
instance.properties 100% 2752 1.7MB/s 00:00
[root@canal1 ~]# scp -r canal.deployer-1.1.5/conf/example_kafka root@canal3:/root/canal.deployer-1.1.5/conf/
instance.properties 100% 2752 2.1MB/s 00:00
[root@canal1 ~]#
On canal2 and canal3, change the canal.instance.mysql.slaveId parameter in conf/example_kafka/instance.properties (to 1252 and 1253 respectively).
With this configuration, only changes from the db1 database are routed to the db1 topic (db2 changes go to the default example topic), and the db1 topic is hash-partitioned on the idA column.
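For reference, canal.mq.partitionHash routes each row to a partition roughly by hashing the configured column value and taking it modulo the partition count, so all changes with the same idA land in the same partition. The sketch below only illustrates that idea; it is not Canal's actual hashing code, and the values are made up.

public class partition_route_demo {
    // rough illustration of hash-based partition routing (not Canal's actual implementation):
    // rows with the same idA value always map to the same partition
    static int partitionFor(String idA, int partitionsNum) {
        return Math.abs(idA.hashCode() % partitionsNum);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("1001", 3)); // same idA -> same partition every time
        System.out.println(partitionFor("1002", 3));
    }
}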
2.3 Developing kafka_sync.java

import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class kafka_sync {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.23.31:9092,192.168.23.32:9092,192.168.23.33:9092");
        props.setProperty("group.id", "kafka_sync");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        // KafkaCanalConnector does not allow setting these properties; with auto.offset.reset
        // set to latest, the last Kafka message would be consumed repeatedly, so earliest is used here
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // deserialize values into canal Message objects
        props.setProperty("value.deserializer", "com.alibaba.otter.canal.client.kafka.MessageDeserializer");

        KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("db1"));
        try {
            while (true) {
                ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(100L));
                for (ConsumerRecord<String, Message> record : records) {
                    // the canal Message is available here; downstream processing starts from it
                    long batchId = record.value().getId();
                    int batchSize = record.value().getEntries().size();
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
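From here, each Message can be unpacked into row-level changes via canal's CanalEntry protocol classes. The following is a minimal sketch of what the downstream processing referenced in the loop above might look like; the printing logic is illustrative only.

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

public class message_printer {
    // walk a canal Message and print the changed rows (illustrative sketch)
    static void printMessage(Message message) throws Exception {
        for (CanalEntry.Entry entry : message.getEntries()) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue; // skip transaction begin/end entries
            }
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            String table = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // DELETE carries the old values in beforeColumns; other events use afterColumns
                boolean isDelete = rowChange.getEventType() == CanalEntry.EventType.DELETE;
                for (CanalEntry.Column column : isDelete ? rowData.getBeforeColumnsList()
                                                         : rowData.getAfterColumnsList()) {
                    System.out.println(table + " " + rowChange.getEventType()
                            + " " + column.getName() + "=" + column.getValue());
                }
            }
        }
    }
}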
When testing, you can change the group.id parameter to run the consumption test multiple times.