TransportClientFactory是创建传输客户端(TransportClient)的工厂类, TransportClient用于向Netty服务端发送RPC请求,
1.1、由TransportContext创建 /**
* Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
* a new Client. Bootstraps will be executed synchronously, and must run successfully in order
* to create a Client.
*/
public TransportClientFactory createClientFactory(List bootstraps) {
return new TransportClientFactory(this, bootstraps);
}
public TransportClientFactory createClientFactory() {
return createClientFactory(Lists.newArrayList());
}
1.2、组成
- context:即参数传递的TransportContext的引用;
- conf:即TransportConf,这里通过调用TransportContext的getConf获取;
- numConnectionsPerPeer:即从TransportConf获取的key为”spark.+模块名+.io.numConnectionsPerPeer”的属性值。此属性值用于指定对等节点间的连接数。这里的模块名实际为TransportConf的module字段,Spark的很多组件都利用RPC框架构建,它们之间按照模块名区分,例如RPC模块的key为“spark.rpc.io.numConnectionsPerPeer”;
- ioMode:IO模式,即从TransportConf获取key为”spark.+模块名+.io.mode”的属性值。默认值为NIO,Spark还支持EPOLL;
- clientBootstraps:即参数传递的TransportClientBootstrap列表;
- connectionPool:即针对每个Socket地址的连接池ClientPool的缓存;connectionPool的数据结构较为复杂,下图为connectionPool的数据结构。
- SocketAddress
- ClientPool
- clients TransportClient[]
- locks Object[]
- rand:对Socket地址对应的连接池ClientPool中缓存的TransportClient进行随机选择,对每个连接做负载均衡;
- socketChannelClass:客户端Channel被创建时使用的类,通过ioMode来匹配,默认为NioSocketChannel,Spark还支持EpollEventLoopGroup;
- workerGroup:根据Netty的规范,客户端只有worker组,所以此处创建workerGroup。workerGroup的实际类型是NioEventLoopGroup;
- pooledAllocator :汇集ByteBuf但对本地线程缓存禁用的分配器。
- NettyUtils 上面三个都是通过NettyUntils创建
public TransportClientFactory(
TransportContext context,
List clientBootstraps) {
this.context = Preconditions.checkNotNull(context);
this.conf = context.getConf();
this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
this.connectionPool = new ConcurrentHashMap();
this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
this.rand = new Random();
IOMode ioMode = IOMode.valueOf(conf.ioMode());
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
this.workerGroup = NettyUtils.createEventLoop(
ioMode,
conf.clientThreads(),
conf.getModuleName() + "-client");
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}