Filter的功能我们之前一直都是一笔带过的,在本文中,我们就好好说明下Filter的创建以及其功能。
其实Filter在各个中间件框架中基本都存在的,主要作为拦截层,在真正执行操作之前,先执行Filter中的功能,以实现特定功能的扩展。
1.Filter链的创建在provider创建时,交由Protocol$Adaptive.export()方法暴露服务时,便会执行ProtocolFilterWrapper.export()方法(这块是SPI相关的知识点)。Filter链的创建就是在ProtocolFilterWrapper完成的。
public class ProtocolFilterWrapper implements Protocol {
private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
Invoker last = invoker;
// 获取Filter列表
List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker next = last;
// Invoker中封装当前filter对象
last = new Invoker() {
@Override
public Class getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 在这里调用filter
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
最终形成一个Invoker链,每个Invoker封装一个Filter对象,最终形成一个Filter链,如下图所示:
这样通过一级一级的调用,完成Filter链的调用。
下面来了解下Filter的基本功能
2.Filter的实现类Filter分为Provider端和Consumer端,实现类分别如下:
Provider:
Consumer:
下面我们分别介绍下几个重点的Filter
2.1 ConsumerContextFilter与ContextFilterDubbo实现参数隐式传递就是通过这一对Filter来实现的。具体的分析,最后会有博客来专门说明。
ConsumerContextFilter的作用笔者觉得不太大,跟封装参数发送到provider关系不太大。
2.2 ActiveLimitFilter主要用来限制消费者对一个服务端方法的并发调用量
@Activate(group = CONSUMER, value = ACTIVES_KEY)
public class ActiveLimitFilter implements Filter, Filter.Listener {
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取url的actives参数,默认为0,不开启
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (!RpcStatus.beginCount(url, methodName, max)) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
synchronized (rpcStatus) {
// 交由beginCount方法来控制方法并发,并发量计算也比较简单,如下方法所示:2.2.1
while (!RpcStatus.beginCount(url, methodName, max)) {
try {
// 超过并发量,则wait一段时间
rpcStatus.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker invoker, Invocation invocation) {
if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = appResponse.getException();
// directly throw if it's checked exception
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return;
}
// directly throw if the exception appears in the signature
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class[] exceptionClassses = method.getExceptionTypes();
for (Class exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return;
}
}
} catch (NoSuchMethodException e) {
return;
}
// for the exception not found in method's signature, print ERROR message in server's log.
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// directly throw if exception class and interface class are in the same jar file.
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return;
}
// directly throw if it's JDK exception
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return;
}
// directly throw if it's dubbo exception
if (exception instanceof RpcException) {
return;
}
// otherwise, wrap with RuntimeException and throw back to the client
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
}
}
}
}
根据代码可知,Dubbo provider针对异常进行了封装处理,根据不同的异常类型做不同处理,总结如下:
如果是checked异常则直接抛出
如果是unchecked异常但是在接口上有声明,也会直接抛出
如果异常类和接口类在同一jar包里,直接抛出
如果是JDK自带的异常,直接抛出
如果是Dubbo的异常,直接抛出
其余的都包装成RuntimeException然后抛出
还有很多其他Filter,笔者不再一一说明,读者可自行阅读尝试使用。