TransportServer

宝哥大数据, published 2019-04-14 11:59:00

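TransportServer is the server side of the RPC framework. It provides an efficient, low-level streaming service.

1.1 Creating the RPC server

A TransportServer is created by the createServer method of TransportContext.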

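1.2 The TransportServer constructor

The constructor sets the following fields:

  • context: the TransportContext reference passed in as an argument;
  • conf: the TransportConf, obtained here by calling getConf on the TransportContext;
  • appRpcHandler: the RpcHandler that processes RPC requests;
  • bootstraps: the list of TransportServerBootstrap instances passed in as an argument.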
    public TransportServer(
            TransportContext context,
            String hostToBind,
            int portToBind,
            RpcHandler appRpcHandler,
            List<TransportServerBootstrap> bootstraps) {
        this.context = context;
        this.conf = context.getConf();
        this.appRpcHandler = appRpcHandler;
        this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

        try {
            init(hostToBind, portToBind);
        } catch (RuntimeException e) {
            JavaUtils.closeQuietly(this);
            throw e;
        }
    }
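
The try/catch in the constructor guards against a half-built server: if init throws, whatever was already acquired is released before the exception propagates to the caller. The pattern can be sketched with the JDK alone. GuardedServer, its failInit switch, and the AtomicBoolean used to observe cleanup are hypothetical names for illustration, not Spark classes.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a server whose constructor does real work.
class GuardedServer implements Closeable {
    private final AtomicBoolean closed; // set by close(), lets the caller observe cleanup
    private boolean resourceOpen;       // models an acquired resource, e.g. an event loop group

    GuardedServer(boolean failInit, AtomicBoolean closed) {
        this.closed = closed;
        try {
            init(failInit);
        } catch (RuntimeException e) {
            // Same idea as JavaUtils.closeQuietly(this): release, then rethrow.
            closeQuietly();
            throw e;
        }
    }

    private void init(boolean failInit) {
        resourceOpen = true; // the resource is acquired before the failure point
        if (failInit) {
            throw new IllegalStateException("simulated bind failure");
        }
    }

    private void closeQuietly() {
        try {
            close();
        } catch (RuntimeException ignored) {
            // A cleanup failure must not mask the original exception.
        }
    }

    @Override
    public void close() {
        resourceOpen = false;
        closed.set(true);
    }

    boolean isOpen() {
        return resourceOpen;
    }
}
```

Without the catch block, a failed bind would leave the caller with no reference to the object and therefore no way to release what init had already created.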
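1.3 Initializing TransportServer

The constructor calls init(), which performs the following steps:

  • Create bossGroup and workerGroup. Netty's ServerBootstrap takes both a boss group and a worker group; in the code below, both refer to the same EventLoopGroup.
  • Create a pooled ByteBuf allocator. The code below passes true for the allowCache argument, so the allocator is created with caching allowed rather than disabled.
  • Use the Netty API to create and configure the server's root bootstrap (ServerBootstrap).
  • Set a channel-initialization callback on the root bootstrap. For each new channel, the callback first runs every TransportServerBootstrap against the RpcHandler, then calls TransportContext's initializePipeline to initialize the channel's pipeline.
  • Bind the root bootstrap to the listening address and record the port that was actually bound.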
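
The final step reads the port back from the bound channel instead of trusting portToBind, because binding to port 0 asks the operating system to pick a free port. Below is a minimal sketch of the same address selection and port read-back, using the JDK's ServerSocketChannel in place of Netty. BindSketch and its method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class BindSketch {
    // Mirrors the address selection in init(): a null host means the wildcard address.
    static InetSocketAddress chooseAddress(String hostToBind, int portToBind) {
        return hostToBind == null
                ? new InetSocketAddress(portToBind)
                : new InetSocketAddress(hostToBind, portToBind);
    }

    // Binds, reads back the port that was actually assigned, then closes the channel.
    static int bindAndGetPort(String hostToBind, int portToBind) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(chooseAddress(hostToBind, portToBind));
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling BindSketch.bindAndGetPort("127.0.0.1", 0) returns whichever free port the operating system assigned, which is why the requested port and the reported port can differ.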

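    /**
     * Initializes the server: creates the event loop group and the allocator,
     * configures the Netty bootstrap, and binds it to the given address.
     *
     * @param hostToBind host to bind to, or null to bind to the wildcard address
     * @param portToBind port to bind to
     */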
    private void init(String hostToBind, int portToBind) {

        IOMode ioMode = IOMode.valueOf(conf.ioMode());
        EventLoopGroup bossGroup =
                NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
        EventLoopGroup workerGroup = bossGroup;

        PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
                conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

        bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NettyUtils.getServerChannelClass(ioMode))
                .option(ChannelOption.ALLOCATOR, allocator)
                .childOption(ChannelOption.ALLOCATOR, allocator);
        
        // ... set options (elided)
        
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                RpcHandler rpcHandler = appRpcHandler;
                for (TransportServerBootstrap bootstrap : bootstraps) {
                    rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
                }
                // Calls TransportContext.initializePipeline, which creates a TransportChannelHandler
                // and adds it to the SocketChannel's pipeline as a handler.
                context.initializePipeline(ch, rpcHandler);
            }
        });

        InetSocketAddress address = hostToBind == null ?
                new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
        channelFuture = bootstrap.bind(address);
        channelFuture.syncUninterruptibly();

        port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
        logger.debug("Shuffle server started on port: {}", port);
    }
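
The loop inside initChannel folds the list of bootstraps over the application handler: each doBootstrap call receives the handler built so far and returns the one to use from then on, so a bootstrap can wrap or replace it. The sketch below shows the fold in isolation. SketchHandler and SketchBootstrap are simplified, hypothetical stand-ins for RpcHandler and TransportServerBootstrap, and the channel argument is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for RpcHandler.
interface SketchHandler {
    String handle(String message);
}

// Hypothetical, simplified stand-in for TransportServerBootstrap.
interface SketchBootstrap {
    // Receives the handler built so far and returns the one to use from now on.
    SketchHandler doBootstrap(SketchHandler current);
}

class BootstrapChainSketch {
    static SketchHandler applyAll(SketchHandler appHandler, List<SketchBootstrap> bootstraps) {
        SketchHandler handler = appHandler;
        for (SketchBootstrap b : bootstraps) {
            handler = b.doBootstrap(handler); // each bootstrap may wrap or replace the handler
        }
        return handler;
    }

    static String demo() {
        List<SketchBootstrap> bootstraps = new ArrayList<>();
        bootstraps.add(inner -> msg -> "auth(" + inner.handle(msg) + ")");
        bootstraps.add(inner -> msg -> "log(" + inner.handle(msg) + ")");
        SketchHandler app = msg -> "app:" + msg;
        return applyAll(app, bootstraps).handle("ping");
    }
}
```

In this sketch the bootstrap added last ends up outermost, so demo() returns "log(auth(app:ping))".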
1.3.1 The ChannelInitializer passed to ServerBootstrap's childHandler method calls TransportContext's initializePipeline, which creates a TransportChannelHandler and adds it to the SocketChannel's pipeline as a handler
    /**
     * Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
     * has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
     * response messages.
     *
     * @param channel           The channel to initialize.
     * @param channelRpcHandler The RPC handler to use for the channel.
     * @return Returns the created TransportChannelHandler, which includes a TransportClient that can
     * be used to communicate on this channel. The TransportClient is directly associated with a
     * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
     */
    public TransportChannelHandler initializePipeline(
            SocketChannel channel,
            RpcHandler channelRpcHandler) {
        try {
            TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
            // Add the handlers to the SocketChannel's pipeline
            channel.pipeline()
                    .addLast("encoder", ENCODER)
                    .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
                    .addLast("decoder", DECODER)
                    .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
                    // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
                    // would require more logic to guarantee if this were not part of the same event loop.
                    .addLast("handler", channelHandler);
            return channelHandler;
        } catch (RuntimeException e) {
            logger.error("Error while initializing Netty pipeline", e);
            throw e;
        }
    }
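
The order of the addLast calls matters because Netty passes inbound events from the head of the pipeline to the tail and outbound operations from the tail to the head, visiting only handlers of the matching direction. The toy model below traces both paths for the handlers registered above. It does not use Netty. The direction assigned to each handler and the name "frameDecoder" (standing in for TransportFrameDecoder.HANDLER_NAME) are assumptions based on the handler types, and PipelineOrderSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PipelineOrderSketch {
    enum Direction { INBOUND, OUTBOUND, DUPLEX }

    // Handlers in the order initializePipeline adds them; directions are assumptions.
    static Map<String, Direction> pipeline() {
        Map<String, Direction> p = new LinkedHashMap<>();
        p.put("encoder", Direction.OUTBOUND);
        p.put("frameDecoder", Direction.INBOUND);
        p.put("decoder", Direction.INBOUND);
        p.put("idleStateHandler", Direction.DUPLEX);
        p.put("handler", Direction.INBOUND);
        return p;
    }

    // Inbound events travel head to tail and skip outbound-only handlers.
    static List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Map.Entry<String, Direction> e : pipeline().entrySet()) {
            if (e.getValue() != Direction.OUTBOUND) {
                path.add(e.getKey());
            }
        }
        return path;
    }

    // Outbound operations travel tail to head and skip inbound-only handlers.
    static List<String> outboundPath() {
        List<Map.Entry<String, Direction>> entries = new ArrayList<>(pipeline().entrySet());
        List<String> path = new ArrayList<>();
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i).getValue() != Direction.INBOUND) {
                path.add(entries.get(i).getKey());
            }
        }
        return path;
    }
}
```

Under these assumptions, received bytes pass through frameDecoder, decoder, idleStateHandler, and handler in that order, while a written message passes through idleStateHandler and then encoder.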