TransportServer是RPC框架的服务端,可提供高效的、低级别的流服务
1.1、创建RPC服务端由TransportContext的createServer方法用于创建TransportServer
1.2、TransportServer的构造函数构造器中的各个变量分别为:
- context:即参数传递的TransportContext的引用;
- conf:即TransportConf,这里通过调用TransportContext的getConf获取;
- appRpcHandler:即RPC请求处理器RpcHandler;
- bootstraps:即参数传递的TransportServerBootstrap列表;
public TransportServer(
TransportContext context,
String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List bootstraps) {
this.context = context;
this.conf = context.getConf();
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
try {
init(hostToBind, portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
}
}
1.3、 TransportServer的初始化
调用init()
- 创建bossGroup和workerGroup(根据Netty的API文档,Netty服务端需同时创建bossGroup和workerGroup。);
- 创建一个汇集ByteBuf但对本地线程缓存禁用的分配器;
- 调用Netty的API创建Netty的服务端根引导程序并对其进行配置;
- 为根引导程序设置管道初始化回调函数,此回调函数首先设置TransportServerBootstrap到根引导程序中,然后调用TransportContext的initializePipeline方法初始化Channel的pipeline;
- 给根引导程序绑定Socket的监听端口,最后返回监听的端口。
/**
* init TransportServer
* @param hostToBind
* @param portToBind
*/
private void init(String hostToBind, int portToBind) {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.childOption(ChannelOption.ALLOCATOR, allocator);
// 。。。。 设置参数
bootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
// 此处调用了TransportContext的initializePipeline, 在initializePipeline中创建了TransportChannelHandler, 并将它绑定到SocketChannel的pipline的handler中
context.initializePipeline(ch, rpcHandler);
}
});
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}
1.3.1、ServerBootstrap的childHandler方法调用了TransportContext的initializePipeline, 在initializePipeline中创建了TransportChannelHandler, 并将它绑定到SocketChannel的pipline的handler中
/**
* Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
* has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
* response messages.
*
* @param channel The channel to initialize.
* @param channelRpcHandler The RPC handler to use for the channel.
* @return Returns the created TransportChannelHandler, which includes a TransportClient that can
* be used to communicate on this channel. The TransportClient is directly associated with a
* ChannelHandler to ensure all users of the same channel get the same TransportClient object.
*/
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
//绑定到SocketChannel的pipeline
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
}
}