上一篇文章中,介绍了Dubbo协议的服务消费者的创建过程。其中有很重要的一个点就是消费者会创建对服务端对应协议端口的长连接,后续的调用发起就是通过这个连接来发起的。
本文就来看下客户端长连接创建的过程。
1. DubboProtocol.refer()
故事还是要从DubboProtocol开始说起。
/**
 * Excerpt of the Dubbo protocol showing how the consumer-side invoker is built.
 */
public class DubboProtocol extends AbstractProtocol {

    /**
     * Builds the protocol-specific invoker and wraps it in an {@code AsyncToSyncInvoker}.
     *
     * @param type service interface being referenced
     * @param url  provider URL
     * @return the wrapped invoker
     * @throws RpcException propagated from {@link #protocolBindingRefer(Class, URL)}
     */
    public Invoker refer(Class type, URL url) throws RpcException {
        Invoker bound = protocolBindingRefer(type, url);
        return new AsyncToSyncInvoker(bound);
    }

    /**
     * Creates a {@code DubboInvoker} for the service and registers it in {@code invokers}.
     *
     * @param serviceType service interface being referenced
     * @param url         provider URL
     * @return the newly created invoker
     * @throws RpcException declared by the original signature
     */
    public Invoker protocolBindingRefer(Class serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // getClients() is where the client connections get created (see section 1.1).
        final DubboInvoker created = new DubboInvoker(serviceType, url, getClients(url), invokers);
        invokers.add(created);
        return created;
    }
}
1.1 DubboProtocol.getClients()
public class DubboProtocol extends AbstractProtocol {

    /**
     * Returns the exchange clients used to talk to the provider described by {@code url}.
     *
     * <p>When the URL carries a non-zero {@code CONNECTIONS_KEY} value, that many dedicated
     * clients are created through {@code initClient}. Otherwise the connection is shared: the
     * share-connections count is read from the URL (or, when blank, from {@code ConfigUtils})
     * and the clients are obtained from {@code getSharedClient} (see section 1.2).
     *
     * @param url provider URL
     * @return the clients backing this reference
     * @throws NumberFormatException if the share-connections value is not an integer
     */
    private ExchangeClient[] getClients(URL url) {
        int connections = url.getParameter(CONNECTIONS_KEY, 0);

        // Explicitly configured: one set of connections dedicated to this service.
        if (connections != 0) {
            ExchangeClient[] dedicated = new ExchangeClient[connections];
            for (int i = 0; i < dedicated.length; i++) {
                dedicated[i] = initClient(url);
            }
            return dedicated;
        }

        // Not configured: share the connection(s) with other references to the same provider.
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        int shareConnections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr)
                ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
                : shareConnectionsStr);

        // Typed (instead of a raw List) so the elements can be used as ExchangeClient;
        // only reads are needed here, hence the bounded wildcard.
        List<? extends ExchangeClient> shareClients = getSharedClient(url, shareConnections);

        // Size the array from the list that was actually returned rather than from the configured
        // count: getSharedClient() raises the count to at least 1 and may hand back an
        // already-cached list, so the two numbers are not guaranteed to match.
        return shareClients.toArray(new ExchangeClient[0]);
    }
}
1.2 DubboProtocol.getSharedClient()
public class DubboProtocol extends AbstractProtocol {

    /** Cache of already created clients: key = provider address (host:port), value = its clients. */
    private final Map<String, List<ReferenceCountExchangeClient>> referenceClientMap =
            new ConcurrentHashMap<>();

    /**
     * Returns the shared clients for the provider address of {@code url}, creating or repairing
     * them when the cached ones cannot be used.
     *
     * <p>Every call that returns clients also bumps their reference counts (either through
     * {@code batchClientRefIncr} or {@code incrementAndGetCount}); freshly built clients are
     * returned as constructed.
     *
     * @param url        provider URL; its address is the cache key
     * @param connectNum requested number of shared connections, raised to at least 1
     * @return the shared client list for that address
     */
    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        // The key is the provider's ip:port.
        String key = url.getAddress();

        // Fast path: look at the cache first; it is empty on the very first call.
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }

        // Take the lock object from putIfAbsent's result instead of a second locks.get(key):
        // the entry is removed at the end of the critical section, so a separate get() can
        // observe null and make synchronized(...) throw a NullPointerException.
        Object candidateLock = new Object();
        Object existingLock = locks.putIfAbsent(key, candidateLock);
        Object lock = existingLock != null ? existingLock : candidateLock;

        synchronized (lock) {
            clients = referenceClientMap.get(key);
            // Re-check under the lock: another thread may have finished initialization meanwhile.
            if (checkClientCanUse(clients)) {
                batchClientRefIncr(clients);
                return clients;
            }

            // connectNum must be greater than or equal to 1
            connectNum = Math.max(connectNum, 1);

            if (CollectionUtils.isEmpty(clients)) {
                // Nothing cached yet: this is the first initialization (see section 1.3).
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                referenceClientMap.put(key, clients);
            } else {
                for (int i = 0; i < clients.size(); i++) {
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                    // Replace any client that is missing or already closed with a new one.
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }
                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }

            // Drop the lock entry so the key does not stay in the lock map.
            locks.remove(key);
            return clients;
        }
    }
}
1.3 DubboProtocol.buildReferenceCountExchangeClientList()
/**
 * Excerpt of the Dubbo protocol showing how reference-counted exchange clients are built.
 * The "..." line inside initClient() marks code omitted by the article.
 */
