前两篇我们分析了Dubbo服务提供者,在创建时的基本配置属性,如ServiceConfig、ApplicationConfig、RegistryConfig等。基本了解了基于API方式来创建Dubbo服务提供者的套路。
同时第二篇我们分析了在injvm(本地)模式下,dubbo服务如何向外注册(本质上还是注册在本地的map中,就是InjvmProtocol.exporterMap中),等待同进程中的服务消费者来调用(这个很重要,笔者基于Dubbo-demo源码中测试代码进行测试时候,无论怎样,消费者都会报错,找不到服务提供者,就是因为消费者和提供者不在同一个进程内)。
下面我们就来看下基于Injvm模式下的服务消费者创建方式及源码分析。
1.Injvm模式下的服务消费者和创建者代码来自dubbo-2.7.7中dubbo-demo-api项目下的Application,笔者有所精简(最主要的是将服务的发布和消费放到同一个main方法中了)
public class Application { // 服务提供者代码有所精简,本质上还是与之前的示例一样 public static void main(String[] args) throws Exception { startWithExport(); runWithRefer(); } // 服务提供者 private static void startWithExport() throws InterruptedException { ServiceConfigservice = new ServiceConfig<>(); service.setInterface(DemoService.class); service.setRef(new DemoServiceImpl()); service.setApplication(new ApplicationConfig("dubbo-demo-api-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); // 只暴露在本地,当前进程内 service.setScope("local"); service.export(); System.out.println("dubbo service started"); // new CountDownLatch(1).await(); } // 服务消费者 private static void runWithRefer() { ReferenceConfigreference = new ReferenceConfig<>(); reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer")); reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); reference.setInterface(DemoService.class); // 寻找本地模式的服务提供者 reference.setScope("local"); DemoService service = reference.get(); String message = service.sayHello("dubbo"); System.out.println(message); } }
有关于ApplicationConfig、RegistryConfig等之前都已经有过介绍,不再赘述。我们主要来看下ReferenceConfig
1.1 ReferenceConfigpublic class ReferenceConfigextends ReferenceConfigBase{ // 在 Dubbo服务提供(Injvm)中我们有过分析,这里返回的就是Protocol$Adaptive, // 调用链是Protocol$Adaptive --> ProtocolFilterWrapper --> ProtocolListenerWrapper --> InjvmProtocol private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); // 这个暂时用不上,可以先忽略 private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension(); // 同理,这里返回的是ProxyFactory$Adaptive,调用链是StubProxyFactoryWrapper --> JavassistProxyFactory private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 注册的接口类 private transient volatile T ref; // ReferenceConfig主要属性就是以上 ... } // ReferenceConfigBase public abstract class ReferenceConfigBaseextends AbstractReferenceConfig { // 接口名 protected String interfaceName; //接口类信息 protected Class interfaceClass; /** * client type TODO */ protected String client; // 用于点对点模式下指定调用的url protected String url; // 消费者配置信息(指定线程数等信息) protected ConsumerConfig consumer; // 指定协议 protected String protocol; protected ServiceMetadata serviceMetadata; }
主要属性就是上述这些,没有什么特别的东西。
有关于Protocol$Adaptive、ProxyFactory$Adaptive的内容可参考 Dubbo源码解析-Dubbo服务提供者_Injvm协议(二)_恐龙弟旺仔的博客-CSDN博客
2.Injvm消费者源码分析 2.1 ReferenceConfig.get()public class ReferenceConfigextends ReferenceConfigBase{ public synchronized T get() { if (destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!"); } // 交由init()方法处理 if (ref == null) { init(); } return ref; } public synchronized void init() { // 初始化过的则不再初始化 if (initialized) { return; } ... // 这里主要用于设置属性 serviceMetadata checkAndUpdateSubConfigs(); ... Map map = new HashMap(); map.put(SIDE_KEY, CONSUMER_SIDE); ReferenceConfigBase.appendRuntimeParameters(map); ... // 一系列的配置参数,不是重点,直接忽略 map.put(INTERFACE_KEY, interfaceName); AbstractConfig.appendParameters(map, getMetrics()); ... serviceMetadata.getAttachments().putAll(map); // 这里是重点 ref = createProxy(map); serviceMetadata.setTarget(ref); serviceMetadata.addAttribute(PROXY_CLASS_REF, ref); ... } }
这里主要是解析ApplicationConfig等配置的参数到map中,最终添加到ServiceConfig中
// 在笔者当前的示例中,map的内容如下: "side" -> "consumer" "application" -> "dubbo-demo-api-consumer" "register.ip" -> "xxx.xxx" "release" -> "" "methods" -> "sayHello,sayHelloAsync" "scope" -> "local" "sticky" -> "false" "dubbo" -> "2.0.2" "pid" -> "9612" "interface" -> "org.apache.dubbo.demo.DemoService" "timestamp" -> "1628079039393"2.2 ReferenceConfig.createProxy()
public class ReferenceConfigextends ReferenceConfigBase{ private T createProxy(Map map) { // 这里判断是否injvm模式,本例中时 是 if (shouldJvmRefer(map)) { // 本例中生成的url 内容为:injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?application=dubbo-demo-api-consumer&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=9612®ister.ip=172.20.89.171&release=&scope=local&side=consumer&sticky=false×tamp=1628079039393 URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); // 这里通过代理对象生成invoker,通过之前的分析我们知道,最终会调用InjvmProtocol.refer()方法,具体分析见2.2.1 invoker = REF_PROTOCOL.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { // 非本地模式的不是本文重点,先忽略 } // 如果当前没有设置check=false,并且Invoker没有找到合适的服务提供者(怎么找呢,见2.2.2) if (shouldCheck() && !invoker.isAvailable()) { invoker.destroy(); throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } ... String metadata = map.get(METADATA_KEY); WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata); if (metadataService != null) { URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map); metadataService.publishServiceDefinition(consumerURL); } // 重要的又来了,我们接着看 return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)); } }
2.2.1 Protocol$Adaptive.refer()
调用链依旧是 Protocol$Adaptive --> ProtocolFilterWrapper --> ProtocolListenerWrapper --> InjvmProtocol
我们直接看InjvmProtocol.refer()方法
public abstract class AbstractProtocol implements Protocol { publicInvokerrefer(Classtype, URL url) throws RpcException { // 最终由InjvmProtocol.protocolBindingRefer实现 return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); } } // InjvmProtocol.protocolBindingRefer public class InjvmProtocol extends AbstractProtocol implements Protocol { publicInvokerprotocolBindingRefer(ClassserviceType, URL url) throws RpcException { // 这个exporterMap很重要,就是之前我们injvm模式下,服务提供者将当前服务注册的地方,在当前Invoker中,直接把这个map当做构造参数传入 return new InjvmInvoker(serviceType, url, url.getServiceKey(), exporterMap); } } class InjvmInvokerextends AbstractInvoker{ private final Map关注打赏