您当前的位置: 首页 >  golang

分布式事务框架 seata-golang 通信模型详解

阿里云开发者 发布时间:2020-12-30 09:58:15 ,浏览量:3

简介:Java 的世界里,大家广泛使用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发,作为底层通信库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个大家庭,并改名 dubbo-getty 。

头图.png

作者 | 刘晓敏 于雨

一、简介

Java 的世界里,大家广泛使用的一个高性能网络通信框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个类似 netty 的高性能网络通信库。getty 最初由 dubbogo 项目负责人于雨开发,作为底层通信库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个大家庭,并改名 dubbo-getty 。

18 年的时候,我在公司里实践微服务,当时遇到最大的问题就是分布式事务问题。同年,阿里在社区开源他们的分布式事务解决方案,我也很快关注到这个项目,起初还叫 fescar,后来更名 seata。由于我对开源技术很感兴趣,加了很多社区群,当时也很关注 dubbo-go 这个项目,在里面默默潜水。随着对 seata 的了解,逐渐萌生了做一个 go 版本的分布式事务框架的想法。

要做一个 golang 版的分布式事务框架,首要的一个问题就是如何实现 RPC 通信。dubbo-go 就是很好的一个例子摆在眼前,遂开始研究 dubbo-go 的底层 getty。

二、如何基于 getty 实现 RPC 通信

getty 框架的整体模型图如下:

1.png

下面结合相关代码,详述 seata-golang 的 RPC 通信过程。

1. 建立连接

实现 RPC 通信,首先要建立网络连接吧,我们从 client.go 开始看起。

func (c *client) connect() {
    var (
        err error
        ss  Session
    )

    for {
        // 建立一个 session 连接
        ss = c.dial()
        if ss == nil {
            // client has been closed
            break
        }
        err = c.newSession(ss)
        if err == nil {
            // 收发报文
            ss.(*session).run()
            // 此处省略部分代码
      
            break
        }
        // don't distinguish between tcp connection and websocket connection. Because
        // gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
        ss.Conn().Close()
    }
}

connect() 方法通过 dial() 方法得到了一个 session 连接,进入 dial() 方法:

func (c *client) dial() Session {
    switch c.endPointType {
    case TCP_CLIENT:
        return c.dialTCP()
    case UDP_CLIENT:
        return c.dialUDP()
    case WS_CLIENT:
        return c.dialWS()
    case WSS_CLIENT:
        return c.dialWSS()
    }

    return nil
}

我们关注的是 TCP 连接,所以继续进入 c.dialTCP() 方法:

func (c *client) dialTCP() Session {
    var (
        err  error
        conn net.Conn
    )

    for {
        if c.IsClosed() {
            return nil
        }
        if c.sslEnabled {
            if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
                d := &net.Dialer{Timeout: connectTimeout}
                // 建立加密连接
                conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
            }
        } else {
            // 建立 tcp 连接
            conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
        }
        if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
            conn.Close()
            err = errSelfConnect
        }
        if err == nil {
            // 返回一个 TCPSession
            return newTCPSession(conn, c)
        }

        log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
                    
关注
打赏
1688896170
查看更多评论
0.1913s