写这篇博客的目的主要回顾一下学过的知识,避免遗忘,同时希望伙伴一起来指正一些不足,共同学习。文章是基于Socket来实现一个基本的RPC通信框架,并且实现版本控制。功能不会太复杂,主要是为了疏通思路脉路。
背景环境在分布式中,我们经常会用到dubbo+zookeeper的框架来实现,由于本篇博客并没有对zookeeper的实现,所以我们将由RPC-Server来统一对API进行管理,还请见谅!
一、RPC-Server创建实体类的创建,并实现get&set方法。
//实体类
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public void setAge(int age) {
this.age = age;
}
public int getAge() {
return age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
测试接口: 这里随便定义两个方法,就不解释方法了,看名字也能猜出来。
public interface IHelloService {
String sayHello(String content);
/**
* 保存用户
* @param user
* @return
*/
String saveUser(User user);
}
RpcRequest:
关于Request对象的解释:
Request对象,又称为请求对象,该对象派生自HTTPResponse类,是ASP中重要的服务器内置对象,它连接着Web服务器和Web客户端程序。该对象用来获取客户端在请求一个页面或者传送一个Form时提供的所有信息,包括能够标识浏览器和用户的HTTP变量、存储在客户端Cookie信息以及附在URL后面的值、查询字符串或页面中Form段HTML控件内的值、Cookie、客户端证书、查询字符串等 。如浏览器和用户的变量,客户端表单中的数据、变量或者客户端的cookie信息等,Request对象对应的类是System、Web、HttpRequest类。
我们这里也简单存储一下请求的基本信息:
public class RpcRequest implements Serializable {
//类名
private String className;
//方法名
private String methodName;
//参数
private Object[] parameters;
//版本号
private String version;
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
1.1.2 打jar包
到此处,api模块的编写就简单的完成了。我们将此模块进行打包,然后添加到prc-server-provider中。
在这里我们需要对接口的注册和接收服务端的请求处理,
1.2.1 依赖
com.ccc
rpc-server-api
1.0-SNAPSHOT
org.springframework
spring-context
4.3.13.RELEASE
1.2.2 定义RpcService注解
@Target(ElementType.TYPE) //修饰范围 //类或接口
@Retention(RetentionPolicy.RUNTIME)
@Component //被spring进行扫描
public @interface RpcService {
Class value(); //拿到服务的接口
/**
* 版本号
*/
String version() default "";
}
1.2.3 对api接口实现
由于我们对版本进行了控制,所以此处简单写两个实现类并标注上述声明的注解
@RpcService(value = IHelloService.class,version = "v1.0")
public class HelloServiceImpl implements IHelloService{
@Override
public String sayHello(String content) {
System.out.println("[v1.0] request in :"+content);
return "[v1.0]say Hello:"+content;
}
@Override
public String saveUser(User user) {
System.out.println("request in saveUser :" +user);
return "[v1.0]SUCCESS";
}
}
@RpcService(value = IHelloService.class,version = "v2.0")
public class HelloServiceImpl2 implements IHelloService{
@Override
public String sayHello(String content) {
System.out.println("[v2.0] request in :"+content);
return "[v2.0]say Hello:"+content;
}
@Override
public String saveUser(User user) {
System.out.println("request in saveUser :" +user);
return "[v2.0]SUCCESS";
}
}
1.2.4 PrcServer编写
创建MyRpcServer类,实现ApplicationContextAware,InitializingBean这两个接口,当然这并不一定是必须需要实现这两个接口,主要能实现功能就行。 ApplicationContextAware:加载Spring配置文件时,如果Spring配置文件中所定义或者注解自动注入的Bean类实现了ApplicationContextAware 接口,那么在加载Spring配置文件时,会自动调用ApplicationContextAware 接口中的setApplicationContext方法。 InitializingBean:InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。 另外实现的通信是基于Socket套接字来实现的,我们这里使用一个缓存线程池来处理每一次接收到的请求,另外请求的业务逻辑我们也交由ProcessorHandler线程来处理。
public class MyRpcServer implements ApplicationContextAware, InitializingBean {
//缓存线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//存放注解的容器
private Map handlerMap = new HashMap();
//端口号
private int port;
public MyRpcServer(int port){
this.port = port;
}
/**
* InitializingBean接口方法:
* 初始化客户端。
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
ServerSocket serverSocket = null;
try {
//创建对象并设置端口
serverSocket = new ServerSocket(port);
while (true){
Socket socket = serverSocket.accept(); //此处会阻塞
//每一个socket交给一个processorHandler处理
executorService.execute(new ProcessorHandler(handlerMap,socket));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(serverSocket !=null){
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* ApplicationContextAware接口方法
* 将API进行初始化并存放于容器中
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取指定注解
Map serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if(! serviceMap.isEmpty()){
//遍历注解的value
for(Object serviceBean : serviceMap.values()){
//拿到注解
RpcService rpcService = serviceBean.getClass().getAnnotation((RpcService.class));
//获取name
String serviceName = rpcService.value().getName();
//获取版本号
String version = rpcService.version();
//添加版本号
if(!StringUtils.isEmpty(version)){
/**
* 将serviceName拼接版本号,在此控制了版本号后,
* 如果在注册API时候添加了版本号,那么客户端调用接口的时候,就必须传递版本号信息
* 否则无法进行调用。
*/
serviceName += "-"+version;
}
//将name作为key class对象作为value存放
handlerMap.put(serviceName,serviceBean);
}
}
}
}
1.2.5 ProcessorHandler
此类实现了Runnable接口,接收的每个请求都单独用一个线程来处理,该类中主要是从socket中读取客户端发送过来的信息处理然后调用指定的方法。
public class ProcessorHandler implements Runnable {
private Socket socket;
private Map handlerMap;
/**
* 构造器
* @param handlerMap 获取注解信息
* @param socket 获取请求信息
*/
public ProcessorHandler(Map handlerMap, Socket socket) {
this.socket = socket;
this.handlerMap = handlerMap;
}
@Override
public void run() {
ObjectInputStream objectInputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
// ------------------- InputStream ------------------
//拿到客户端信息 输入流
objectInputStream = new ObjectInputStream(socket.getInputStream());
/**
* 进行反序列化,将客户端发送的Request信息读取出来。
* 包括请求哪个类,方法名称,参数
*/
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
//将请求进行处理
Object result = invoke(rpcRequest);
//-------------------- OutputStream --------------------
//可以用于发送广播消息。目前没有使用
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//得到类名
String serviceName = rpcRequest.getClassName();
//获取版本号
String version = rpcRequest.getVersion();
//如果版本号不为空,那么将请求数据按照规定拼接
if (!StringUtils.isEmpty(version)) {
serviceName += "-" + version;
}
System.out.println(serviceName);
System.out.println("map:"+handlerMap);
//反射调用
//根据key获取类对象
Object service = handlerMap.get(serviceName);
//根据RpcRequest请求中的serviceName 如果没有找到 抛出异常。
if (service == null) {
throw new RuntimeException("service not found:" + serviceName);
}
//获取请求参数数组
Object[] args = rpcRequest.getParameters(); //获得请求参数
Method method = null;
if (args != null) { //如果有参数 对参数进行处理,如果没有进行调用加载方法。
//遍历获得每个参数的类型
Class[] types = new Class[args.length];
for (int i = 0; i
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?