
Syncing the binlog directly to Kafka with canal

Bulut0907, published 2021-06-29 00:07:07

Contents
  • 1. Overview
  • 2. Configuration changes
    • 2.1 Modify conf/canal.properties
    • 2.2 Create a new example_kafka instance
    • 2.3 Developing kafka_sync.java

1. Overview

canal only needs one Kafka example instance configured in order to sync the binlog directly to Kafka. In this mode, Kafka acts as a client with id 1001.

The components used are listed in the table below:

Component | Version | Servers                | Installation guide
canal     | 1.1.5   | canal1, canal2, canal3 | canal HA installation + MySQL multi-node automatic switchover
kafka     | 2.8.0   | canal1, canal2, canal3 | Kafka fully distributed installation
mysql     | 8.0.25  | canal1, canal2         | MySQL master-master architecture setup
zookeeper | 3.6.3   | canal1, canal2, canal3 | ZooKeeper fully distributed installation

2. Configuration changes

2.1 Modify conf/canal.properties
# Modified section
# This parameter cannot be set in example/instance.properties; with this mode enabled, canal.port = 11111 has no effect
canal.serverMode = kafka

kafka.bootstrap.servers = canal1:9092,canal2:9092,canal3:9092
canal.mq.flatMessage = false

# Added section
canal.mq.transaction = false
2.2 Create a new example_kafka instance

First copy an existing instance directory so we can modify it:

[root@canal1 ~]# 
[root@canal1 ~]# pwd
/root
[root@canal1 ~]# 
[root@canal1 ~]# cp -r canal.deployer-1.1.5/conf/example_db1 example_kafka
[root@canal1 ~]# 
[root@canal1 ~]# ls | grep example_kafka
example_kafka
[root@canal1 ~]#
[root@canal1 ~]# rm example_kafka/h2.mv.db -f
[root@canal1 ~]#
[root@canal1 ~]# ll example_kafka/
total 4
-rwxr-xr-x. 1 root root 2680 Jun 27 17:29 instance.properties
[root@canal1 ~]# 

Modify example_kafka/instance.properties.

Because we use 3 partitions in Kafka below, and canal auto-creates topics with only 1 partition by default, set num.partitions=3 in kafka_2.13-2.8.0/config/server.properties and restart Kafka.
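As an alternative to changing num.partitions and restarting every broker, the db1 topic can also be pre-created with 3 partitions through Kafka's AdminClient. A minimal sketch, assuming the topic name db1, the broker list used in this cluster, and a replication factor of 1:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateDb1Topic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker list from this cluster (assumes the host can resolve canal1/canal2/canal3)
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "canal1:9092,canal2:9092,canal3:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions to match canal.mq.dynamicTopicPartitionNum; replication factor 1 is only an example value
            NewTopic db1 = new NewTopic("db1", 3, (short) 1);
            admin.createTopics(Collections.singletonList(db1)).all().get();
        }
    }
}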

[root@canal1 ~]# pwd
/root
[root@canal1 ~]# 
[root@canal1 ~]# cat example_kafka/instance.properties 
#################################################
## mysql serverId , v1.0.26+ will autoGen
# use 1252 on canal2 and 1253 on canal3
canal.instance.mysql.slaveId=1251

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=canal2:3306
canal.instance.master.journal.name=mysql-bin.000007
canal.instance.master.position=156
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

canal.instance.standby.address = canal1:3306
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal_123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=db1\\.tb\\d_\\d
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=db1:db1\\.tb\\d_\\d
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
canal.mq.partitionHash=db1\\.tb\\d_\\d:idA
canal.mq.dynamicTopicPartitionNum=db\\d:3,db1:3
#################################################


# Enable heartbeat detection
canal.instance.detecting.enable = true
# Heartbeat detection SQL
canal.instance.detecting.sql = select 1
# Heartbeat detection interval (seconds)
canal.instance.detecting.interval.time = 3
# Failure threshold; once exceeded, the MySQL connection is switched, e.g. to the standby server, to keep consuming the binlog
canal.instance.detecting.retry.threshold = 3
# Whether to switch between master and standby once the failure threshold is exceeded
canal.instance.detecting.heartbeatHaEnable = true
[root@canal1 ~]# 

Move example_kafka into canal.deployer-1.1.5/conf on all three servers; canal will automatically detect the example_kafka instance.

[root@canal1 ~]# pwd
/root
[root@canal1 ~]# 
[root@canal1 ~]# mv example_kafka canal.deployer-1.1.5/conf
[root@canal1 ~]#
[root@canal1 ~]# scp -r canal.deployer-1.1.5/conf/example_kafka root@canal2:/root/canal.deployer-1.1.5/conf/
instance.properties                                100% 2752     1.7MB/s   00:00    
[root@canal1 ~]# scp -r canal.deployer-1.1.5/conf/example_kafka root@canal3:/root/canal.deployer-1.1.5/conf/
instance.properties                                100% 2752     2.1MB/s   00:00    
[root@canal1 ~]# 

On canal2 and canal3, change the canal.instance.mysql.slaveId parameter in conf/example_kafka/instance.properties (to 1252 and 1253 respectively).

Note that only the db1 database's changes are routed to the db1 topic; db2's changes go to the default example topic. The db1 topic is also hash-partitioned by the idA column.

2.3 Developing kafka_sync.java
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class kafka_sync {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.23.31:9092,192.168.23.32:9092,192.168.23.33:9092");
        props.setProperty("group.id", "kafka_sync");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        // KafkaCanalConnector cannot set these properties; with auto.offset.reset=latest the last Kafka message would be consumed repeatedly
        props.setProperty("auto.offset.reset", "earliest");

        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserialize the value into canal's Message type
        props.setProperty("value.deserializer", "com.alibaba.otter.canal.client.kafka.MessageDeserializer");

        KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("db1"));
        try {
            while (true) {
                ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(100L));
                for (ConsumerRecord<String, Message> record : records) {

                    // Once we have the Message, further processing can start from here (see the parsing sketch below)
                    long batchId = record.value().getId();
                    int batchSize = record.value().getEntries().size();
                    System.out.println("batchId=" + batchId + ", batchSize=" + batchSize);

                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
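Once the Message is available, its entries can be unpacked into row-level changes with canal's CanalEntry protocol classes. Below is a minimal sketch, with the helper class name MessagePrinter chosen here only for illustration; calling MessagePrinter.printMessage(record.value()) inside the loop above would print every INSERT/UPDATE/DELETE row:

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class MessagePrinter {

    // Print every row change contained in a canal Message
    public static void printMessage(Message message) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : message.getEntries()) {
            // Skip transaction begin/end markers; only ROWDATA entries carry column values
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            String table = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
            CanalEntry.EventType eventType = rowChange.getEventType();

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // For DELETE the values are in beforeColumns; for INSERT/UPDATE use afterColumns
                for (CanalEntry.Column column : eventType == CanalEntry.EventType.DELETE
                        ? rowData.getBeforeColumnsList()
                        : rowData.getAfterColumnsList()) {
                    System.out.println(table + " " + eventType + " " + column.getName() + "=" + column.getValue());
                }
            }
        }
    }
}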

When testing, you can change the group.id parameter to run the test multiple times; with auto.offset.reset=earliest, a new consumer group starts again from the beginning of the topic.
