在介绍完Protocol层之后,下面来到Exchanger层。
它在整个Dubbo架构中如下所示:
主要作用:作为信息交换层,封装请求响应模式,同步转为异步。
将Server包装为ExchangeServer;将client包装为ExchangeClient。
本文就来分析下这几个重点类。
1.ExchangeClient 客户端的创建先来看下其入口:DubboProtocol.initClient() 来创建ExchangeClient,最终交由HeaderExchanger来执行
1.1 HeaderExchanger.connect() 创建ExchangeClientpublic class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// HeaderExchangeClient 具体信息参考1.2
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
Transporters.connect() 返回NettyClient(默认Netty传输的情况下)
1.2 HeaderExchangeClient 的构造与属性public class HeaderExchangeClient implements ExchangeClient {
private final Client client;
private final ExchangeChannel channel;
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
private HeartbeatTimerTask heartBeatTimerTask;
private ReconnectTimerTask reconnectTimerTask;
// 构造方法
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
// 此时client为NettyClient
this.client = client;
// 封装channel
this.channel = new HeaderExchangeChannel(client);
if (startTimer) {
URL url = client.getUrl();
// 创建重连任务,具体见1.2.1
startReconnectTask(url);
// 创建心跳任务,具体见1.2.2
startHeartBeatTask(url);
}
}
}
1.2.1 ReconnectTask 客户端重连任务的创建
public class HeaderExchangeClient implements ExchangeClient {
private void startReconnectTask(URL url) {
if (shouldReconnect(url)) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
// 允许空闲时间,默认为心跳时间 * 3 = 180秒
int idleTimeout = getIdleTimeout(url);
// 心跳检查时间,默认为60秒,不允许小于1秒
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
// 创建重连任务
this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
}
}
// ReconnectTimerTask 重连任务
public class ReconnectTimerTask extends AbstractTimerTask {
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long now = now();
// 如果channel未连接,则直接执行重连
if (!channel.isConnected()) {
try {
logger.info("Initial connection to " + channel);
((Client) channel).reconnect();
} catch (Exception e) {
logger.error("Fail to connect to " + channel, e);
}
// 长时间未收到pong响应,则判定连接异常,直接执行重连
} else if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
((Client) channel).reconnect();
} catch (Exception e) {
logger.error(channel + "reconnect failed during idle time.", e);
}
}
} catch (Throwable t) {
logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
}
}
}
1.2.2 HeartBeatTask 心跳任务
public class HeaderExchangeClient implements ExchangeClient {
private void startHeartBeatTask(URL url) {
// 默认NettyClient会进行心跳检测,所以这里不再创建心跳任务
if (!client.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
int heartbeat = getHeartbeat(url);
long heartbeatTick = calculateLeastDuration(heartbeat);
this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
}
}
}
总结:HeaderExchangeClient 在构造时会考虑创建重连任务(ReconnectTask)、心跳任务(NettyClient默认不需要,因为NettyClient本身已经有心跳处理)。
在封装了NettyClient后,最终请求都交由HeaderExchangeChannel来处理,该类是请求同步转异步的关键类。下面就来学习下这个类是如何进行操作的。
1.3 HeaderExchangeChannelfinal class HeaderExchangeChannel implements ExchangeChannel {
// 这里的channel即NettyClient
private final Channel channel;
// 客户端发送请求默认使用该方法
public CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// 封装请求体
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 获取一个当前request的Future类
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
// 使用channel将request发送出去
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
// 将该channel返回给客户端
return future;
}
}
在HeaderExchangeChannel中,发送请求时,创建当前请求的Future类,最终将Future返回给客户端,以实现同步到异步的转换。
2.ExchangeServer 服务端的创建public class HeaderExchanger implements Exchanger {
...
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 交由HeaderExchangeServer创建
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
2.1 HeaderExchangeServer 的构造
public class HeaderExchangeServer implements ExchangeServer {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final RemotingServer server;
private AtomicBoolean closed = new AtomicBoolean(false);
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
TimeUnit.SECONDS, TICKS_PER_WHEEL);
private CloseTimerTask closeTimerTask;
public HeaderExchangeServer(RemotingServer server) {
Assert.notNull(server, "server == null");
// 此时的server为NettyServer
this.server = server;
// 创建空闲检查,NettyServer中已经定义了IdleStateHandler,不再需要再创建空闲检查
startIdleCheckTask(getUrl());
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
// 先进行状态检查,如果已关闭则直接抛出异常
if (closed.get()) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message
+ ", cause: The server " + getLocalAddress() + " is closed!");
}
// 请求交由NettyServer执行
server.send(message, sent);
}
}
貌似HeaderExchangeServer没有HeaderExchangeClient那么多功能了,主要是异常的检查,最后还是将请求交由NettyServer来发送。
总结:作为信息转换层,主要用于对请求的转换封装。
本文重点就是HeaderExchangeClient,在这里主要实现了请求同步向异步的转换,还创建了重连任务。我们在日常开发中,可以学习下这种方式。