您当前的位置: 首页 >  pip

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析- ChannelPipeline对异常事件的处理

恐龙弟旺仔 发布时间:2021-12-21 12:23:33 ,浏览量:0

前言:

之前我们分析了inbound outbound事件在ChannelPipeline中的流转过程,确定了inbound事件在ChannelHandler中的处理顺序为head -- > tail,而outbound事件在ChannelHandler中的处理顺序为tail --> head。

以上都是正常的流程,如果当ChannelHandler处理请求出现问题时,关于异常是如何在ChannelPipeline中流转的呢?本文就此问题来进行探讨。

1.AbstractChannelHandlerContext.bind() 的异常处理

ChannelPipeline中的调用最终都调用到AbstractChannelHandlerContext中的具体方法。我们来跟踪几个方法来看下其异常处理的方式

// AbstractChannelHandlerContext.bind
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
	@Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(localAddress, "localAddress");
        if (isNotValidPromise(promise, false)) {
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 执行invokeBind()方法
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }

    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                // 如果bind()异常,则执行notifyOutboundHandlerException方法,具体见1.1
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }
}
1.1 AbstractChannelHandlerContext.notifyOutboundHandlerException()
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
	private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
        // 具体见下方
        PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
    }
}

// PromiseNotificationUtil.tryFailure
public final class PromiseNotificationUtil {
	public static void tryFailure(Promise p, Throwable cause, InternalLogger logger) {
        // 最终会调用到Promise.tryFailure()方法来将error信息 cause包装进去,具体见1.2
        if (!p.tryFailure(cause) && logger != null) {
            Throwable err = p.cause();
            if (err == null) {
                logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
            } else if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}",
                        p, ThrowableUtil.stackTraceToString(err), cause);
            }
        }
    }
}
1.2 Promise.tryFailure()

Promise默认实现类为DefaultPromise,我们来看下其处理方式

public class DefaultPromise extends AbstractFuture implements Promise {
 	@Override
    public boolean tryFailure(Throwable cause) {
        return setFailure0(cause);
    }
	private boolean setFailure0(Throwable cause) {
        return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    // 最终调用在这里
    private boolean setValue0(Object objResult) {
        // 将result结果赋值为包装异常后的CauseHolder对象
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            // 如果当前Promise有Listener,则触发所有监听
            if (checkNotifyWaiters()) {
                notifyListeners();
            }
            return true;
        }
        return false;
    }
}

我们可以通过一个示例来展示下FutureListener的使用,如下所示:在connect()方法返回的FuturePromise后直接添加addListener()即可。

当发生异常时,会进入到如下异常业务逻辑处理的流程。

ChannelFuture f = b.connect(HOST, PORT).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.cause() != null) {
            // 处理异常业务逻辑
        } else {
            // 处理正常业务逻辑
        }
    }
}).sync();

总结:目前通过bind()方法展示了一种ChannelPipeline对异常事件的处理方案,即将异常信息封装到FuturePromise中,并触发其所有listener,在listener中我们可以获取到异常信息,并进行对应逻辑处理。

2.AbstractChannelHandlerContext.write() 的异常处理
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
	private void write(Object msg, boolean flush, ChannelPromise promise) {
        ...

        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                // 在这里执行
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }
    
    // AbstractChannelHandlerContext.invokeFlush0()
	private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            // 异常在这里处理,具体见2.1
            invokeExceptionCaught(t);
        }
    }
}
2.1 AbstractChannelHandlerContext.invokeExceptionCaught() 
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
	private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                // 如果当前ChannelHandler可以处理,那么直接调用对应handler.exceptionCaught()方法进行处理
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        "An exception {}" +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:",
                        ThrowableUtil.stackTraceToString(error), cause);
                } else if (logger.isWarnEnabled()) {
                    logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:", error, cause);
                }
            }
        } else {
            // 否则将异常传递下去,让下一个ChannelHandler来处理
            fireExceptionCaught(cause);
        }
    }
}

这种异常处理方式与正常的业务处理类似,如果当前ChannelHandler.exceptionCaught()被重写,那么则直接执行当前exceptionCaught方法;否则则交由下一个ChannelHandler处理。

所以我们在创建ChannelPipeline时,可以创建一个专门处理异常的ChannelHandler,或者在具体的业务ChannelHandler中重写下exceptionCaught方法。

3.重写异常处理方法

我们可以在一些ChannelHandler的实现类中查找到他们重写了exceptionCaught方法。

3.1 LoggingHandler
final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHandler {
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 直接日志记录
        log(Event.EXCEPTION, cause.toString());
    }
}
3.2 TailContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 直接调用DefaultChannelPipeline处理方法
        onUnhandledInboundException(cause);
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {
	protected void onUnhandledInboundException(Throwable cause) {
        // 记录日志,如果cause是ReferenceCounted类型,则调用ReferenceCounted.release()释放
        try {
            logger.warn(
                    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                            "It usually means the last handler in the pipeline did not handle the exception.",
                    cause);
        } finally {
            ReferenceCountUtil.release(cause);
        }
    }
}

TailContext作为inbound事件的最后一个节点,最终会将异常记录下来。

总结:

针对异常事件的处理,我们可以看到有两种方式进行处理

1.添加FuturePromise的listener,在listener中进行相关异常处理

2.重写ChannelHandler.exceptionCaught()方法,处理业务异常逻辑

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0397s