Having walked through the full process by which a Dubbo-protocol service provider exposes its service, this article focuses on how the Exchange layer actually exposes the service described by the URL.
As noted earlier, the service is exposed by starting a local port with Netty; in this article we look at that process in detail.
1.Exchangers.bind(url, handler)
Continuing with the example from the previous article, we stopped our analysis at the Exchangers.bind() method.
public class Exchangers {
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// At this point the url is still our dubbo provider URL:
// dubbo://ip:20880/org.apache.dubbo.demo.DemoService...
// handler is DubboProtocol.requestHandler
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// getExchanger() is called next
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
// type defaults to "header"
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
// The familiar SPI pattern: with type=header, this returns HeaderExchanger by default
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
}
By default HeaderExchanger is returned, so HeaderExchanger.bind() is called next to start the server.
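The getExchanger() lookup above can be mimicked with a plain map-based registry. This is a minimal sketch only: the real ExtensionLoader reads implementation names from META-INF/dubbo resource files, and the class bodies here are illustrative stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of an SPI-style registry mapping "header" -> HeaderExchanger,
// mirroring ExtensionLoader.getExtension(type). Names are illustrative.
public class ExchangerRegistry {
    interface Exchanger { String name(); }

    static class HeaderExchanger implements Exchanger {
        public String name() { return "header"; }
    }

    private static final Map<String, Supplier<Exchanger>> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("header", HeaderExchanger::new); // the default extension
    }

    public static Exchanger getExchanger(String type) {
        Supplier<Exchanger> s = EXTENSIONS.get(type);
        if (s == null) throw new IllegalStateException("No such extension: " + type);
        return s.get();
    }

    public static void main(String[] args) {
        // url.getParameter("exchanger", "header") falls back to the default type
        System.out.println(getExchanger("header").name()); // header
    }
}
```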
1.1 HeaderExchanger.bind()
public class HeaderExchanger implements Exchanger {
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// HeaderExchanger is just a thin layer here; it delegates straight to Transporters, see 1.2
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
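The double wrapping in bind() above is a decorator chain. A minimal sketch of that layering, with simplified stand-ins for the real Dubbo classes:

```java
// Sketch of the handler decoration in HeaderExchanger.bind():
// DecodeHandler -> HeaderExchangeHandler -> business handler.
public class HandlerChain {
    interface ChannelHandler { void received(Object msg); }

    static class HeaderExchangeHandler implements ChannelHandler {
        private final ChannelHandler handler;
        HeaderExchangeHandler(ChannelHandler handler) { this.handler = handler; }
        public void received(Object msg) {
            // the real class matches requests to responses, handles heartbeats, etc.
            handler.received(msg);
        }
    }

    static class DecodeHandler implements ChannelHandler {
        private final ChannelHandler handler;
        DecodeHandler(ChannelHandler handler) { this.handler = handler; }
        public void received(Object msg) {
            // the real class decodes the message body before delegating
            handler.received(msg);
        }
    }

    public static void main(String[] args) {
        StringBuilder trace = new StringBuilder();
        ChannelHandler business = msg -> trace.append("business:").append(msg);
        ChannelHandler chain = new DecodeHandler(new HeaderExchangeHandler(business));
        chain.received("ping");
        System.out.println(trace); // business:ping
    }
}
```

Each decorator adds one concern (decoding, request/response bookkeeping) without the business handler knowing about it.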
1.2 Transporters.bind()
public class Transporters {
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
// As usual, this returns the generated Transporter$Adaptive
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}
The Exchanger hands off to a Transporter. The default Transporter extension is "netty", so NettyTransporter does the actual work.
1.3 NettyTransporter.bind()
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
// Simply creates a NettyServer; we analyze NettyServer in detail below
return new NettyServer(url, handler);
}
}
2.NettyServer opens the port for the exposed service
public class NettyServer extends AbstractServer implements RemotingServer {
// Standard configuration fields for a Netty server
private ServerBootstrap bootstrap;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// setThreadName sets the thread pool name (user-configurable, see below); the default is DubboServerHandler-ip:20880
// Then delegates to the parent class, AbstractServer
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
}
public class ExecutorUtil {
public static URL setThreadName(URL url, String defaultName) {
String name = url.getParameter(THREAD_NAME_KEY, defaultName);
// Defaults to DubboServerHandler, then appends the address, e.g. 192.168.142.1:20880
name = name + "-" + url.getAddress();
url = url.addParameter(THREAD_NAME_KEY, name);
return url;
}
}
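The name computation above is simple enough to sketch standalone (the values are illustrative, taken from the example URL in this article):

```java
// Sketch of ExecutorUtil.setThreadName(): use the configured "threadname"
// parameter if present, else the default, then append the URL address.
public class ThreadNameDemo {
    public static String threadName(String configured, String defaultName, String address) {
        String name = (configured != null) ? configured : defaultName;
        return name + "-" + address;
    }

    public static void main(String[] args) {
        System.out.println(threadName(null, "DubboServerHandler", "192.168.142.1:20880"));
        // DubboServerHandler-192.168.142.1:20880
    }
}
```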
2.1 Basics of the AbstractServer abstract class
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
ExecutorService executor;
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// Get the ip and port the server should bind to
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// doOpen() is called here and delegated to the subclass
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// A thread pool is created here, see 2.3
executor = executorRepository.createExecutorIfAbsent(url);
}
}
As an abstract class, AbstractServer resolves the concrete ip and port from the URL and delegates server startup to its subclass; in our example NettyServer implements doOpen().
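The bind-ip fallback in the constructor above boils down to one condition. A reduced sketch (the real code calls NetUtils.isInvalidLocalHost(), which we represent here as a boolean input):

```java
// Sketch of AbstractServer's bind-address logic: fall back to 0.0.0.0
// when anyhost=true or the configured host is not a usable local address.
public class BindAddressDemo {
    static final String ANYHOST = "0.0.0.0";

    public static String resolveBindIp(String host, boolean anyhost, boolean invalidLocalHost) {
        return (anyhost || invalidLocalHost) ? ANYHOST : host;
    }

    public static void main(String[] args) {
        System.out.println(resolveBindIp("192.168.142.1", false, false)); // 192.168.142.1
        System.out.println(resolveBindIp("192.168.142.1", true, false));  // 0.0.0.0
    }
}
```

Binding to 0.0.0.0 means the server accepts connections on every local interface, while the URL registered for consumers keeps the concrete address.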
2.2 NettyServer.doOpen()
public class NettyServer extends AbstractServer implements RemotingServer {
protected void doOpen() throws Throwable {
// All familiar Netty boilerplate from here on
bootstrap = new ServerBootstrap();
// bossGroup defaults to 1 thread
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
// workerGroup defaults to DEFAULT_IO_THREADS, i.e. min(CPU cores + 1, 32)
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
// The crucial handler for client connections; a later article will analyze it in detail
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
// Configure the codec handlers and the IdleStateHandler idle-connection handler here
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
NettyServer.doOpen() contains nothing surprising: it is the standard sequence for building a Netty ServerBootstrap.
How NettyServerHandler, the encoder, and the decoder work is not the focus of this article; a later article will analyze how the server handles requests received from a consumer.
2.3 Thread pool creation in executorRepository.createExecutorIfAbsent(url)
After doOpen() creates the Netty server in 2.1 above, the constructor finishes by creating a thread pool. Let's analyze it.
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
// This returns DefaultExecutorRepository by default
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
...
try {
doOpen();
} catch (Throwable t) {
...
}
// So DefaultExecutorRepository.createExecutorIfAbsent() is called to create the thread pool
executor = executorRepository.createExecutorIfAbsent(url);
}
}
// DefaultExecutorRepository.createExecutorIfAbsent()
public class DefaultExecutorRepository implements ExecutorRepository {
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
// In our example the side is provider, so the key stays EXECUTOR_SERVICE_COMPONENT_KEY
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
ConcurrentMap<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
Integer portKey = url.getPort();
// The Executor is created here, keyed by port
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executor = createExecutor(url);
executors.put(portKey, executor);
}
return executor;
}
// Create the thread pool
private ExecutorService createExecutor(URL url) {
// Per the @SPI annotation on ThreadPool, the default extension is "fixed", i.e. FixedThreadPool; its getExecutor() returns the concrete ExecutorService, see 2.3.1
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
}
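The caching logic in createExecutorIfAbsent() can be sketched standalone: one executor per port, recreated if a cached one has been shut down. This is a reduced sketch; the pool factory here is a plain Executors stand-in for FixedThreadPool.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of DefaultExecutorRepository.createExecutorIfAbsent():
// cache one ExecutorService per port, replacing any shut-down instance.
public class ExecutorRepositoryDemo {
    private final ConcurrentMap<Integer, ExecutorService> executors = new ConcurrentHashMap<>();

    public synchronized ExecutorService createExecutorIfAbsent(int port) {
        ExecutorService executor = executors.computeIfAbsent(port, p -> newExecutor());
        // If the cached executor has been shut down, create a new one
        if (executor.isShutdown() || executor.isTerminated()) {
            executors.remove(port);
            executor = newExecutor();
            executors.put(port, executor);
        }
        return executor;
    }

    private ExecutorService newExecutor() {
        return Executors.newFixedThreadPool(2); // stand-in for Dubbo's FixedThreadPool
    }

    public static void main(String[] args) {
        ExecutorRepositoryDemo repo = new ExecutorRepositoryDemo();
        ExecutorService a = repo.createExecutorIfAbsent(20880);
        System.out.println(a == repo.createExecutorIfAbsent(20880)); // true: cached per port
        a.shutdown();
        ExecutorService c = repo.createExecutorIfAbsent(20880);
        System.out.println(a == c); // false: shut-down executor was replaced
        c.shutdown();
    }
}
```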
2.3.1 FixedThreadPool.getExecutor(url)
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// The pool name is the DubboServerHandler-ip:20880 built above
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
// Fixed number of threads, DEFAULT_THREADS = 200
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
// Queue length defaults to 0
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
// This builds a plain JDK ThreadPoolExecutor
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue() :
(queues < 0 ? new LinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
// The rejection policy is covered in 2.3.2
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
Let's summarize the server-side thread pool Dubbo creates:
One pool per exposed port, with 200 fixed threads by default (core and maximum size are both 200) and a queue length of 0, so once every thread is busy, a new task goes straight to the rejection policy.
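That "no queue, reject immediately" behavior is easy to demonstrate with a pool of the same shape. A sketch scaled down to 1 thread (rather than 200) so the rejection path is trivial to trigger:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of the pool shape FixedThreadPool builds when queues == 0:
// fixed threads + SynchronousQueue, so a saturated pool rejects at once.
public class FixedPoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), // no buffering: hand off directly or fail
                new ThreadPoolExecutor.AbortPolicy());

        CountDownLatch block = new CountDownLatch(1);
        // occupy the only thread
        pool.execute(() -> {
            try { block.await(); } catch (InterruptedException ignored) {}
        });
        try {
            pool.execute(() -> {}); // no free thread, no queue slot -> rejected
            System.out.println("accepted");
        } catch (RejectedExecutionException e) {
            System.out.println("rejected");
        }
        block.countDown();
        pool.shutdown();
    }
}
```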
2.3.2 The AbortPolicyWithReport rejection policy
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
// Executed when a task is rejected
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Report that the thread pool is exhausted
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
+ "%d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
// Dump the thread stacks
dumpJStack();
// Fire a ThreadPoolExhaustedEvent
dispatchThreadPoolExhaustedEvent(msg);
throw new RejectedExecutionException(msg);
}
}
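The essence of AbortPolicyWithReport is "report, then abort". A self-contained sketch of that shape (the real class also dumps a jstack and fires an event through Dubbo's event bus, which we only note in a comment):

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Sketch of AbortPolicyWithReport's behavior: format pool statistics into
// a message, log it, then rethrow like the parent AbortPolicy.
public class ReportingAbortPolicy implements RejectedExecutionHandler {
    private final String threadName;

    public ReportingAbortPolicy(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format(
                "Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, max: %d)",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getMaximumPoolSize());
        System.err.println(msg); // the real class logs, dumps jstack, fires an event
        throw new RejectedExecutionException(msg);
    }
}
```

Because the exception carries the pool statistics, the caller sees at a glance why its invocation was refused.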
Summary:
The code that opens the server on the given port is not hard in itself; it is simply wrapped in many layers, which makes it a little winding to follow.
Tracing the path from Exchanger to Transporter to NettyServer, we end up with Netty listening on the Dubbo protocol's port (20880 by default).
As usual, a sequence diagram sums up the whole process: