在学习完了Netty的基本知识后,我们来实战下。
笔者之前有使用过Sentinel的相关技术,其中client与server端的交互就是使用Netty来完成的。
那本文我们就来分析下NettyClient的使用。
笔者分析的Sentinel源码版本为1.7.0
1.定义客户端交互接口面向接口编程,而不是面向类编程。作为一个框架设计者,需要时刻谨记该原则。
Sentinel也是,在创建客户端连接时,首先提出一个接口。
public interface ClusterTransportClient {
/**
* Start the client.
*
* @throws Exception some error occurred (e.g. initialization failed)
*/
void start() throws Exception;
/**
* Stop the client.
*
* @throws Exception some error occurred (e.g. shutdown failed)
*/
void stop() throws Exception;
/**
* Send request to remote server and get response.
*
* @param request Sentinel cluster request
* @return response from remote server
* @throws Exception some error occurs
*/
ClusterResponse sendRequest(ClusterRequest request) throws Exception;
/**
* Check whether the client has been started and ready for sending requests.
*
* @return true if the client is ready to send requests, otherwise false
*/
boolean isReady();
}
分析下其主要方法
start() 用于启动一个client
stop() 用于关闭一个client
isReady() 用于判断client的状态,若已创建完成连接,则判定为可用态,可以用于发送请求
sendRequest() client发送请求
2.定义基于Netty的实现类面向接口而不是实现类。如果以后我们需要切换为别的client创建方式(比如Mina),则可以直接基于(Mina)来创建客户端连接。
下面来看下基于Netty的ClusterTransportClient接口实现
2.1 NettyTransportClient的参数public class NettyTransportClient implements ClusterTransportClient {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-cluster-transport-client-scheduler"));
public static final int RECONNECT_DELAY_MS = 2000;
// 用于连接的host port
private final String host;
private final int port;
// 基于Netty创建完对Server连接后,返回的Channel,可用于后续的请求发送
private Channel channel;
// Netty自带的
private NioEventLoopGroup eventLoopGroup;
// 请求处理Handler
private TokenClientHandler clientHandler;
private final AtomicInteger idGenerator = new AtomicInteger(0);
// 防止多次创建的状态位
private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);
// 失败连接次数
private final AtomicInteger failConnectedTime = new AtomicInteger(0);
private final AtomicBoolean shouldRetry = new AtomicBoolean(true);
// 构造方法中主要就是提供server的host port信息
public NettyTransportClient(String host, int port) {
AssertUtil.assertNotBlank(host, "remote host cannot be blank");
AssertUtil.isTrue(port > 0, "port should be positive");
this.host = host;
this.port = port;
}
...
}
从当前的构造方法可以看出,host和port都是通过外部调用创建的。其他参数都是比较正常的Netty client创建中所使用到的。
下面来看下start()方法
2.2 NettyTransportClient.start() 启动客户端连接public class NettyTransportClient implements ClusterTransportClient {
public void start() throws Exception {
shouldRetry.set(true);
startInternal();
}
private void startInternal() {
connect(initClientBootstrap());
}
// 用于创建Bootstrap
private Bootstrap initClientBootstrap() {
Bootstrap b = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
// 配置底层TCP相关参数
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
clientHandler = new TokenClientHandler(currentState, disconnectCallback);
// 设置请求执行相关Handler类
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
pipeline.addLast(new NettyResponseDecoder());
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new NettyRequestEncoder());
pipeline.addLast(clientHandler);
}
});
return b;
}
// connect方法用于真正创建远程连接
private void connect(Bootstrap b) {
// 这里的currentState状态位,避免多次创建连接
if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
b.connect(host, port)
// 添加一个连接监听
.addListener(new GenericFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
// 连接失败,则打印日志,记录失败次数
RecordLog.warn(
String.format("[NettyTransportClient] Could not connect to after %d times",
host, port, failConnectedTime.get()), future.cause());
failConnectedTime.incrementAndGet();
channel = null;
} else {
failConnectedTime.set(0);
channel = future.channel();
RecordLog.info(
"[NettyTransportClient] Successfully connect to server ");
}
}
});
}
}
}
有关于currentState状态位的使用,我们可以学习下,通过该状态位的使用,可以避免创建多个连接。这样NettyTransportClient对外提供的Channel一直都是同一个。
按照上面的start()方式,那么当连接失败时,客户端就不再重试了嘛?为什么没有看到重试方法呢?
答案是有重试的,只不过不是在当前startInternal()方法,而是启动了一个定时任务,来不停的执行重试。
2.3 创建连接重试public class NettyTransportClient implements ClusterTransportClient {
private Runnable disconnectCallback = new Runnable() {
@Override
public void run() {
// start()方法调用之后,shouldRetry=true,所以会直接执行到下面的调度方法
if (!shouldRetry.get()) {
return;
}
// 通过指定频率来执行重连
SCHEDULER.schedule(new Runnable() {
@Override
public void run() {
if (shouldRetry.get()) {
RecordLog.info("[NettyTransportClient] Reconnecting to server ");
try {
startInternal();
} catch (Exception e) {
RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
}
}
}
}, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
cleanUp();
}
};
}
disconnectCallback确实实现了连接的重试,那么这个RUNNABLE是什么时候被触发的呢?
全文搜索下,可以看到在Bootstrap创建的时候添加进去的
private Bootstrap initClientBootstrap() {
Bootstrap b = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 在创建TokenClientHandler的时候被使用到了,那么具体是如何使用的呢?具体见TokenClientHandler
clientHandler = new TokenClientHandler(currentState, disconnectCallback);
...
pipeline.addLast(clientHandler);
}
});
return b;
}
2.3.1 TokenClientHandler
public class TokenClientHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger currentState;
private final Runnable disconnectCallback;
public TokenClientHandler(AtomicInteger currentState, Runnable disconnectCallback) {
this.currentState = currentState;
this.disconnectCallback = disconnectCallback;
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + getRemoteAddress(ctx));
currentState.set(ClientConstants.CLIENT_STATUS_OFF);
// 在这里被调用到了
disconnectCallback.run();
}
}
从这一系列的骚操作,在channel断开时,也就是unregistered方法时,此时就需要进行重连了。
2.4 NettyTransportClient.stop() 关闭客户端连接public class NettyTransportClient implements ClusterTransportClient {
public void stop() throws Exception {
// Stop retrying for connection.
shouldRetry.set(false);
// 如果正在连接状态的,则sleep 200ms
while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
try {
Thread.sleep(200);
} catch (Exception ex) {
// Ignore.
}
}
// 调用如下
cleanUp();
failConnectedTime.set(0);
RecordLog.info("[NettyTransportClient] Cluster transport client stopped");
}
private void cleanUp() {
// 直接关闭channel
if (channel != null) {
channel.close();
channel = null;
}
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}
}
关闭客户端就是常规的方式,多了一个状态的判断。
2.5 NettyTransportClient.sendRequest() 发送请求public class NettyTransportClient implements ClusterTransportClient {
@Override
public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
if (!isReady()) {
throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
}
if (!validRequest(request)) {
throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
}
// 获取一个唯一ID,标志当前这个唯一请求
int xid = getCurrentId();
try {
// request设置完id之后,直接调用channel进行发送
request.setId(xid);
channel.writeAndFlush(request);
// 发送请求和接收响应是异步的,通过Future来实现
ChannelPromise promise = channel.newPromise();
TokenClientPromiseHolder.putPromise(xid, promise);
// 在这里等待响应特定时间后,如果还没有获取到结果,则直接抛出异常
if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
}
// 这里通过TokenClientPromiseHolder来完成请求响应的异步封装,详细可参考2.5.1
SimpleEntry entry = TokenClientProseHolder.getEntry(xid);
if (entry == null || entry.getValue() == null) {
// Should not go through here.
throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
}
return entry.getValue();
} finally {
TokenClientPromiseHolder.remove(xid);
}
}
}
2.5.1 响应异步封装
笔者觉得这里还是比较重要的知识点,尤其是我们需要对响应进行异步接收的时候。
public final class TokenClientPromiseHolder {
// 该Map主要用于存放ID对应的channelPromise
private static final Map PROMISE_MAP = new ConcurrentHashMap();
// 请求发送结束后,则调用该方法,将id对应的promise存放到map中
public static void putPromise(int xid, ChannelPromise promise) {
PROMISE_MAP.put(xid, new SimpleEntry(promise, null));
}
public static SimpleEntry getEntry(int xid) {
return PROMISE_MAP.get(xid);
}
public static void remove(int xid) {
PROMISE_MAP.remove(xid);
}
// 当接收到响应时,则调用该方法
public static boolean completePromise(int xid, ClusterResponse response) {
if (!PROMISE_MAP.containsKey(xid)) {
return false;
}
// 根据请求id查找到对应的ChannelPromise
SimpleEntry entry = PROMISE_MAP.get(xid);
if (entry != null) {
ChannelPromise promise = entry.getKey();
if (promise.isDone() || promise.isCancelled()) {
return false;
}
// 设置promise状态为success,并将值设置到entry中
entry.setValue(response);
promise.setSuccess();
return true;
}
return false;
}
private TokenClientPromiseHolder() {
}
}
异步封装响应状态和响应内容,还是很重要的,那么这个completePromise()方法在何时被调用呢?
public class TokenClientHandler extends ChannelInboundHandlerAdapter {
// 读到server端的响应时
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ClusterResponse) {
ClusterResponse response = (ClusterResponse) msg;
if (response.getType() == ClusterConstants.MSG_TYPE_PING) {
handlePingResponse(ctx, response);
return;
}
// 接收到响应时,则调用该方法,server端也会将该id传回。
TokenClientPromiseHolder.completePromise(response.getId(), response);
}
}
}
总结:
从Sentienl创建客户端连接的过程中,我们还是能学到很多知识点的。
比如连接状态的判断,避免多次创建连接;
重试连接线程的执行,在channelUnregistered()时候调用;
每次请求都设置一个唯一的ID,这样可以异步接收响应;