紧接着前一篇文章。上文分析了消费者发送请求的全过程,最终通过NettyChannel将请求发送到Provider,当然中间经过了一系列的处理。
本文就从Provider的视角来看下,当服务提供者接收到了请求之后,是如何处理这个请求的。
1.代码示例Provider示例如前文
public class ProviderApplication {
public static void main(String[] args) throws Exception {
ServiceConfig service = new ServiceConfig();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
2. 服务提供者处理请求全过程
2.1 DecodeHandler
服务提供者暴露一个Netty server,所有的请求都会经过它的Handler进行处理,所以首先经过DecodeHandler,对接收到的消息进行解析
public class DecodeHandler extends AbstractChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
// 针对请求消息,将消息解析成Request对象
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
// 继续交由 HeaderExchangeHandler 处理
handler.received(channel, message);
}
}
DecodeHandler与HeaderExchangeHandler的封装关系在HeaderExchanger中就已确定。
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
2.2 HeaderExchangeHandler
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
// 继续进行处理
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} ...
}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
...
Object msg = req.getData();
try {
// 这个Handler是什么?具体参考2.2.1
CompletionStage future = handler.reply(channel, msg);
// 执行完成后,将结果添加到Response中
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
}
2.2.1 HeaderExchangeHandler.handler 属性的添加
代码的世界没有无缘无故的东西,所有的属性都是在某个地方予以添加的。所以这个HeaderExchangeHandler.handler属性是什么呢?
在DubboProtocol.createServer()方法中:
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {...}
private ProtocolServer createServer(URL url) {
...
ExchangeServer server;
try {
// 将当前requestHandler与url进行绑定
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
}
requestHandler后续会绑定到HeaderExchanger.bind()或connect()方法中。
所以后续是DubboProtocol.requestHandler来处理请求体
2.3 DubboProtocol.requestHandler.reply()private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException {
...
Invocation inv = (Invocation) message;
// 重点在这里,通过Invocation中的请求参数,获取在DubboExporter中的具体Invoker
// 具体见2.3.1
Invoker invoker = getInvoker(channel, inv);
...
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
}
2.3.1 通过请求体获取具体的DubboInvoker处理
public class DubboProtocol extends AbstractProtocol {
Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = (String) inv.getObjectAttachments().get(PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY);
inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(
port,
path,
(String) inv.getObjectAttachments().get(VERSION_KEY),
(String) inv.getObjectAttachments().get(GROUP_KEY)
);
DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));
}
return exporter.getInvoker();
}
}
从目前为止,就回到了provider的主战场了,处理请求从Filter开始,再到proxy(AbstractProxyInvoker),最终到具体的实现类
2.4 各种Filter处理EchoFilter、ClassLoaderFilter、GenericFilter等等,不是本文重点,后续会专门来介绍Filter处理细节
2.5 AbstractProxyInvoker.doInvoke()本身是一个抽象类,我们之前在Provider的启动时,会创建对应实现类的Invoker,如下所示
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
public Invoker getInvoker(T proxy, Class type, URL url) {
// Wrapper也是动态生成的一个实现类,
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
请求进入后,获取的Invoker就是当前AbstractProxyInvoker,用来处理客户端请求。
所以最终还是进入Wrapper.invokeMethod()方法,而Wrapper是一个动态生成的代理类,我们可以再回忆下之前Wrapper的具体内容
2.6 Wrapper0.invokeMethod() 处理请求public class Wrapper0
extends Wrapper
implements ClassGenerator.DC
{
public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
throws InvocationTargetException
{
DemoServiceImpl localDemoServiceImpl;
try
{
localDemoServiceImpl = (DemoServiceImpl)paramObject;
}
catch (Throwable localThrowable1)
{
throw new IllegalArgumentException(localThrowable1);
}
try
{
// 在这里实现了对localDemoServiceImpl的调用,就是我们在创建ServiceConfig时setRef()方法所引用的类,本例中即为 new DemoServiceImpl()
if ((!"sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) {
return localDemoServiceImpl.sayHello((String)paramArrayOfObject[0]);
}
if ((!"sayHelloAsync".equals(paramString)) || (paramArrayOfClass.length == 1)) {
return localDemoServiceImpl.sayHelloAsync((String)paramArrayOfObject[0]);
}
}
catch (Throwable localThrowable2)
{
throw new InvocationTargetException(localThrowable2);
}
throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}
}
所以很清楚的看到最终通过Wrapper调用到具体的实现类
同样,时序图来展示下整个过程