您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Canal原理

宝哥大数据 发布时间:2021-02-04 10:07:07 ,浏览量:1

Canal原理 MySQL主备复制原理

在这里插入图片描述

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据,以此来达到数据一致。

mysql的binlog

它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。

binlog 有三种: STATEMENT、ROW、MIXED

  • STATEMENT 记录的是执行的sql语句
  • ROW 记录的是真实的行数据记录
  • MIXED 记录的是1+2,优先按照1的模式记录

名词解释:

什么是中继日志

从服务器I/O线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器SQL线程会读取relay-log日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致

canal 工作原理

在这里插入图片描述

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
架构

在这里插入图片描述

  • server 代表一个 canal 运行实例,对应于一个 jvm
  • instance 对应于一个数据队列 (1个 canal server 对应 1…n 个 instance )
  • instance 下的子模块
    • eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
    • eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
    • eventStore: 数据存储
    • metaManager: 增量订阅 & 消费信息管理器

EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功 ),传送成功之后更新Log Position。流程图如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EbgPyHrn-1612398165743)(assets\image-20200214104557452.png)]

  • EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
  • EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
  • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
    • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
    • void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
    • void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
server/client交互协议 canal client & server

canal client与canal server之间是C/S模式的通信,客户端采用NIO,服务端采用Netty。 canal server启动后,如果没有canal client,那么canal server不会去mysql拉取binlog。 即Canal客户端主动发起拉取请求,服务端才会模拟一个MySQL Slave节点去主节点拉取binlog。 通常Canal客户端是一个死循环,这样客户端一直调用get方法,服务端也就会一直拉取binlog

BIO、NIO、AIO的区别
IO的方式通常分为几种,同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO。

同步阻塞IO:在此种方式下,用户进程在发起一个IO操作以后,必须等待IO操作的完成,只有当真正完成了IO操作以后,用户进程才能运行。JAVA传统的IO模型属于此种方式!

同步非阻塞IO:在此种方式下,用户进程发起一个IO操作以后边可返回做其它事情,但是用户进程需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的CPU资源浪费。其中目前JAVA的NIO就属于同步非阻塞IO。

异步阻塞IO:此种方式下是指应用发起一个IO操作以后,不等待内核IO操作的完成,等内核完成IO操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为什么说是阻塞的呢?因为此时是通过select系统调用来完成的,而select函数本身的实现方式是阻塞的,而采用select函数有个好处就是它可以同时监听多个文件句柄,从而提高系统的并发性!

异步非阻塞IO:在此种模式下,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。目前Java中还没有支持此种IO模型。

参考资料:https://www.cnblogs.com/straybirds/p/9479158.html
public class AbstractCanalClientTest {
    protected void process() {
        int batchSize = 5 * 1024; // 一次请求拉取多条记录
        try {
            connector.connect(); // 先连接服务端
            connector.subscribe(); // 订阅
            // keep send request to canal server, thus canal server can fetch binlog from mysql
            while (running) { 
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                printSummary(message, batchId, size);
                printEntry(message.getEntries());
                connector.ack(batchId); // 提交确认
                //connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } finally {
            connector.disconnect();
        }
    }
}

canal client与canal server之间属于增量订阅/消费,流程图如下:(其中C端是canal client,S端是canal server)

在这里插入图片描述

canal client调用connect()方法时,发送的数据包(PacketType)类型为:

  1. handshake,
  2. ClientAuthentication。

canal client调用subscribe()方法,类型为[subscription]。

对应服务端采用netty处理RPC请求(CanalServerWithNetty):

public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {
    public void start() {
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipelines = Channels.pipeline();
                pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
                // 处理客户端的HANDSHAKE请求
                pipelines.addLast(HandshakeInitializationHandler.class.getName(),
                    new HandshakeInitializationHandler(childGroups));
                // 处理客户端的CLIENTAUTHENTICATION请求
                pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                    new ClientAuthenticationHandler(embeddedServer));

                // 处理客户端的会话请求,包括SUBSCRIPTION,GET等
                SessionHandler sessionHandler = new SessionHandler(embeddedServer);
                pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
                return pipelines;
            }
        });
    }
}

ClientAuthenticationHandler处理鉴权后,会移除HandshakeInitializationHandler和ClientAuthenticationHandler。 最重要的是会话处理器SessionHandler。

以client发送GET,server从mysql得到binlog后,返回MESSAGES给client为例,说明client和server的rpc交互过程:

SimpleCanalConnector发送GET请求,并读取响应结果的流程:

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    int size = (batchSize             
关注
打赏
1587549273
查看更多评论
0.0461s