Transporter在Dubbo架构中,属于比较底层的存在。主要用于请求、响应的传输。
其在Dubbo架构中的位置如下:
而关于Transporter的实现,主要有以下几种:
本文主要来了解下NettyTransporter(默认传输方式)的相关知识点。
1.NettyTransporter的那些方法/**
* 默认使用Netty4实现
*/
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
// 服务端交由NettyServer来实现
return new NettyServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
// 客户端交由NettyClient实现
return new NettyClient(url, handler);
}
}
代码比较简单,就是分别实现了Server和Client端。下面来分别看下其实现。
2.NettyServer的实现public class NettyServer extends AbstractServer implements RemotingServer {
// 在这里缓存了port:channel的对应关系
private Map channels;
// 创建的ServerBootstrap,这里都是标准的Netty类
private ServerBootstrap bootstrap;
/**
* the boss channel that receive connections and dispatch these to worker channel.
*/
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
// 这个方法比较关键,在这里创建了默认的线程池,具体见2.1
// 并且创建了ChannelHandler来处理请求,具体见2.2
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
}
2.1 AbstractServer的构造
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
ExecutorService executor;
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
private int idleTimeout;
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 根据URL里获取相关ip port参数信息
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 开启一个端口服务,具体在子类中(NettyServer)实现
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// 在这里创建线程池,具体就不再分析,后续会有专门分析线程池的博客
executor = executorRepository.createExecutorIfAbsent(url);
}
}
2.2 ChannelHandlers.wrap(handler, url) 构造ChannelHandler
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 最终调用在这里
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 在这里可以看到Handler的包含关系
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
通过代码可知,Handler的包含关系为:MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler(默认)
实际最终都是交由AllChannelHandler来处理。我们来简单分析下AllChannelHandler,看下其是如何处理请求
2.3 AllChannelHandlerpublic class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
// 对于接收到的请求,创建一个ChannelEventRunnable事件,交由Executor线程池执行
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
...
}
AllChannelHandler将所有的请求都转换为一个ChannelEventRunnable事件,然后交由Executor线程池来执行
通过对ChannelEventRunnable的分析,其实现了Runnable接口,其run()方法,最终还是将所有的请求都交由AllChannelHandler.handler来实现了。
经历了这么一大圈,这个AllChannelHandler.handler到底是谁呢?读者可以自行Debug下代码,确定下这个问题
分析了NettyServer的基本参数和构造方法之后,来看下其doOpen()方法,是如何开启一个端口服务。
2.4 NettyServer.doOpen() 开启端口服务public class NettyServer extends AbstractServer implements RemotingServer {
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
// 自定义的nettyServerHandler,用于处理请求
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
// 主要关注点在这里,需要明确的了解NettyServer的这些Handler
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
代码并不算复杂,都是标准的Netty创建Server的过程。
这里需要对Netty有所了解,如果读者不是很了解的话则需要先找些文档学习下Netty4.
我们在创建NettyServer的时候,基本都是模板式创建,真正需要关注的就是其添加了哪些ChannelHandler。
在这里我们简单了解下即可,后续会有博客来专门说明。
NettyServer总结:
* 支持AllDispatcher模式,将所有的请求都交由线程池来完成(默认创建200个线程的线程池)
* 自定义Dubbo协议,请求序列化反序列化处理Handler都由NettyCodecAdapter创建完成
* NettyServer本身也实现了ChannelHandler接口,最终处理请求还是自身而非其他Handler
3.NettyClient分析完了NettyServer,再来分析NettyClient就简单很多了。
public class NettyClient extends AbstractClient {
private Bootstrap bootstrap;
private volatile Channel channel;
// 在这里构造线程池和ChannelHandler的方式与NettyServer一样,不再赘述
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}
}
3.1 NettyClient.doOpen() 创建Bootstrap基本参数
public class NettyClient extends AbstractClient {
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
// 这个需要多关注下
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
...
}
});
}
}
Bootstrap的创建也是标准方式,学习过Netty的同学不需要过多关注。
里面有一个IdleStateHandler关注下即可,这个是客户端的心跳Handler,以heartbeatInterval的频率向server发送心跳请求。
3.2 NettyClient.doConnect() 客户端开启连接public class NettyClient extends AbstractClient {
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 直接创建对server的连接
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
// 因为连接结果是Future,所以需要针对Future的结果进行处理
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
// copy reference
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
// just add new valid channel to NettyChannel's cache
if (!isConnected()) {
//future.cancel(true);
}
}
}
}
Netty client创建连接使用的是Future,所以可以根据Future的成功与否来做对应处理。
总结:总体而言,NettyServer和NettyClient的创建并不算复杂,都是标准的Netty构造方式。
需要读者对Netty有一定的了解。
重点只需要关注其Decode、Encode Handler(后续会专门介绍这两个Handler),Dispatcher方式等