public class DubboProtocol extends AbstractProtocol {
/**
 * Builds {@code connectNum} reference-counted clients for the given provider URL.
 *
 * @param url        provider URL
 * @param connectNum number of clients to create
 * @return list holding the created clients
 */
private List buildReferenceCountExchangeClientList(URL url, int connectNum) {
List clients = new ArrayList();
for (int i = 0; i < connectNum; i++) {
// Each client is created by buildReferenceCountExchangeClient().
clients.add(buildReferenceCountExchangeClient(url));
}
return clients;
}
/**
 * Creates one client through initClient() and wraps it in a ReferenceCountExchangeClient.
 *
 * @param url provider URL
 * @return the wrapped client
 */
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
// The actual client is created by initClient().
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
/**
 * Creates the exchange client for the given provider URL, either lazily connecting or
 * connecting through {@code Exchangers}.
 *
 * @param url provider URL
 * @return the created client
 * @throws RpcException wrapping any RemotingException raised while creating the client
 */
private ExchangeClient initClient(URL url) {
// Resolve the client type: CLIENT_KEY, then SERVER_KEY, then DEFAULT_REMOTING_CLIENT
// (the article states the default is the netty client -- confirm against the constant).
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
...
ExchangeClient client;
try {
// Lazy connect is off unless LAZY_CONNECT_KEY is set on the URL (default false).
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// Otherwise the connection is created by Exchangers (see sections 2 and 2.1).
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
}
总结:经过这一大段代码,DubboProtocol.refer()方法会根据provider_url中的connections参数创建对应个数的Client:若未配置connections(取值为0),则使用共享连接,连接数由SHARE_CONNECTIONS_KEY对应的配置决定(默认为1,即针对当前provider创建一个client,当前应用对该provider的操作都交由这一个client来处理)。经过一系列的调用,最终交由Exchangers来创建。
2. Exchangers.connect()
public class Exchangers {
/**
 * Connects to the provider described by {@code url} using the Exchanger selected for that URL.
 * The "..." line marks code omitted by the article.
 *
 * @param url     provider URL; the codec parameter is set to "exchange" when absent
 * @param handler exchange handler passed on to the Exchanger
 * @return the client returned by the selected Exchanger
 * @throws RemotingException declared by the Exchanger's connect call
 */
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
...
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// In the end the work is delegated to HeaderExchanger.connect() (see section 2.1).
return getExchanger(url).connect(url, handler);
}
/**
 * Looks up the Exchanger named by the URL's exchanger parameter, falling back to
 * {@code Constants.DEFAULT_EXCHANGER} (HeaderExchanger according to the article).
 *
 * @param url URL carrying the optional exchanger type
 * @return the matching Exchanger
 */
public static Exchanger getExchanger(URL url) {
    return getExchanger(url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER));
}
/**
 * Resolves an Exchanger extension by name through the ExtensionLoader (SPI).
 *
 * @param type extension name
 * @return the Exchanger registered under that name
 */
public static Exchanger getExchanger(String type) {
    ExtensionLoader<Exchanger> exchangerLoader = ExtensionLoader.getExtensionLoader(Exchanger.class);
    return exchangerLoader.getExtension(type);
}
}
之前有对Exchanger进行过分析,本质上还是SPI那一套,大家一定要非常清晰SPI的使用。
2.1 HeaderExchanger.connect()
public class HeaderExchanger implements Exchanger {
/**
 * Wraps the handler in a HeaderExchangeHandler and a DecodeHandler, opens the transport-level
 * connection through Transporters (see section 2.2), and returns it as a HeaderExchangeClient.
 *
 * @param url     provider URL
 * @param handler exchange handler to wrap
 * @return the exchange client built on the transport client
 * @throws RemotingException declared by {@code Transporters.connect}
 */
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
    DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
    // As on the provider side, the actual connection is made by Transporters.
    return new HeaderExchangeClient(Transporters.connect(url, decodeHandler), true);
}
}
2.2 Transporters.connect()
/**
 * Entry point that turns a URL plus handlers into a transport-level client.
 */
public class Transporters {

