在介绍完Dubbo 本地模式(Injvm协议)下的服务提供与消费后,上文我们又介绍了Dubbo远程模式(dubbo协议)下的服务暴露过程,本质上就是通过Netty将dubbo协议端口暴露出去,然后将provider_url添加到对应的注册中心去。
在dubbo服务暴露出去之后,dubbo协议的消费者是怎么从注册中心获取到服务提供者的地址?又是怎么创建连接发起调用的呢?本文我们就一起来看下。
按照之前关于Injvm模式下的服务暴露和消费,Dubbo协议的消费者理论上也会最终生成一个关于DubboInvoker的Proxy。
1.ReferenceConfig.get()有了之前分析Injvm协议下消费者的经验,具体见 Dubbo源码解析-Dubbo服务消费者_Injvm协议(一)
需要注意的是:之前服务消费者中的代码需要修改下,如下所示:
// 这一句是只消费本地暴露的服务
reference.setScope("local");
// 需要修改成
reference.setScope("remote");
我们略过重复的代码,直接进入ReferenceConfig.createProxy()方法
public class ReferenceConfig extends ReferenceConfigBase {
private T createProxy(Map map) {
// Injvm模式下的调用,前面已经有过分析,直接忽略
if (shouldJvmRefer(map)) {
...
} else {
urls.clear();
// 点对点模式下消费者会直接输入服务提供者的url,本例中非点对点模式,直接忽略
if (url != null && url.length() > 0) {
...
} else {
// 这里才是本文分析的重点,非本地模式下的创建过程
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
// 检查注册中心信息,也就是本例中的zookeeper://127.0.0.1:2181
checkRegistry();
List us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 拼接注册中心url
// 在本例中信息为:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-consumer&dubbo=2.0.2&pid=13948&refer=application=dubbo-demo-api-consumer&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=13948®ister.ip=xxx&scope=remote&side=consumer&sticky=false×tamp=1628983620825®istry=zookeeper×tamp=1628984451422
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config to your spring config.");
}
}
}
// 消费者也允许多注册中心获取,但是无论怎样,最终还是会选择一个进行调用
if (urls.size() == 1) {
// Protocol$Adaptive.refer()按照之前的分析,最终会根据url的头信息(registry),将具体请求交由RegistryProtocol调用,具体见1.1
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 多注册中心,选择最后一个注册中心的url
List>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else {
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
// invoker不可用时,销毁并抛出异常
if (shouldCheck() && !invoker.isAvailable()) {
invoker.destroy();
...
}
...
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
}
1.1 RegistryProtocol.refer() 注册中心调用
public class RegistryProtocol implements Protocol {
public Invoker refer(Class type, URL url) throws RpcException {
// 获取注册中心地址
url = getRegistryUrl(url);
// 通过RegistryFactory$Adaptive.getRegistry()获取对应的注册工厂,具体见2 、2.1
// 最终registry在本例中返回ZookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 分组,非本文重点,暂时略过
Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 交由doRefer执行,具体见1.2
return doRefer(cluster, registry, type, url);
}
}
1.2 RegistryProtocol.doRefer()
public class RegistryProtocol implements Protocol {
private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
RegistryDirectory directory = new RegistryDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map parameters = new HashMap(directory.getConsumerUrl().getParameters());
// 创建消费者URL
// 本例中为:consumer://xxx.xx.xx.x/org.apache.dubbo.demo.DemoService?application=dubbo-demo-api-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=6420&scope=remote&side=consumer&sticky=false×tamp=1628985216353
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
// 将消费者URL注册到注册中心去
registry.register(directory.getRegisteredConsumerUrl());
}
// 创建RouterChain,具体见3、3.1
directory.buildRouterChain(subscribeUrl);
// 订阅服务端URL变动,具体见4、4.1
directory.subscribe(toSubscribeUrl(subscribeUrl));
// 通过Cluster$Adaptive.join()来创建Invoker,具体见5、5.1
Invoker invoker = cluster.join(directory);
List listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper registryInvokerWrapper = new RegistryInvokerWrapper(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
}
2. RegistryFactory$Adaptive.getRegistry(url) 获取合适的注册工厂
通过HSDB来查看动态生成的RegistryFactory,具体内容如下:
package org.apache.dubbo.registry;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class RegistryFactory$Adaptive
implements RegistryFactory
{
public Registry getRegistry(URL paramURL)
{
if (paramURL == null) {
throw new IllegalArgumentException("url == null");
}
URL localURL = paramURL;
String str = localURL.getProtocol() == null ? "dubbo" : localURL.getProtocol();
if (str == null) {
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + localURL.toString() + ") use keys([protocol])");
}
// 同样的套路,注册中心为zookeeper,所以最终获取的是ZookeeperRegistryFactory
RegistryFactory localRegistryFactory = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(str);
return localRegistryFactory.getRegistry(paramURL);
}
}
实际所有的这种动态生成的类都是同样的套路,主要根据getExtension(str) 中的str来确定最终使用的类。本例中的注册中心url为zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService...,故最终是通过ZookeeperRegistryFactory来确定Registry的。
2.1 ZookeeperRegistryFactory.getRegistry() 获取对应Registry注册对象信息获取到Registry后,后续会将消费者url注册上去,就像provider_url注册其上是一样的操作
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
public Registry createRegistry(URL url) {
// 直接创建ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
public abstract class AbstractRegistryFactory implements RegistryFactory {
// Registry Collection Map
protected static final Map REGISTRIES = new HashMap();
// 父类中实现
@Override
public Registry getRegistry(URL url) {
if (destroyed.get()) {
LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
"Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
return DEFAULT_NOP_REGISTRY;
}
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 最终通过createRegistry()来创建,交由子类处理
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
}
所以,当注册中心为Zookeeper时,消费者获取到的Registry对象为ZookeeperRegistry
3.RegistryDirectory.buildRouterChain(subscribeUrl) 获取Router信息public class RegistryDirectory extends AbstractDirectory implements NotifyListener {
public void buildRouterChain(URL url) {
// 直接交由RouterChain.buildChain(url)处理
this.setRouterChain(RouterChain.buildChain(url));
}
}
3.1 RouterChain.buildChain(url)
public class RouterChain {
// full list of addresses from registry, classified by method name.
private List invokers = Collections.emptyList();
// containing all routers, reconstruct every time 'route://' urls change.
private volatile List routers = Collections.emptyList();
public static RouterChain buildChain(URL url) {
return new RouterChain(url);
}
// 重点在这个执行方法
private RouterChain(URL url) {
// 通过SPI获取对应的RouterFactory实现类
// 本例中返回4个对应的Factory,MockRouterFactory、TagRouterFactory、AppRouterFactory、ServiceRouterFactory
List extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, "router");
// 调用各自RouterFactory.getRouter()方法来获取对应的Router对象
// 分别对应于MockInvokersSelector、TagRouter、AppRouter、ServiceRouter
List routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());
initWithRouters(routers);
}
}
最终本例中RouterChain.buildChain()方法获取到4个Router对象,拼装到RouterAchain.routers属性中(分别为MockInvokersSelector、TagRouter、AppRouter、ServiceRouter)
4.RegistryDirectory.subscribe(url) 订阅服务端URL变动public class RegistryDirectory extends AbstractDirectory implements NotifyListener {
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 重要的都在这一句
// 从上文可知,本例中的registry为ZookeeperRegistry,所以我们直接调用ZookeeperRegistry.subscribe()方法
registry.subscribe(url, this);
}
}
注意:这里调用registry时,将RegistryDirectory本身作为listener传入subscribe()方法,后续会回调到listener
4.1 ZookeeperRegistry.subscribe() 订阅服务端变更public class ZookeeperRegistry extends FailbackRegistry {
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
// ANY_VALUE=*,通配所有interface,非本例中重点关注,直接忽略
if (ANY_VALUE.equals(url.getServiceInterface())) {
...
} else {
List urls = new ArrayList();
for (String path : toCategoriesPath(url)) {
// path信息即当前接口服务提供者在zk上的注册地址
// 本例中为:/dubbo/org.apache.dubbo.demo.DemoService/providers
ConcurrentMap listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap());
// 创建zk监听,主要监听方法在ZookeeperRegistry.notify()中
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
// 对该provider_path添加zk监听
List children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 获取到服务端地址后,即触发提醒操作,我们重点看针对provider_url的notify()操作,具体见4.2
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
4.2 ZookeeperRegistry.notify()
中间过程忽略,比较简单,最终方法在AbstractRegistry.notify()中
public class RegistryDirectory extends AbstractDirectory implements NotifyListener {
public abstract class AbstractRegistry implements Registry {
protected void notify(URL url, NotifyListener listener, List urls) {
...
Map categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap());
for (Map.Entry entry : result.entrySet()) {
String category = entry.getKey();
List categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 针对每个url调用listener.notify()来处理
// 这里的listener是谁呢?我们回过头来看,就是当时RegistryDirectory对象
listener.notify(categoryList);
saveProperties(url);
}
}
}
可以回顾一下4中的内容,registry.subscribe(url, this);这里的this就是RegistryDirectory本身。
所以listener.notify()就是调用的RegistryDirectory.notify()
4.3 RegistryDirectory.notify() 回调notify方法public class RegistryDirectory extends AbstractDirectory implements NotifyListener {
public synchronized void notify(List urls) {
Map categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
// configuration和router不是本文的重点,暂时忽略
List configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 最终获取到provider_url的地址
List providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
...
// 4.3.1 分析
refreshOverrideAndInvoker(providerURLs);
}
}
4.3.1 RegistryDirectory.refreshOverrideAndInvoker()
public class RegistryDirectory extends AbstractDirectory implements NotifyListener {
private void refreshOverrideAndInvoker(List urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
refreshInvoker(urls);
}
}
4.3.2 RegistryDirectory.refreshInvoker() 刷新Invoker
类似于Injvm调用下,创建InjvmInvoker,Dubbo协议下,也会创建DubboInvoker,就在该方法,很重要
private void refreshInvoker(List invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
// 针对empty协议的执行
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
// 不支持empty协议
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
...
// 关键方法在这里,这里会创建对应协议的Invoker(本例中为),详见4.3.3
Map newUrlInvokerMap = toInvokers(invokerUrls);
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List newInvokers = Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
4.3.3 RegistryDirectory.toInvokers()
public class RegistryDirectory extends AbstractDirectory implements NotifyListener {
private Map toInvokers(List urls) {
Map newUrlInvokerMap = new HashMap();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set keys = new HashSet();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
...
Map localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
// 默认enabled为true,自动获取注册
if (enabled) {
// 最终通过Protocol$Adaptive.refer()来创建Invoker
invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);
}
} ...
}
keys.clear();
return newUrlInvokerMap;
}
}
4.3.4 Protocol$Adaptive.refer()
经过之前的分析,我们可以知道,这里会调用最终的相关协议Protocol来实现,本例中为dubbo协议,故最终会调用到DubboProtocol.refer()
public class DubboProtocol extends AbstractProtocol {
@Override
public Invoker refer(Class type, URL url) throws RpcException {
return new AsyncToSyncInvoker(protocolBindingRefer(type, url));
}
public Invoker protocolBindingRefer(Class serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 创建一个DubboInvoker即可
// 在getClients()方法中会创建对Provider的连接
DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
}
总结:绕的有点远了。我们回到我们的出发点,也就是RegistryProtocol.doRefer()方法,在执行directory.subscribe(toSubscribeUrl(subscribeUrl));订阅操作时触发的这一系列操作。
通过这个subscribe方法,我们创建了对dubbo provider_url的变动监听;同时也创建了DubboInvoker,添加到RegistryDirectory.urlInvokerMap属性中。
至于getClients()创建远程连接这一块,我们单独放到下一篇文章中详细说明。
5.Cluster$Adaptive.join()Cluster$Adaptive也是通过动态生成的,具体内容如下:
package org.apache.dubbo.rpc.cluster;
import org.apache.dubbo.common.Node;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
public class Cluster$Adaptive
implements Cluster
{
public Invoker join(Directory paramDirectory)
throws RpcException
{
if (paramDirectory == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null");
}
if (paramDirectory.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument getUrl() == null");
}
URL localURL = paramDirectory.getUrl();
String str = localURL.getParameter("cluster", "failover");
if (str == null) {
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (" + localURL.toString() + ") use keys([cluster])");
}
// 默认使用failoverCluster
Cluster localCluster = (Cluster)ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(str);
return localCluster.join(paramDirectory);
}
}
由源码可知,Cluster默认使用FailoverCluster,默认会调用FailoverCluster.join()方法,最终会返回一个MockClusterInvoker。
Cluster相关知识点不是本文重点,后续会着重分析。
总结:本文重点分析了dubbo协议下的消费者创建过程。最重要有两个:
1.获取DubboInvoker,并创建对provider的长连接
2.将consumer_url注册到配置中心
3.监听provider_url的变更
依旧是两个重点:将远端请求转换为Invoker;将Invoker转换为接口代理(Proxy)
有关于Cluster也是重点部分,后续着重分析。
还是通过一张时序图来总结下全过程: