您当前的位置: 首页 >  dubbo

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Dubbo源码协议-Filter的使用

恐龙弟旺仔 发布时间:2021-12-12 19:58:08 ,浏览量:0

前言:

Filter的功能我们之前一直都是一笔带过的,在本文中,我们就好好说明下Filter的创建以及其功能。

其实Filter在各个中间件框架中基本都存在的,主要作为拦截层,在真正执行操作之前,先执行Filter中的功能,以实现特定功能的扩展。

1.Filter链的创建

在provider创建时,交由Protocol$Adaptive.export()方法暴露服务时,便会执行ProtocolFilterWrapper.export()方法(这块是SPI相关的知识点)。Filter链的创建就是在ProtocolFilterWrapper完成的。

public class ProtocolFilterWrapper implements Protocol {
	private static  Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
        Invoker last = invoker;
        // 获取Filter列表
        List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker next = last;
                // Invoker中封装当前filter对象
                last = new Invoker() {

                    @Override
                    public Class getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            // 在这里调用filter
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                try {
                                    Filter.Listener listener = listenableFilter.listener(invocation);
                                    if (listener != null) {
                                        listener.onError(e, invoker, invocation);
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                listener.onError(e, invoker, invocation);
                            }
                            throw e;
                        } finally {

                        }
                        return asyncResult.whenCompleteWithContext((r, t) -> {
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                Filter.Listener listener = listenableFilter.listener(invocation);
                                try {
                                    if (listener != null) {
                                        if (t == null) {
                                            listener.onResponse(r, invoker, invocation);
                                        } else {
                                            listener.onError(t, invoker, invocation);
                                        }
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                if (t == null) {
                                    listener.onResponse(r, invoker, invocation);
                                } else {
                                    listener.onError(t, invoker, invocation);
                                }
                            }
                        });
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }

        return last;
    }
}

最终形成一个Invoker链,每个Invoker封装一个Filter对象,最终形成一个Filter链,如下图所示:

这样通过一级一级的调用,完成Filter链的调用。

下面来了解下Filter的基本功能

2.Filter的实现类

Filter分为Provider端和Consumer端,实现类分别如下:

Provider:

Consumer:

下面我们分别介绍下几个重点的Filter

2.1 ConsumerContextFilter与ContextFilter

Dubbo实现参数隐式传递就是通过这一对Filter来实现的。具体的分析,最后会有博客来专门说明。

ConsumerContextFilter的作用笔者觉得不太大,跟封装参数发送到provider关系不太大。

2.2 ActiveLimitFilter

主要用来限制消费者对一个服务端方法的并发调用量

@Activate(group = CONSUMER, value = ACTIVES_KEY)
public class ActiveLimitFilter implements Filter, Filter.Listener {
 
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        //  获取url的actives参数,默认为0,不开启
        int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
        final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (!RpcStatus.beginCount(url, methodName, max)) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            synchronized (rpcStatus) {
                // 交由beginCount方法来控制方法并发,并发量计算也比较简单,如下方法所示:2.2.1
                while (!RpcStatus.beginCount(url, methodName, max)) {
                    try {
                        // 超过并发量,则wait一段时间
                        rpcStatus.wait(remain);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    if (remain  invoker, Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    @Override
    public void onResponse(Result appResponse, Invoker invoker, Invocation invocation) {
        if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
            try {
                Throwable exception = appResponse.getException();

                // directly throw if it's checked exception
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                    return;
                }
                // directly throw if the exception appears in the signature
                try {
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    Class[] exceptionClassses = method.getExceptionTypes();
                    for (Class exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return;
                }

                // for the exception not found in method's signature, print ERROR message in server's log.
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // directly throw if exception class and interface class are in the same jar file.
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return;
                }
                // directly throw if it's JDK exception
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {
                    return;
                }
                // directly throw if it's dubbo exception
                if (exception instanceof RpcException) {
                    return;
                }

                // otherwise, wrap with RuntimeException and throw back to the client
                appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
            }
        }
    }
}

根据代码可知,Dubbo provider针对异常进行了封装处理,根据不同的异常类型做不同处理,总结如下:

如果是checked异常则直接抛出
如果是unchecked异常但是在接口上有声明,也会直接抛出
如果异常类和接口类在同一jar包里,直接抛出
如果是JDK自带的异常,直接抛出
如果是Dubbo的异常,直接抛出
其余的都包装成RuntimeException然后抛出

还有很多其他Filter,笔者不再一一说明,读者可自行阅读尝试使用。

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0407s