您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty多线程下响应消息乱序问题解决方案

恐龙弟旺仔 发布时间:2022-02-15 20:28:10 ,浏览量:0

前言:

今天在看文章时,介绍了一个Netty_client被多线程调用发送消息出去,当服务端处理完成之后,将结果值返回给客户端,客户端无法区分是哪个线程的响应的问题的解决方案。

笔者之前在介绍Netty的代码时,其中关于Netty在Sentinel中的实战部分实际有介绍过这块,但是看代码的时候一直没有仔细想,今天看到这个问题,决定把这个问题再次详细描述下,然后把Sentinel的解决方案也再次说明下。

1.具体问题描述

在使用netty_client时,一般使用channel.writeAndFlush()发送消息,方法调用之后,本次发送就结束了。就发送到接收响应的过程而言,这个过程是异步的。

这时就会有两个问题:

1)当channel发送消息完成后,怎么让当前线程暂停下,等待消息结果返回再向后执行?

2)如果有多个线程同时调用channel来发送消息,在netty_server处理完成消息后,将多条响应都发送到客户端,那么怎么确定哪个响应对应的是哪个线程呢?

我们期望实现是如下机制:

 

Thread A发送通过ChannelA发送出去的消息,最终还是ThreadA来接收响应;

Thread B发送通过ChannelA发送出去的消息,最终还是ThreadB来接收响应;

2.Sentinel的处理方案

具体代码还是Sentinel-1.6.3

两个问题的处理方案都在NettyTransportClient中。我们逐个来看下

2.1 当前线程暂停,直到接收到响应
public class NettyTransportClient implements ClusterTransportClient {
 
    @Override
    public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
        ...
        try {

            // 发送消息到服务端
            channel.writeAndFlush(request);

            // 这里离的ChannelPromise就是异步响应结果值,最终client会将接收到的响应存放到ChannelPromise中,通过get方法获取
            ChannelPromise promise = channel.newPromise();

            // 在这里,我们使promise等待ClusterClientConfigManager.getRequestTimeout()这么久,默认是20ms
            if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
                throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
            }

           ...
        } finally {
            TokenClientPromiseHolder.remove(xid);
        }
    }
}

通过调用promise.await(long timeoutMillis)方法,我们就可以实现暂停当前发送线程,等待响应结果值。

在这里我们只等待了一个固定时间值,实际我们可以等待更久(调用promise.await()方法或者不停的调用promise.get()一直到获取到结果为止),来实现同步的发送、获取响应值。

2.2 响应乱序解决

本质上,Netty的发送和响应是异步的,那么这个异步的响应是怎么能关联到对应的请求上呢?

我们可以通过一个唯一的值来关联请求和响应。比如在请求对象中添加一个唯一ID,而响应的时候也将这个唯一ID加到响应对象中。channel在接收到响应时,就可以根据这个id找到对应的请求了。

我们来看下Sentinel的具体实现。

public class NettyTransportClient implements ClusterTransportClient {
 	private final AtomicInteger idGenerator = new AtomicInteger(0);
    
    public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
        ...
        // 在这里生成唯一ID    
        int xid = getCurrentId();
        try {
            // 将xid放入到请求体中
            request.setId(xid);

            channel.writeAndFlush(request);
            ChannelPromise promise = channel.newPromise();
            
            // 这里很关键,将xid和promise的对应关系存放起来,方便后面接收响应时,通过xid找到对应的promise
            // 具体在2.2.1中
            TokenClientPromiseHolder.putPromise(xid, promise);

            if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
                throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
            }

            // 在这里就直接通过xid来promise响应值,响应值已经异步添加到该promise中了
            // 通过2.2.2我们来看下具体过程
            SimpleEntry entry = TokenClientPromiseHolder.getEntry(xid);
            if (entry == null || entry.getValue() == null) {
                // Should not go through here.
                throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
            }
            return entry.getValue();
        } finally {
            TokenClientPromiseHolder.remove(xid);
        }
    }
    
    // 唯一ID本质上就是通过AtomicInteger来实现的
    private int getCurrentId() {
        if (idGenerator.get() > MAX_ID) {
            idGenerator.set(0);
        }
        return idGenerator.incrementAndGet();
    }
}

2.2.1 TokenClientPromiseHolder 存放xid和promise对应关系

public final class TokenClientPromiseHolder {

    // 通过一个Map来存放该对应关系
    private static final Map PROMISE_MAP = new ConcurrentHashMap();

    public static void putPromise(int xid, ChannelPromise promise) {
        PROMISE_MAP.put(xid, new SimpleEntry(promise, null));
    }

    public static SimpleEntry getEntry(int xid) {
        return PROMISE_MAP.get(xid);
    }
}

很简单,就是通过一个Map来存放对应关系即可。

那么当client接收到响应时,是怎么处理的呢?

2.2.2 TokenClientHandler异步处理响应

public class TokenClientHandler extends ChannelInboundHandlerAdapter {
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ClusterResponse) {
            ClusterResponse response = (ClusterResponse) msg;

            if (response.getType() == ClusterConstants.MSG_TYPE_PING) {
                handlePingResponse(ctx, response);
                return;
            }

            // 直接获取到response中的id,然后调用completePromise完成,具体见下面
            TokenClientPromiseHolder.completePromise(response.getId(), response);
        }
    }
}

public final class TokenClientPromiseHolder {
    // 处理响应值
	public static  boolean completePromise(int xid, ClusterResponse response) {
        if (!PROMISE_MAP.containsKey(xid)) {
            return false;
        }
        SimpleEntry entry = PROMISE_MAP.get(xid);
        if (entry != null) {
            ChannelPromise promise = entry.getKey();
            if (promise.isDone() || promise.isCancelled()) {
                return false;
            }
            // 设置响应结果值,设置响应状态为success
            entry.setValue(response);
            promise.setSuccess();
            return true;
        }
        return false;
    }
}

这种处理本质上就是通过request和response中的唯一的xid来匹配的

总结:

多看源码好处多多,尤其这种关注度比较高的开源框架,拆开细细学,总会有很多收获。

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0365s