public class NettyServer extends AbstractServer implements RemotingServer {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 具体创建在ChannelHandlers.wrap()中
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
1.1 ChannelHandlers.wrap()
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 本质上最终还是调用到这
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
从NettyServer的构造方法中可以看到,传入的Handler通过不断的复合,最终以MultiMessageHandler --> HeartbeatHandler --> (Dispatcher生成的Handler) 这种顺序来处理请求。
2.1 AllDispatcher(默认)当前类型Dispatcher,则会把接收到的所有请求(请求、响应、连接心跳等事件)都交由自定义的线程池来处理。
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
// 在接收到请求时,直接交由executor线程池处理
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
2.2 DirectDispatcher所有的请求都直接在work线程上执行,不交由自定义线程池来处理。
public class DirectDispatcher implements Dispatcher {
public static final String NAME = "direct";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new DirectChannelHandler(handler, url);
public class DirectChannelHandler extends WrappedChannelHandler {
public DirectChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
if (executor instanceof ThreadlessExecutor) {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
// 直接在当前线程执行handler操作
} else {
handler.received(channel, message);
2.3 MessageOnlyDispatcher
public class MessageOnlyDispatcher implements Dispatcher {
public static final String NAME = "message";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new MessageOnlyChannelHandler(handler, url);
public class MessageOnlyChannelHandler extends WrappedChannelHandler {
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
2.4 ExecutionDispatcher只有请求类型消息交由自定义线程池执行,其他类型消息都在work线程上执行
public class ExecutionDispatcher implements Dispatcher {
public static final String NAME = "execution";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new ExecutionChannelHandler(handler, url);
public class ExecutionChannelHandler extends WrappedChannelHandler {
public ExecutionChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
// 这里会过滤消息类型,只有request请求才会交由自定义线程池执行
if (message instanceof Request) {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if (t instanceof RejectedExecutionException) {
sendFeedback(channel, (Request) message, t);
throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
} else if (executor instanceof ThreadlessExecutor) {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} else {
handler.received(channel, message);
2.5 ConnectionOrderedDispatcher
public class ConnectionOrderedDispatcher implements Dispatcher {
public static final String NAME = "connection";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new ConnectionOrderedChannelHandler(handler, url);
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
protected final ThreadPoolExecutor connectionExecutor;
private final int queuewarninglimit;
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
// 定义一个单线程的线程池,后续为执行连接、断开事件做准备
connectionExecutor = new ThreadPoolExecutor(1, 1,
new LinkedBlockingQueue(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME There's no place to release connectionExecutor!
public void connected(Channel channel) throws RemotingException {
try {
// 连接事件被单线程池执行
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
public void disconnected(Channel channel) throws RemotingException {
try {
// 连接事件被单线程池执行
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
// 请求响应信息则交由自定义线程池执行
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if (message instanceof Request && t instanceof RejectedExecutionException) {
sendFeedback(channel, (Request) message, t);
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
private void checkQueueLength() {
if (connectionExecutor.getQueue().size() > queuewarninglimit) {
logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));