学过dubbo的应该知道dubbo底层基于Netty实现,为了加强对Netty的理解,这篇文章我们来仿照dubbo手撸一个简易版本的RPC框架
结构理解先来看一张图 原理还是比较简单 : 代理 + 线程池 + Netty 下面做一些解释:
- 首先需要定义一个统一的API接口,例:UserApi , 服务端(provider)需要实现这个接口,提供相应的方法UserApiImpl#save,客户端通过远程来调用该接口。
- 然后需要约定一个协议,服务器如何才能识别到客户端要调用哪个接口?:我这里用 “接口权限定名#方法名#参数” ,的方式来,因为是一个简单版本的RPC。服务端解析该内容就能匹配对应的接口的实现类,然后调用该方法。并把方法的返回值通过Netty写回给客户端
- 使用的编解码器都是比价简单的String的编解码器
- 提供者/服务端(provider)正常拉起NettyServer ,启动即可等待客户端的连接。
- 消费者/客户端(consumer)需要调用UserApi API接口,但是在消费者这边是没有实现类的,消费者要做的事情就是在发起 UserApi#save调用的时候,底层通过Netty向服务端通信。内容就是 “接口权限定名#方法#参数”。
- 消费者这边最好的是方式就是为接口生成代理,在代理类去约定协议,组装通信的内容,然后这里还用到了线程池,把发送Netty通信的工作交给新开的线程去处理。
- 请求发送给服务端,服务端返回一个结果,通过Netty拿到结果返回给消费者即可,整个调用过程结束。
首先定义一个统一的接口,提供一个save方法
public interface UserApi {
String save(String data);
}
提供者
第一步:提供者编写一个实现类实现该接口,save方法,返回一个“success” , 消费者通过远程来调用该方法
//实现类
public class UserApiImpl implements UserApi {
@Override
public String save(String data) {
System.out.println("调用方法 UserApiImpl#save ,参数: "+data);
return "success";
}
}
第二步:编写NettyServer,监听的IP和端口通过方法传入,这里没有什么改动
//提供者方启动
public class NettyServer {
public void start(String address,int port ){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new ServerHandler());
}
});
try {
ChannelFuture sync = bootstrap.bind(address, port).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
第三步:编写Provider启动入口
//启动提供者
public class ProviderStart {
public static void main(String[] args) {
new NettyServer().start("127.0.0.1",1000);
}
}
第四步:编写ServerHandler,在channelRead0读取到数据,需要解析内容,匹配相关的service接口,且调用方法,把结果写回给客户端
public class ServerHandler extends SimpleChannelInboundHandler {
//这里没有Spring的容器,就搞一个Map
ConcurrentHashMap applicationContext = new ConcurrentHashMap();
public ServerHandler(){
applicationContext.put("cn.itsource.rpc.api.UserApi",new UserApiImpl());
}
//消息内容约定 cn.itsource.rpc.api.UserApi#save#数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("服务端收到请求 -> "+msg);
String[] msgs = msg.split("#");
String interfaceClass = msgs[0];
String methodName = msgs[1];
String data = msgs[2];
//调研Bean的方法
Object obj = applicationContext.get(interfaceClass);
Class aClass = obj.getClass();
Method method =aClass.getMethod(methodName,data.getClass());
//调用方法
Object result = method.invoke(obj,data);
if(result instanceof String){
ctx.writeAndFlush((String)result);
}else{
System.out.println("类型不支持");
}
}
}
上面只是简单模拟了一下servie调用,整合Sping可以根据类型去容器中获取Bean。到这服务端编写完成
消费者消费者要复杂一些,第一步:先编写NettyClient
public class NettyClient {
//客户端处理器,是一个 Callable
private ClientHandler clientHandler = null;
//初始化netty客户端
public void init(String address, int port){
NioEventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup);
bootstrap.channel(NioSocketChannel.class);
clientHandler = new ClientHandler();
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//一个客户端一个handler
pipeline.addLast(clientHandler);
}
});
try {
ChannelFuture sync = bootstrap.connect(address, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//执行任务的线程池
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public Object getBean(Class userApiClass) {
//创建 userApi 接口的代理
Object proxyInstance = Proxy.newProxyInstance(userApiClass.getClassLoader(), new Class[]{userApiClass}, (proxy, method, args) -> {
//初始化客户端,连接Netty服务端
if(clientHandler == null)init("127.0.0.1",1000);
//把协议头#数据 ,传递给handler
clientHandler.setContent(userApiClass.getName()+"#"+method.getName()+"#"+args[0]);
//把请求交给线程池处理,clientHandler就是一个线程
return executorService.submit(clientHandler).get();
});
return proxyInstance;
}
}
NettyClient中提供了两个方法,init和 getBean方法。在init方法中去初始化NettyClient ,ClientHandler提升为了成员变量,因为下getBean面生成代理的时候会用到,然后getBean方法为接口生成了代理。在代理中把需要发送的内容“类权限定名#方法名#数据”交给Handler,把clientHandler交给线程池去执行。
第二步:编写Handler ,handler是一个Callable 如下
public class ClientHandler extends SimpleChannelInboundHandler implements Callable {
//协议头:cn.itsource.rpc.api.UserApi#save#data
private String content = null;
//上下文
private ChannelHandlerContext ctx;
//服务器返回的结果
private Object result;
public void setContent(String content){
this.content = content;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//当和服务器建立连接,就需要保存ChannelHandlerContext
//在call方法中发请求会用到ChannelHandlerContext
this.ctx = ctx;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("客户端收到结果 = "+msg);
//拿到服务器返回的结果
this.result = msg;
//唤醒call方法取走结果
notify();
}
@Override
public synchronized Object call() throws Exception {
//发送请求
if(ctx == null){
System.out.println("RPC连接失败...");
return null;
}
//把内容写给服务端
ctx.writeAndFlush(content);
//服务器返回的结果是在 channelRead0 方法中拿到,这里等待
wait();
return result;
}
}
在 channelActive方法中,当和服务端建立连接就把ChannelHandlerContext上下文提升为成员变量,方便在call方法中使用。 当call方法被调用,使用ChannelHandlerContext把内容写给服务端,这个时候call方法需要等待服务端返回结果,使用了wait().
当服务端返回结果给客户端,客户端通过channelRead0方法拿到结果,并赋值给result成员变量,然后notify();唤醒wait的call方法,在call方法中就可以拿到结果返回。
第三步:就是通过NettyClient拿到UserApi代理,然后调用save方法
//消费者启动
public class ConsumerStart {
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient();
UserApi userApi = (UserApi)nettyClient.getBean(UserApi.class);
for(int i = 0 ; i
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?