之前我们分析了inbound outbound事件在ChannelPipeline中的流转过程,确定了inbound事件在ChannelHandler中的处理顺序为head -- > tail,而outbound事件在ChannelHandler中的处理顺序为tail --> head。
以上都是正常的流程,如果当ChannelHandler处理请求出现问题时,关于异常是如何在ChannelPipeline中流转的呢?本文就此问题来进行探讨。
1.AbstractChannelHandlerContext.bind() 的异常处理ChannelPipeline中的调用最终都调用到AbstractChannelHandlerContext中的具体方法。我们来跟踪几个方法来看下其异常处理的方式
// AbstractChannelHandlerContext.bind
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 执行invokeBind()方法
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
// 如果bind()异常,则执行notifyOutboundHandlerException方法,具体见1.1
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
}
1.1 AbstractChannelHandlerContext.notifyOutboundHandlerException()
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
// 具体见下方
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}
}
// PromiseNotificationUtil.tryFailure
public final class PromiseNotificationUtil {
public static void tryFailure(Promise p, Throwable cause, InternalLogger logger) {
// 最终会调用到Promise.tryFailure()方法来将error信息 cause包装进去,具体见1.2
if (!p.tryFailure(cause) && logger != null) {
Throwable err = p.cause();
if (err == null) {
logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
"Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}",
p, ThrowableUtil.stackTraceToString(err), cause);
}
}
}
}
1.2 Promise.tryFailure()
Promise默认实现类为DefaultPromise,我们来看下其处理方式
public class DefaultPromise extends AbstractFuture implements Promise {
@Override
public boolean tryFailure(Throwable cause) {
return setFailure0(cause);
}
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
// 最终调用在这里
private boolean setValue0(Object objResult) {
// 将result结果赋值为包装异常后的CauseHolder对象
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 如果当前Promise有Listener,则触发所有监听
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
}
我们可以通过一个示例来展示下FutureListener的使用,如下所示:在connect()方法返回的FuturePromise后直接添加addListener()即可。
当发生异常时,会进入到如下异常业务逻辑处理的流程。
ChannelFuture f = b.connect(HOST, PORT).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.cause() != null) {
// 处理异常业务逻辑
} else {
// 处理正常业务逻辑
}
}
}).sync();
总结:目前通过bind()方法展示了一种ChannelPipeline对异常事件的处理方案,即将异常信息封装到FuturePromise中,并触发其所有listener,在listener中我们可以获取到异常信息,并进行对应逻辑处理。
2.AbstractChannelHandlerContext.write() 的异常处理abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private void write(Object msg, boolean flush, ChannelPromise promise) {
...
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
// 在这里执行
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
// AbstractChannelHandlerContext.invokeFlush0()
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
// 异常在这里处理,具体见2.1
invokeExceptionCaught(t);
}
}
}
2.1 AbstractChannelHandlerContext.invokeExceptionCaught()
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
// 如果当前ChannelHandler可以处理,那么直接调用对应handler.exceptionCaught()方法进行处理
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(
"An exception {}" +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
"An exception '{}' [enable DEBUG level for full stacktrace] " +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:", error, cause);
}
}
} else {
// 否则将异常传递下去,让下一个ChannelHandler来处理
fireExceptionCaught(cause);
}
}
}
这种异常处理方式与正常的业务处理类似,如果当前ChannelHandler.exceptionCaught()被重写,那么则直接执行当前exceptionCaught方法;否则则交由下一个ChannelHandler处理。
所以我们在创建ChannelPipeline时,可以创建一个专门处理异常的ChannelHandler,或者在具体的业务ChannelHandler中重写下exceptionCaught方法。
3.重写异常处理方法我们可以在一些ChannelHandler的实现类中查找到他们重写了exceptionCaught方法。
3.1 LoggingHandlerfinal class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHandler {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 直接日志记录
log(Event.EXCEPTION, cause.toString());
}
}
3.2 TailContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 直接调用DefaultChannelPipeline处理方法
onUnhandledInboundException(cause);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
protected void onUnhandledInboundException(Throwable cause) {
// 记录日志,如果cause是ReferenceCounted类型,则调用ReferenceCounted.release()释放
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}
}
TailContext作为inbound事件的最后一个节点,最终会将异常记录下来。
总结:针对异常事件的处理,我们可以看到有两种方式进行处理
1.添加FuturePromise的listener,在listener中进行相关异常处理
2.重写ChannelHandler.exceptionCaught()方法,处理业务异常逻辑