在学习Dubbo之前,我们先使用目前已知的知识点(非RPC框架),来创建一个最简单的RPC调用。
我们在开发工作中,常用的CS架构中,调用方式以HTTP居多,浏览器发起一起HTTP调用,我们的服务端接收请求并返回响应。
一直以来有一个问题萦绕在笔者心头,既然已经有了HTTP调用这么简单的方式,为什么还要开发出那么多RCP框架?
大家可以思考下,下面我们直接切入正文,来创建一次最简单的RCP调用
1.准备工作* 我们准备一个接口,并创建其实现类。
* 服务端暴露一个端口,用于接收外部请求
* 客户端发起对服务端对应端口的连接,连接成功后,发起一次调用,调用的主要内容就是上面创建的实现类,将所调用类和具体方法、参数都传递给服务端
* 服务端接收到请求后,找到对应的实现类,执行对应方法,执行完成后,将结果返回给客户端
2.代码开发 2.1 接口及实现类// 接口
public interface StudentService {
String study(String book);
}
// 实现类
public class UniversityStudentService implements StudentService {
@Override
public String study(String book) {
return "universityStudent study " + book;
}
}
2.2 服务端实现
public class ServerExport {
/** 具体暴露的ip port */
private String ip;
private int port;
public ServerExport(String ip, int port) {
this.ip = ip;
this.port = port;
}
/** 通过线程池来处理请求 */
private ExecutorService executor = Executors.newFixedThreadPool(8);
/**
* 暴露端口
* @throws IOException
*/
public void export() throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(ip, port));
while (true) {
Socket socket = serverSocket.accept();
// 接收到连接后,将其封装为Task,交由executor处理
executor.submit(new Task(socket));
}
}
private static class Task implements Runnable {
private Socket socket;
public Task(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
// 获取client发送的数据
input = new ObjectInputStream(socket.getInputStream());
// 类名
String interfaceName = input.readUTF();
// 方法名
String methodName = input.readUTF();
// 具体的参数类型和参数值
Class[] paramterTypes = (Class[])input.readObject();
Object[] paramters = (Object[])input.readObject();
Class interfaceClass = Class.forName(interfaceName);
Method method = interfaceClass.getMethod(methodName, paramterTypes);
// 通过反射发起调用
Object result = method.invoke(interfaceClass.newInstance(), paramters);
// 将结果值响应给客户端
output = new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
} catch (IOException | ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
} finally {
if (null != output) {
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != input) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != socket) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
代码不算复杂,客户端想调用具体的类和方法时,则将这些基本信息直接传递到服务端,服务端逐个接收到后,将其拼装起来,并通过反射来调用具体的方法。
2.3 客户端实现public class SimpleClientImporter {
public static void main(String[] args) {
Socket socket = null;
ObjectOutputStream output = null;
ObjectInputStream input = null;
try {
// 连接服务端
socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 20889));
output = new ObjectOutputStream(socket.getOutputStream());
// 发送调用类名称
Class serviceClass = UniversityStudentService.class;
output.writeUTF(serviceClass.getName());
// 发送调用方法名称
output.writeUTF("study");
// 发送被调用方法参数类型
Class[] paramType = new Class[1];
paramType[0] = String.class;
output.writeObject(paramType);
// 发送被调用方法具体参数值
Object[] arg = new Object[1];
arg[0] = "math";
output.writeObject(arg);
input = new ObjectInputStream(socket.getInputStream());
System.out.println(input.readObject());
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端获取对server的连接后,直接对ObjectOutputStream进行操作,将本次调用所需要的参数尽数传递。
这样服务端就可以根据我们定制的调用类、方法、参数等信息,发起具体调用。
总结:一次简单的RCP调用就这么简单的完成了。似乎意犹未尽,真的这么简单就完成了嘛,确实就是这么简单的就完成了一个简单的RCP框架。
那具体性能怎么样呢?额。。。
不用说,server端基于传统的BIO来处理请求,性能肯定惨不忍睹;
传输对象的序列化方式是JDK自带的序列化方式,序列化后生成的字节也是较大的,而且自带的序列化性能也是不高的。
下面笔者通过Netty来改造下这个BIO的调用方式
3.基于Netty重写RCP框架在重写之前,我们先将这种逐个传入类名、方法名的方式更换下,我们将所有参数信息都封装到一个对象中,这样服务端接收到这个对象即可了解到全部调用信息。
3.1 创建传输对象public class ServiceInvokeRequest implements Serializable {
private static final long serialVersionUID = -349675930021881135L;
private String serviceName;
private String methodName;
private Class[] requestParamType;
private Object[] args;
// 省略 get set方法
}
3.2 服务端创建
public class NettyServerExport {
private static int port = 20889;
public static void start(){
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
// Bootstrap基本属性
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
// 设置对象编解码Handler
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
// 最重要的是这个自定义Handler处理
ch.pipeline().addLast(new RpcObjectServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port);
System.out.println("server start");
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
start();
}
}
3.2.1 自定义RpcObjectServerHandler
public class RpcObjectServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
// 当读取到客户端的请求后,则解析对应参数并通过反射发起调用
if (msg instanceof ServiceInvokeRequest) {
ServiceInvokeRequest request = (ServiceInvokeRequest) msg;
String interfaceName = request.getServiceName();
String methodName = request.getMethodName();
Class[] paramterTypes = (Class[]) request.getRequestParamType();
Object[] paramters = (Object[]) request.getArgs();
Class interfaceClass = Class.forName(interfaceName);
Method method = interfaceClass.getMethod(methodName, paramterTypes);
// 最终还是通过反射来调用具体方法
Object result = method.invoke(interfaceClass.newInstance(), paramters);
// 调用完成后,将结果值返回给客户端
ctx.writeAndFlush(result);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
}
基于Netty改造后,客户端的请求会先经过ObjectDecoder的处理,将具体的字节数组转换为Object;
后续会经过自定义的RpcObjectServerHandler,针对ServiceInvokeRequest类型的Object,则解析其具体参数,并通过反射发起调用,最后将结果值通过ctx.writeAndFlush()方法返回给客户端。
3.3 客户端创建public class NettyClientImport {
public static void connect() {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup worker = new NioEventLoopGroup();
try {
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
// 使用自定义的RpcObjectClientHandler
ch.pipeline().addLast(new RpcObjectClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 20889).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
connect();
}
}
3.3.1 RpcObjectClientHandler
public class RpcObjectClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 连接建立成功后,即发送请求
// 客户端拼接需要调用的类、方法、参数的具体信息到ServiceInvokeRequest
ServiceInvokeRequest request = new ServiceInvokeRequest();
Class c = UniversityStudentService.class;
request.setServiceName(c.getName());
request.setMethodName("study");
Class[] paramType = new Class[1];
paramType[0] = String.class;
request.setRequestParamType(paramType);
Object[] args = new Object[1];
args[0] = "math";
request.setArgs(args);
// 最终将request发送到服务端
ctx.writeAndFlush(request);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 在这里接收到服务端的响应结果
System.out.println(msg);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
}
总结:客户端发起调用还是比较简单的,我们在channelActive()中进行处理,当客户端连接到服务端之后,则发起一次调用,通过拼接request对象
最终的响应结果通过channelRead()方法获取到。
通过以上的Netty改造,我们的服务端貌似性能增强了些,但是序列化方式还是使用的JDK原生方式(这个笔者不再继续进行改造,有兴趣的小伙伴可以自行改造)。
这就算是一个一个完整的RCP框架了嘛?
当然不算是,这只是一个引子,具体我们还有很多事情没有做,具体还有哪些呢?且看下一篇章。
参考:分布式服务框架原理与实践