前面两篇文章分别讲述了本地模式下的协议暴露(InjvmProtocol)和协议消费(InjvmInvoker)。实际到这里的话,协议暴露只讲述了一半,因为协议的暴露默认还会以DubboProtocol的模式暴露出去。本文就来了解下Dubbo如何向外暴露服务。
强烈建议读者可以先看下 Dubbo源码解析-Dubbo服务提供者_Injvm协议(二)_恐龙弟旺仔的博客-CSDN博客 这篇文章,对local模式的服务暴露有一个了解后(最主要是对PROXY_FACTORY和PROXY的分析),再看remote协议的暴露效果会更好。
具体示例代码见如下:
public class ProviderApplication {
public static void main(String[] args) {
// 服务实现(自定义DemoService接口)
DemoService demoService = new DemoServiceImpl();
// 当前应用配置
ApplicationConfig application = new ApplicationConfig();
application.setName("provider");
// 连接注册中心配置
RegistryConfig registry = new RegistryConfig();
// 本地zookeeper作为配置中心
registry.setAddress("zookeeper://localhost:2181");
// 服务提供者协议配置
ProtocolConfig protocol = new ProtocolConfig();
// dubbo协议,并以20881端口暴露
protocol.setName("dubbo");
protocol.setPort(20881);
// 服务提供者暴露服务配置
ServiceConfig service = new ServiceConfig();
service.setApplication(application);
service.setRegistry(registry);
service.setProtocol(protocol);
service.setInterface(DemoService.class);
service.setRef(demoService);
service.setVersion("1.0.0");
// 暴露及注册服务
service.export();
}
}
// 接口
public interface DemoService {
String sayHello(String name);
}
1.ServiceConfig.doExportUrls()
我们的故事依然从这个方法开始,之前ServiceConfig.exportLocal()也是从本方法开始的。
public class ServiceConfig extends ServiceConfigBase {
private void doExportUrls() {
// 获取注册中心url信息,具体见1.1
// 从这里我们可以看出,Dubbo允许我们进行多注册中心多协议暴露
List registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
repository.registerService(pathKey, interfaceClass);
serviceMetadata.setServiceKey(pathKey);
// 根据当前协议将服务注册出去
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
// 默认为dubbo协议
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
...
// 拼装url
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 暴露在本地,之前的文章已经有过详细描述
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 本文分析的重点,暴露到远端的注册中心
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
...
// 添加dynamic=true属性,默认动态注册到注册中心,若为false的话,则需要用户手动开启后才可用
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
...
// 还是之前的套路,执行链为:StubProxyFactoryWrapper --> JavassistProxyFactory
// JavassistProxyFactory.getInvoker()本质上返回了AbstractProxyInvoker,其invoke方法本质上时调用了Wrapper.invokeMethod()方法
Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 又走回到这里了,我们继续分析,见2
Exporter exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 直连模式,不属于本文重点,先忽略
}
...
}
}
}
1.1 ConfigValidationUtils.loadRegistries()获取注册中心信息
public class ConfigValidationUtils {
public static List loadRegistries(AbstractInterfaceConfig interfaceConfig, boolean provider) {
// check && override if necessary
List registryList = new ArrayList();
ApplicationConfig application = interfaceConfig.getApplication();
// 获取我们之前配置的注册信息,即registry.setAddress("zookeeper://localhost:2181");
List registries = interfaceConfig.getRegistries();
if (CollectionUtils.isNotEmpty(registries)) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
address = ANYHOST_VALUE;
}
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map map = new HashMap();
AbstractConfig.appendParameters(map, application);
AbstractConfig.appendParameters(map, config);
map.put(PATH_KEY, RegistryService.class.getName());
AbstractInterfaceConfig.appendRuntimeParameters(map);
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
List urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
// 拼装URL,以registry开头
// 本例中为registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=584®istry=zookeeper×tamp=1628684119163
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())
.setProtocol(extractRegistryType(url))
.build();
if ((provider && url.getParameter(REGISTER_KEY, true))
|| (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
}
2.PROTOCOL.export(wrapperInvoker)分析
按照之前我们在 Dubbo服务提供(InJvm)文章中分析的PROTOCOL执行链路为Protocol$Adaptive --> ProtocolFilterWrapper --> ProtocolListenerWrapper -->xxx(暂时还不知道最终是什么)
那么本文中最终执行的Protocol是什么呢?根据Protocol$Adaptive的代码我们可以看到,最终会根据ProtocolName来选择,当前的ProtocolName=registry,所以最终会执行到RegistryProtocol
ProtocolFilterWrapper只是添加了一堆Filter,用来拦截请求,我们直接看后续的ProtocolListenerWrapper
2.1 ProtocolListenerWrapper.export()public class ProtocolListenerWrapper implements Protocol {
public Exporter export(Invoker invoker) throws RpcException {
// 这里的invoker.url即为我们上述1.1中封装的url
// registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&pid=584®istry=zookeeper×tamp=1628684119163
if (UrlUtils.isRegistry(invoker.getUrl())) {
// 根据条件判断,会执行到这里,直接交由RegistryProtocol执行
return protocol.export(invoker);
}
return new ListenerExporterWrapper(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
}
public class UrlUtils {
public static boolean isRegistry(URL url) {
// 如果protocol==registry或者protocol==ervice-discovery-registry
// 该url明显符合protocol==registry
return REGISTRY_PROTOCOL.equals(url.getProtocol()) || SERVICE_REGISTRY_PROTOCOL.equalsIgnoreCase(url.getProtocol());
}
}
2.2 RegistryProtocol.export()
public class RegistryProtocol implements Protocol {
public Exporter export(final Invoker originInvoker) throws RpcException {
// 获取注册中心地址
// 本例中为:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo://192.168.142.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.142.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=584&release=&scope=remote&side=provider×tamp=1628684537534&pid=584×tamp=1628684119163
URL registryUrl = getRegistryUrl(originInvoker);
// 获取注册服务协议地址,20880为dubbo协议默认端口号
// 本例中为dubbo://192.168.142.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.142.1&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=584&release=&scope=remote&side=provider×tamp=1628684537534
URL providerUrl = getProviderUrl(originInvoker);
// 使用OverrideListener订阅配置
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 这里很关键,在这里将服务以具体的协议暴露出去,具体见2.3
final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);
// 获取对应的注册中心对象,具体见2.2.1
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 将当前provider_url注册到Registry中,本例中为ZookeeperRegistry,具体见2.2.2
register(registryUrl, registeredProviderUrl);
}
...
// 添加registryUrl和subscribeUrl到Exporter中
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// 监听器响应,具体见2.2.3
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter(exporter);
}
}
2.2.1 RegistryProtocol.getRegistry(originInvoker)获取注册中心对象信息
public class RegistryProtocol implements Protocol {
protected Registry getRegistry(final Invoker originInvoker) {
// 本例中registryUrl为:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?...
URL registryUrl = getRegistryUrl(originInvoker);
// 从registryFactory中获取对应Registry对象,根据zookeeper的名称,最终获取到ZookeeperRegistry
// 当前registryFactory依旧是动态生成的,RegistryFactory$Adaptive
return registryFactory.getRegistry(registryUrl);
}
// 根据注册信息url获取真正的注册地址,本例中注册到zookeeper上
protected URL getRegistryUrl(Invoker originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
return registryUrl;
}
}
2.2.2 RegistryProtocol.register() 将provider_url注册到注册中心去
调用链为ListenerRegistryWrapper --> ZookeeperRegistry,
public class ListenerRegistryWrapper implements Registry {
public void register(URL url) {
try {
// 直接交由ZookeeperRegistry处理
registry.register(url);
} finally {
// 当前主要作用在于此,添加监听器,用于监听
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener listener : listeners) {
if (listener != null) {
try {
listener.onRegister(url);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
}
public class ZookeeperRegistry extends FailbackRegistry {
public void doRegister(URL url) {
try {
// 创建一个当前url的临时节点
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
总结:总体来说,以Zookeeper为注册中心时,就是将provider_url作为临时节点添加到zookeeper中
2.2.3 RegistryProtocol.notifyExport()
服务暴露成功后,则触发监听器的onExport()方法
public class RegistryProtocol implements Protocol {
private void notifyExport(ExporterChangeableWrapper exporter) {
// 一样的,根据ExtensionLoader来获取所有的监听器
List listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
.getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener");
if (CollectionUtils.isNotEmpty(listeners)) {
for (RegistryProtocolListener listener : listeners) {
// 逐个触发监听器行为
listener.onExport(this, exporter);
}
}
}
}
2.3 RegistryProtocol.doLocalExport()将服务以具体协议暴露出去
public class RegistryProtocol implements Protocol {
private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) {
// 这来的key就是具体dubbo协议的url
// dubbo://192.168.142.1:20880/org.apache.dubbo.demo.DemoService...
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper) bounds.computeIfAbsent(key, s -> {
Invoker invokerDelegate = new InvokerDelegate(originInvoker, providerUrl);
// 在这里重新调用了Protocol$Adaptive.export()方法将服务暴露出去
return new ExporterChangeableWrapper((Exporter) protocol.export(invokerDelegate), originInvoker);
});
}
}
针对Url为dubbo://192.168.142.1:20880/org.apache.dubbo.demo.DemoService...,
此时的Protocol调用链为:Protocol$Adaptive --> ProtocolFilterWrapper --> ProtocolListenerWrapper --> DubboProtocol
中间的调用过程我们都已经有过分析,直接看最后的DubboProtocol调用过程
2.4 DubboProtocol.export()public class DubboProtocol extends AbstractProtocol {
public Exporter export(Invoker invoker) throws RpcException {
URL url = invoker.getUrl();
// 获取key信息
// 本例中为:org.apache.dubbo.demo.DemoService:20880
String key = serviceKey(url);
// 创建DubboExporter,并添加到当前协议的exporterMap中,与InjvmProtocol类似
DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
exporterMap.put(key, exporter);
...
// 创建服务,将该端口暴露出去
openServer(url);
optimizeSerialization(url);
return exporter;
}
// openServer()
private void openServer(URL url) {
// 作为服务端,在本地某端口启动一个服务
// 本例中key为 192.168.142.1:20880,后续会在当前20880端口启动一个服务端
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
// 如果服务端已经创建,则不再重复创建
server = serverMap.get(key);
if (server == null) {
// createServer进行服务创建
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
// 创建服务端
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 这里获取服务暴露方式,默认以Netty方式进行暴露
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
ExchangeServer server;
try {
// 真正的处理在这里,不是本文重点,后续会着重讲解
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 当前主要是服务暴露,所以非client,直接忽略这段代码即可
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
}
可以看到,DubboProtocol的服务暴露方式,就是获取当前协议指定的端口号,然后使用Netty在该端口暴露一个服务端。
有关于Exchangers的相关代码我们后续会专门说明,这里只需要先知道就可以了。
总结:Dubbo协议的暴露相较于Injvm协议的暴露多了些东西,概括来说是两样:
* 本地启动对应协议端口服务(默认以Netty方式),以支持消费者的连接
* 将provider_url注册到注册中心去,以支持消费者探寻
最后,用一张时序图来描述下整个过程: