在前一篇文章中介绍了Dubbo服务提供者的基本创建过程,同时分析了其基本配置信息。本文就来介绍下dubbo服务提供者暴露在本地模式下的经过。
1.ServiceConfig之scope之前的示例中,我们没有设置scope属性。scope属性默认有:local和remote,如果不设置的话,那么就是默认的none。
这个scope属性,决定了该服务提供者的服务范围。
local:只暴露在本地,不向外部提供服务,那么通过协议(dubbo等)过来的请求则无法请求到该服务;
remote:将服务暴露出去,消费者可以根据特定的协议请求到该服务;
2.ServiceConfig之PROTOCOL、PROXY_FACTORY参数分析// local方法的起点就是ServiceConfig.exportLocal()
public class ServiceConfig extends ServiceConfigBase {
// 两个重要属性
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
...
}
通过番外篇之Dubbo SPI的应用 分析中,我们知道,由于Protocol的几个实现中并没有指定具体的Adaptive实现类,所以会动态生成一个Protocol$Adaptive类(具体内容如下文2.2所示)
@SPI("dubbo")
public interface Protocol {
@Adaptive
Exporter export(Invoker invoker) throws RpcException;
@Adaptive
Invoker refer(Class type, URL url) throws RpcException;
...
}
org.apache.dubbo.rpc.Protocol具体内容如下所示(注意ProtocolFilterWrapper、ProtocolListenerWrapper是包装类):
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
同样,ProxyFactory的实现类中也没有指定默认的Adaptive实现类,故也会动态生成一个ProxyFactory$Adaptive类(具体内容如下文2.1所示)
@SPI("javassist")
public interface ProxyFactory {
@Adaptive({PROXY_KEY})
T getProxy(Invoker invoker) throws RpcException;
@Adaptive({PROXY_KEY})
T getProxy(Invoker invoker, boolean generic) throws RpcException;
@Adaptive({PROXY_KEY})
Invoker getInvoker(T proxy, Class type, URL url) throws RpcException;
}
org.apache.dubbo.rpc.ProxyFactory文件具体内容如下(需要注意StubProxyFactoryWrapper是一个包装类):
stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
如何查看动态生成的类呢?大家可以参考这篇博文https://blog.csdn.net/yfxhao123/article/details/109472166。
而javassist的相关内容可参考上一篇文章
动态生成的两个Adaptive类具体内容如下所示:
2.1 ProxyFactory$Adaptivepublic class ProxyFactory$Adaptive
implements ProxyFactory
{
public Invoker getInvoker(Object paramObject, Class paramClass, URL paramURL)
throws RpcException
{
if (paramURL == null) {
throw new IllegalArgumentException("url == null");
}
URL localURL = paramURL;
String str = localURL.getParameter("proxy", "javassist");
if (str == null) {
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + localURL.toString() + ") use keys([proxy])");
}
// 先获取ProxyFactory对象,默认是JavassistProxyFactory
ProxyFactory localProxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(str);
// 最终返回JavassistProxyFactory这个Invoker
return localProxyFactory.getInvoker(paramObject, paramClass, paramURL);
}
// getProxy暂时用不到,可以先不看
public Object getProxy(Invoker paramInvoker, boolean paramBoolean)
throws RpcException
{
if (paramInvoker == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
}
if (paramInvoker.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
}
URL localURL = paramInvoker.getUrl();
String str = localURL.getParameter("proxy", "javassist");
if (str == null) {
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + localURL.toString() + ") use keys([proxy])");
}
ProxyFactory localProxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(str);
return localProxyFactory.getProxy(paramInvoker, paramBoolean);
}
public Object getProxy(Invoker paramInvoker)
throws RpcException
{
if (paramInvoker == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
}
if (paramInvoker.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
}
URL localURL = paramInvoker.getUrl();
String str = localURL.getParameter("proxy", "javassist");
if (str == null) {
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + localURL.toString() + ") use keys([proxy])");
}
ProxyFactory localProxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(str);
return localProxyFactory.getProxy(paramInvoker);
}
}
可以看出,ProxyFactory.getInvoker()方法的调用链是StubProxyFactoryWrapper.getInvoker() --> JavassistProxyFactory.getInvoker()
2.2 Protocol$Adaptivepublic class Protocol$Adaptive implements Protocol {
public Exporter export(Invoker var1) throws RpcException {
if (var1 == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
} else if (var1.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
} else {
URL var2 = var1.getUrl();
String var3 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
if (var3 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
} else {
// 获取Protocol相应实现类,在当前injvm协议下,默认为InjvmProtocol
Protocol var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);
return var4.export(var1);
}
}
}
// 暂时用不到,可以先不看
public Invoker refer(Class var1, URL var2) throws RpcException {
if (var2 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var4 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
if (var4 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
} else {
Protocol var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var4);
return var5.refer(var1, var2);
}
}
}
public Protocol$Adaptive() {
}
public void destroy() {
throw new UnsupportedOperationException("method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
}
可以看到,Protocol.export()的调用顺序为:Protocol$Adaptive --> ProtocolListenerWrapper --> ProtocolFilterWrapper --> InjvmProtocol
不太明白的可以参考Dubbo SPI的应用分析
3.ServiceConfig.exportLocal()分析public class ServiceConfig extends ServiceConfigBase {
private void exportLocal(URL url) {
// 将dubbo协议的url重新设置为injvm(也就是local模式)协议的URL
// 本例中URL内容即为:injvm://127.0.0.1/xw.demo.DemoService?anyhost=true&application=provider&bind.ip=192.168.xx.xx&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=xw.demo.DemoService&methods=sayHello&pid=21464&release=&revision=1.0.0&side=provider×tamp=1627282273007&version=1.0.0
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// 结合我们上面对PROTOCOL和PROXY_FACTORY的分析,我们一层层的来分析下
// PROXY_FACTORY.getInvoker()具体见3.1
// PROTOCOL.export() 具体见3.2
Exporter exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
}
3.1 PROXY_FACTORY.getInvoker()调用链分析
上面有过结论,整个调用链为StubProxyFactoryWrapper --> JavassistProxyFactory
3.1.1 StubProxyFactoryWrapper.getInvoker()
public class StubProxyFactoryWrapper implements ProxyFactory {
private final ProxyFactory proxyFactory;
public Invoker getInvoker(T proxy, Class type, URL url) throws RpcException {
// 没有任何多余操作,直接交由下一个ProxyFactory处理,也就是默认的JavassistProxyFactory
return proxyFactory.getInvoker(proxy, type, url);
}
}
3.1.2 JavassistProxyFactory.getInvoker()
public class JavassistProxyFactory extends AbstractProxyFactory {
...
@Override
public Invoker getInvoker(T proxy, Class type, URL url) {
// 先获取wrap类
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 返回一个AbstractProxyInvoker
return new AbstractProxyInvoker(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
// Wrapper.getWrapper()
public abstract class Wrapper {
public static Wrapper getWrapper(Class c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
{
c = c.getSuperclass();
}
if (c == Object.class) {
return OBJECT_WRAPPER;
}
// 重点在makeWrapper中
return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key));
}
private static Wrapper makeWrapper(Class c) {
...
// 方法体过长,不多做展示,具体可参考源码
// 主要就是通过javassist进行动态生成wrap类
try {
Class wc = cc.toClass();
// setup static field.
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values()) {
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
}
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
}
在我们提供的示例中,具体可参考 Dubbo源码解析-Dubbo服务提供者_Injvm协议(一)_恐龙弟旺仔的博客-CSDN博客
动态生成的Wrapper类内容如下:
class Wrapper1
extends Wrapper
implements ClassGenerator.DC
{
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
public static Class[] mts1;
public String[] getMethodNames()
{
return mns;
}
public Class getPropertyType(String paramString)
{
return (Class)pts.get(paramString);
}
public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
throws InvocationTargetException
{
DemoServiceImpl localDemoServiceImpl;
try
{
localDemoServiceImpl = (DemoServiceImpl)paramObject;
}
catch (Throwable localThrowable1)
{
throw new IllegalArgumentException(localThrowable1);
}
try
{
if ((!"sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) {
return localDemoServiceImpl.sayHello((String)paramArrayOfObject[0]);
}
if ((!"sayHelloAsync".equals(paramString)) || (paramArrayOfClass.length == 1)) {
return localDemoServiceImpl.sayHelloAsync((String)paramArrayOfObject[0]);
}
}
catch (Throwable localThrowable2)
{
throw new InvocationTargetException(localThrowable2);
}
throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}
public String[] getDeclaredMethodNames()
{
return dmns;
}
public void setPropertyValue(Object paramObject1, String paramString, Object paramObject2)
{
try
{
DemoServiceImpl localDemoServiceImpl = (DemoServiceImpl)paramObject1;
}
catch (Throwable localThrowable)
{
throw new IllegalArgumentException(localThrowable);
}
throw new NoSuchPropertyException("Not found property \"" + paramString + "\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}
public Object getPropertyValue(Object paramObject, String paramString)
{
try
{
DemoServiceImpl localDemoServiceImpl = (DemoServiceImpl)paramObject;
}
catch (Throwable localThrowable)
{
throw new IllegalArgumentException(localThrowable);
}
throw new NoSuchPropertyException("Not found property \"" + paramString + "\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}
public String[] getPropertyNames()
{
return pns;
}
public boolean hasProperty(String paramString)
{
return pts.containsKey(paramString);
}
}
故整个JavassistProxyFactory.getInvoker()本质上返回了AbstractProxyInvoker,其invoke方法本质上时调用了Wrapper.invokeMethod()方法
3.2 PROTOCOL.export()调用链分析依据上面的调用链分析,整个过程是:Protocol$Adaptive --> ProtocolFilterWrapper --> ProtocolListenerWrapper --> InjvmProtocol
3.2.1 Protocol$Adaptive.export()方法
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
// 直接调用下一个ProtocolFilterWrapper
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
3.2.2 ProtocolFilterWrapper.export()
public class ProtocolFilterWrapper implements Protocol {
public Exporter export(Invoker invoker) throws RpcException {
// 判断当前Protocol类型是否为registry
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// buildInvokerChain()是重点方法
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
Invoker last = invoker;
// 获取所有的Filter
List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker next = last;
last = new Invoker() {
@Override
public Class getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
// 依次调用Filter.invoke实现
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
Filter不属于本文重点,大家有一个印象即可,后续我们会专门分析,我们只需要知道在对外提供服务时,会经过Filter的处理即可3.2.3 ProtocolListenerWrapper.export()
public class ProtocolListenerWrapper implements Protocol {
public Exporter export(Invoker invoker) throws RpcException {
// 判断当前Protocol类型是否为registry
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 创建对Export对象的监听
return new ListenerExporterWrapper(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
public ListenerExporterWrapper(Exporter exporter, List listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
// 重点在这里,当调用具体的协议类型将服务提供后,对生成的Export对象进行监听
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
ExporterListener不是我们本次分析的重点,后续我们会专门来分析。我们在这里只需要记录有Listener即可。
3.2.4 InjvmProtocol.export()
public class InjvmProtocol extends AbstractProtocol implements Protocol {
public Exporter export(Invoker invoker) throws RpcException {
// 生成InjvmExporter对象
return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
}
// InjvmExporter
class InjvmExporter extends AbstractExporter {
private final String key;
// 这个是个很重要的map,同一类型的协议的所有Export服务都存在于这个map中
private final Map> exporterMap;
}
总结:
最核心的一句代码:
Exporter exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
将整个服务暴露过程分为两步:
1.PROXY_FACTORY.getInvoker() 将服务转变为org.apache.dubbo.rpc.Invoker,这里的Invoker即如下生成的AbstractProxyInvoker
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
public Invoker getInvoker(T proxy, Class type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 这里即是Invoker实例
return new AbstractProxyInvoker(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
2.PROTOCOL.export() 将Invoker转变为Exporter,将服务暴露出去。本例中的Exporter即为InjvmExporter。
最后我们用一张时序图来展示下整个调用过程: