您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

TransportClientFactory

宝哥大数据 发布时间:2019-04-14 11:44:07 ,浏览量:1

TransportClientFactory是创建传输客户端(TransportClient)的工厂类, TransportClient用于向Netty服务端发送RPC请求,

1.1、由TransportContext创建
  /**
   * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
   * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
   * to create a Client.
   */
  public TransportClientFactory createClientFactory(List bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

  public TransportClientFactory createClientFactory() {
    return createClientFactory(Lists.newArrayList());
  }

1.2、组成
  • context:即参数传递的TransportContext的引用;
    • conf:即TransportConf,这里通过调用TransportContext的getConf获取;
    • numConnectionsPerPeer:即从TransportConf获取的key为”spark.+模块名+.io.numConnectionsPerPeer”的属性值。此属性值用于指定对等节点间的连接数。这里的模块名实际为TransportConf的module字段,Spark的很多组件都利用RPC框架构建,它们之间按照模块名区分,例如RPC模块的key为“spark.rpc.io.numConnectionsPerPeer”;
    • ioMode:IO模式,即从TransportConf获取key为”spark.+模块名+.io.mode”的属性值。默认值为NIO,Spark还支持EPOLL;
  • clientBootstraps:即参数传递的TransportClientBootstrap列表;
  • connectionPool:即针对每个Socket地址的连接池ClientPool的缓存;connectionPool的数据结构较为复杂,下图为connectionPool的数据结构。
    • SocketAddress
    • ClientPool
      • clients TransportClient[]
      • locks Object[] 在这里插入图片描述
  • rand:对Socket地址对应的连接池ClientPool中缓存的TransportClient进行随机选择,对每个连接做负载均衡;
  • socketChannelClass:客户端Channel被创建时使用的类,通过ioMode来匹配,默认为NioSocketChannel,Spark还支持EpollEventLoopGroup;
  • workerGroup:根据Netty的规范,客户端只有worker组,所以此处创建workerGroup。workerGroup的实际类型是NioEventLoopGroup;
  • pooledAllocator :汇集ByteBuf但对本地线程缓存禁用的分配器。
    • NettyUtils 上面三个都是通过NettyUntils创建
    public TransportClientFactory(
            TransportContext context,
            List clientBootstraps) {
        this.context = Preconditions.checkNotNull(context);
        this.conf = context.getConf();
        this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
        this.connectionPool = new ConcurrentHashMap();
        this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
        this.rand = new Random();

        IOMode ioMode = IOMode.valueOf(conf.ioMode());
        this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
        this.workerGroup = NettyUtils.createEventLoop(
                ioMode,
                conf.clientThreads(),
                conf.getModuleName() + "-client");
        this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
                conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
    }
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0415s