今天在看文章时,介绍了一个Netty_client被多线程调用发送消息出去,当服务端处理完成之后,将结果值返回给客户端,客户端无法区分是哪个线程的响应的问题的解决方案。
笔者之前在介绍Netty的代码时,其中关于Netty在Sentinel中的实战部分实际有介绍过这块,但是看代码的时候一直没有仔细想,今天看到这个问题,决定把这个问题再次详细描述下,然后把Sentinel的解决方案也再次说明下。
1.具体问题描述在使用netty_client时,一般使用channel.writeAndFlush()发送消息,方法调用之后,本次发送就结束了。就发送到接收响应的过程而言,这个过程是异步的。
这时就会有两个问题:
1)当channel发送消息完成后,怎么让当前线程暂停下,等待消息结果返回再向后执行?
2)如果有多个线程同时调用channel来发送消息,在netty_server处理完成消息后,将多条响应都发送到客户端,那么怎么确定哪个响应对应的是哪个线程呢?
我们期望实现是如下机制:
Thread A发送通过ChannelA发送出去的消息,最终还是ThreadA来接收响应;
Thread B发送通过ChannelA发送出去的消息,最终还是ThreadB来接收响应;
2.Sentinel的处理方案具体代码还是Sentinel-1.6.3
两个问题的处理方案都在NettyTransportClient中。我们逐个来看下
2.1 当前线程暂停,直到接收到响应public class NettyTransportClient implements ClusterTransportClient {
@Override
public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
...
try {
// 发送消息到服务端
channel.writeAndFlush(request);
// 这里离的ChannelPromise就是异步响应结果值,最终client会将接收到的响应存放到ChannelPromise中,通过get方法获取
ChannelPromise promise = channel.newPromise();
// 在这里,我们使promise等待ClusterClientConfigManager.getRequestTimeout()这么久,默认是20ms
if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
}
...
} finally {
TokenClientPromiseHolder.remove(xid);
}
}
}
通过调用promise.await(long timeoutMillis)方法,我们就可以实现暂停当前发送线程,等待响应结果值。
在这里我们只等待了一个固定时间值,实际我们可以等待更久(调用promise.await()方法或者不停的调用promise.get()一直到获取到结果为止),来实现同步的发送、获取响应值。
2.2 响应乱序解决本质上,Netty的发送和响应是异步的,那么这个异步的响应是怎么能关联到对应的请求上呢?
我们可以通过一个唯一的值来关联请求和响应。比如在请求对象中添加一个唯一ID,而响应的时候也将这个唯一ID加到响应对象中。channel在接收到响应时,就可以根据这个id找到对应的请求了。
我们来看下Sentinel的具体实现。
public class NettyTransportClient implements ClusterTransportClient {
private final AtomicInteger idGenerator = new AtomicInteger(0);
public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
...
// 在这里生成唯一ID
int xid = getCurrentId();
try {
// 将xid放入到请求体中
request.setId(xid);
channel.writeAndFlush(request);
ChannelPromise promise = channel.newPromise();
// 这里很关键,将xid和promise的对应关系存放起来,方便后面接收响应时,通过xid找到对应的promise
// 具体在2.2.1中
TokenClientPromiseHolder.putPromise(xid, promise);
if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
}
// 在这里就直接通过xid来promise响应值,响应值已经异步添加到该promise中了
// 通过2.2.2我们来看下具体过程
SimpleEntry entry = TokenClientPromiseHolder.getEntry(xid);
if (entry == null || entry.getValue() == null) {
// Should not go through here.
throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
}
return entry.getValue();
} finally {
TokenClientPromiseHolder.remove(xid);
}
}
// 唯一ID本质上就是通过AtomicInteger来实现的
private int getCurrentId() {
if (idGenerator.get() > MAX_ID) {
idGenerator.set(0);
}
return idGenerator.incrementAndGet();
}
}
2.2.1 TokenClientPromiseHolder 存放xid和promise对应关系
public final class TokenClientPromiseHolder {
// 通过一个Map来存放该对应关系
private static final Map PROMISE_MAP = new ConcurrentHashMap();
public static void putPromise(int xid, ChannelPromise promise) {
PROMISE_MAP.put(xid, new SimpleEntry(promise, null));
}
public static SimpleEntry getEntry(int xid) {
return PROMISE_MAP.get(xid);
}
}
很简单,就是通过一个Map来存放对应关系即可。
那么当client接收到响应时,是怎么处理的呢?
2.2.2 TokenClientHandler异步处理响应
public class TokenClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ClusterResponse) {
ClusterResponse response = (ClusterResponse) msg;
if (response.getType() == ClusterConstants.MSG_TYPE_PING) {
handlePingResponse(ctx, response);
return;
}
// 直接获取到response中的id,然后调用completePromise完成,具体见下面
TokenClientPromiseHolder.completePromise(response.getId(), response);
}
}
}
public final class TokenClientPromiseHolder {
// 处理响应值
public static boolean completePromise(int xid, ClusterResponse response) {
if (!PROMISE_MAP.containsKey(xid)) {
return false;
}
SimpleEntry entry = PROMISE_MAP.get(xid);
if (entry != null) {
ChannelPromise promise = entry.getKey();
if (promise.isDone() || promise.isCancelled()) {
return false;
}
// 设置响应结果值,设置响应状态为success
entry.setValue(response);
promise.setSuccess();
return true;
}
return false;
}
}
这种处理本质上就是通过request和response中的唯一的xid来匹配的
总结:多看源码好处多多,尤其这种关注度比较高的开源框架,拆开细细学,总会有很多收获。