    /**
     * Connects to {@code url} through the adaptive Transporter extension.
     *
     * @param url      target URL, must not be null
     * @param handlers zero or more channel handlers
     * @return the client returned by the Transporter
     * @throws IllegalArgumentException if {@code url} is null
     * @throws RemotingException        declared by the Transporter's connect call
     */
    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // In this walkthrough exactly one handler (the DecodeHandler) is passed in.
        ChannelHandler effectiveHandler = toSingleHandler(handlers);
        // The article notes the default Transporter is NettyTransporter (see section 2.3).
        return getTransporter().connect(url, effectiveHandler);
    }

    /** Collapses the handler array into one handler: adapter for none, itself for one, dispatcher otherwise. */
    private static ChannelHandler toSingleHandler(ChannelHandler[] handlers) {
        int count = (handlers == null) ? 0 : handlers.length;
        if (count == 0) {
            return new ChannelHandlerAdapter();
        }
        if (count == 1) {
            return handlers[0];
        }
        return new ChannelHandlerDispatcher(handlers);
    }

    /**
     * Returns the adaptive Transporter extension obtained from the ExtensionLoader.
     *
     * @return the adaptive Transporter
     */
    public static Transporter getTransporter() {
        ExtensionLoader<Transporter> transporterLoader = ExtensionLoader.getExtensionLoader(Transporter.class);
        return transporterLoader.getAdaptiveExtension();
    }
}
2.3 NettyTransporter.connect()
/**
 * Transporter implementation that creates Netty-based clients.
 */
public class NettyTransporter implements Transporter {

    /**
     * Creates a NettyClient for the given URL and handler (see section 3).
     *
     * @param url     target URL
     * @param handler channel handler for the client
     * @return the new client
     * @throws RemotingException declared by the NettyClient constructor
     */
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        Client nettyClient = new NettyClient(url, handler);
        return nettyClient;
    }
}
与我们之前分析服务端的创建过程差不多,也是Exchanger --> Transporter --> Netty(NettyServer、NettyClient)
3. NettyClient
public class NettyClient extends AbstractClient {
private Bootstrap bootstrap;
private volatile Channel channel;
/**
 * Creates the client by delegating to the AbstractClient constructor with the handler wrapped
 * by {@code wrapChannelHandler}.
 *
 * @param url     target URL
 * @param handler channel handler to wrap
 * @throws RemotingException declared by the superclass constructor
 */
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// Delegate to the superclass constructor.
super(url, wrapChannelHandler(url, handler));
}
/**
 * Creates and configures the Netty {@code Bootstrap} for this client; no connection is made here.
 *
 * <p>The channel pipeline gets, in order: an optional SSL negotiation handler, the decoder and
 * encoder from {@code NettyCodecAdapter}, an {@code IdleStateHandler} whose first (reader-idle)
 * argument is the heartbeat interval, and the {@code NettyClientHandler}. When a SOCKS proxy
 * host is configured, a {@code Socks5ProxyHandler} is placed at the front of the pipeline.
 *
 * @throws Throwable declared by the original signature
 */
protected void doOpen() throws Throwable {
    // Standard Netty client bootstrap setup.
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(NIO_EVENT_LOOP_GROUP)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(socketChannelClass());

    // The connect timeout is never set below 3000 ms.
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));

    // The type argument is required: with a raw ChannelInitializer, initChannel(SocketChannel)
    // does not override the abstract initChannel method and the anonymous class does not compile.
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
            }

            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            // Register the handlers on the pipeline.
            ch.pipeline()
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);

            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if (socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                // The proxy handler goes first in the pipeline.
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}
/**
 * Opens the connection to the server through the bootstrap prepared in doOpen().
 * Waits up to the connect timeout; on success, any previously held channel is closed and
 * passed to {@code NettyChannel.removeChannelIfDisconnected}. The "..." lines mark code
 * omitted by the article.
 *
 * @throws Throwable declared by the original signature
 */
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
// Wait (uninterruptibly) for the connect attempt, bounded by the connect timeout in ms.
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close the channel held before this connect, if there is one.
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
// Runs even if close() throws.
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
...
}
} ...
} finally {
...
}
}
}
/**
 * Excerpt of the client base class: its constructor opens and connects the client.
 * The "..." lines mark code omitted by the article.
 */
public abstract class AbstractClient extends AbstractEndpoint implements Client {
/**
 * Initializes the executor, then calls doOpen() followed by connect().
 *
 * @param url     target URL
 * @param handler channel handler passed to the superclass
 * @throws RemotingException declared by the original signature
 */
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
initExecutor(url);
try {
// Implemented by the subclass (NettyClient in this walkthrough).
doOpen();
} catch (Throwable t) {
...
}
try {
// connect.
connect();
...
}
} ...
}
// Creates the thread pool, similar to what the article described for NettyServer; the article
// notes that the client ends up with a CachedThreadPool (confirm against DEFAULT_CLIENT_THREADPOOL).
private void initExecutor(URL url) {
// Set the client thread name; in the article's example threadname=DubboClientHandler-xxx:20880.
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}
}
总结:
Dubbo协议消费者的创建过程还是比较简单的,虽然过程有点绕,但是都不算难。
本质上还是创建了NettyClient,默认在其构造方法中创建对服务端的长连接。
所以dubbo消费者在应用启动时都会默认创建一个对服务端的长连接(以ip:port为单位进行区分,N个不同的ip:port则会创建N个长连接)。
老规矩,还是用一个时序图描述下消费者客户端创建全过程。