您当前的位置: 首页 > 

陈橙橙丶

暂无认证

  • 0浏览

    0关注

    107博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

基于Socket实现PRC通信

陈橙橙丶 发布时间:2020-05-09 16:57:14 ,浏览量:0

前言

写这篇博客的目的主要回顾一下学过的知识,避免遗忘,同时希望伙伴一起来指正一些不足,共同学习。文章是基于Socket来实现一个基本的RPC通信框架,并且实现版本控制。功能不会太复杂,主要是为了疏通思路脉路。

背景环境

在分布式中,我们经常会用到dubbo+zookeeper的框架来实现,由于本篇博客并没有对zookeeper的实现,所以我们将由RPC-Server来统一对API进行管理,还请见谅!

一、RPC-Server创建

在这里插入图片描述

1.1 rpc-server-api 1.1.1基本创建

实体类的创建,并实现get&set方法。

//实体类
public class User {

    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getAge() {
        return age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

测试接口: 这里随便定义两个方法,就不解释方法了,看名字也能猜出来。

public interface IHelloService {

    String sayHello(String content);
    /**
     * 保存用户
     * @param user
     * @return
     */
    String saveUser(User user);
}

RpcRequest:

关于Request对象的解释:

Request对象,又称为请求对象,该对象派生自HTTPResponse类,是ASP中重要的服务器内置对象,它连接着Web服务器和Web客户端程序。该对象用来获取客户端在请求一个页面或者传送一个Form时提供的所有信息,包括能够标识浏览器和用户的HTTP变量、存储在客户端Cookie信息以及附在URL后面的值、查询字符串或页面中Form段HTML控件内的值、Cookie、客户端证书、查询字符串等 。如浏览器和用户的变量,客户端表单中的数据、变量或者客户端的cookie信息等,Request对象对应的类是System、Web、HttpRequest类。

我们这里也简单存储一下请求的基本信息:

public class RpcRequest implements Serializable {
    //类名
    private String className;
    //方法名
    private String methodName;
    //参数
    private Object[] parameters;
    //版本号
    private String version;

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}
1.1.2 打jar包

到此处,api模块的编写就简单的完成了。我们将此模块进行打包,然后添加到prc-server-provider中。 在这里插入图片描述

1.2 rpc-server-provider

在这里我们需要对接口的注册和接收服务端的请求处理,

1.2.1 依赖
		
        
            com.ccc
            rpc-server-api
            1.0-SNAPSHOT
        

        
            org.springframework
            spring-context
            4.3.13.RELEASE
        
1.2.2 定义RpcService注解
@Target(ElementType.TYPE) //修饰范围 //类或接口
@Retention(RetentionPolicy.RUNTIME)
@Component //被spring进行扫描
public @interface RpcService {
    
    Class value(); //拿到服务的接口

    /**
     * 版本号
     */
    String version() default "";
}
1.2.3 对api接口实现

由于我们对版本进行了控制,所以此处简单写两个实现类并标注上述声明的注解

@RpcService(value = IHelloService.class,version = "v1.0")
public class HelloServiceImpl implements IHelloService{


    @Override
    public String sayHello(String content) {
        System.out.println("[v1.0] request in :"+content);
        return "[v1.0]say Hello:"+content;
    }

    @Override
    public String saveUser(User user) {
        System.out.println("request in saveUser :" +user);
        return "[v1.0]SUCCESS";
    }
}
@RpcService(value = IHelloService.class,version = "v2.0")
public class HelloServiceImpl2 implements IHelloService{


    @Override
    public String sayHello(String content) {
        System.out.println("[v2.0] request in :"+content);
        return "[v2.0]say Hello:"+content;
    }

    @Override
    public String saveUser(User user) {
        System.out.println("request in saveUser :" +user);
        return "[v2.0]SUCCESS";
    }
}
1.2.4 PrcServer编写

创建MyRpcServer类,实现ApplicationContextAware,InitializingBean这两个接口,当然这并不一定是必须需要实现这两个接口,主要能实现功能就行。 ApplicationContextAware:加载Spring配置文件时,如果Spring配置文件中所定义或者注解自动注入的Bean类实现了ApplicationContextAware 接口,那么在加载Spring配置文件时,会自动调用ApplicationContextAware 接口中的setApplicationContext方法。 InitializingBean:InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。 另外实现的通信是基于Socket套接字来实现的,我们这里使用一个缓存线程池来处理每一次接收到的请求,另外请求的业务逻辑我们也交由ProcessorHandler线程来处理。

public class MyRpcServer implements ApplicationContextAware, InitializingBean {
     //缓存线程池
    ExecutorService executorService = Executors.newCachedThreadPool();
    //存放注解的容器
    private Map handlerMap = new HashMap();
    //端口号
    private int port;

    public MyRpcServer(int port){
        this.port = port;
    }

    /**
     * InitializingBean接口方法:
     * 初始化客户端。
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        ServerSocket serverSocket = null;
        try {
        //创建对象并设置端口
            serverSocket = new ServerSocket(port);
            while (true){
                Socket socket = serverSocket.accept(); //此处会阻塞
                //每一个socket交给一个processorHandler处理
                executorService.execute(new ProcessorHandler(handlerMap,socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(serverSocket !=null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * ApplicationContextAware接口方法
     * 将API进行初始化并存放于容器中
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    //获取指定注解
        Map serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
        if(! serviceMap.isEmpty()){
        //遍历注解的value
            for(Object serviceBean : serviceMap.values()){
                //拿到注解
                RpcService rpcService = serviceBean.getClass().getAnnotation((RpcService.class));
                //获取name
                String serviceName = rpcService.value().getName();
                //获取版本号
                String version = rpcService.version();
                //添加版本号
                if(!StringUtils.isEmpty(version)){
                /**
                *	将serviceName拼接版本号,在此控制了版本号后,
                *	如果在注册API时候添加了版本号,那么客户端调用接口的时候,就必须传递版本号信息
                *	否则无法进行调用。
                */
                    serviceName += "-"+version;
                }
                //将name作为key class对象作为value存放
                handlerMap.put(serviceName,serviceBean);

            }
        }
    }
}
1.2.5 ProcessorHandler

此类实现了Runnable接口,接收的每个请求都单独用一个线程来处理,该类中主要是从socket中读取客户端发送过来的信息处理然后调用指定的方法。

public class ProcessorHandler implements Runnable {

    private Socket socket;

    private Map handlerMap;

    /**
     * 构造器
     * @param handlerMap 获取注解信息
     * @param socket 获取请求信息
     */
    public ProcessorHandler(Map handlerMap, Socket socket) {
        this.socket = socket;
        this.handlerMap = handlerMap;
    }

    @Override
    public void run() {
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            // ------------------- InputStream ------------------
            //拿到客户端信息 输入流
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            /**
             * 进行反序列化,将客户端发送的Request信息读取出来。
             * 包括请求哪个类,方法名称,参数
             */
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            //将请求进行处理
            Object result = invoke(rpcRequest);

            //-------------------- OutputStream --------------------
            //可以用于发送广播消息。目前没有使用
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        //得到类名
        String serviceName = rpcRequest.getClassName();
        //获取版本号
        String version = rpcRequest.getVersion();
        //如果版本号不为空,那么将请求数据按照规定拼接
        if (!StringUtils.isEmpty(version)) {
            serviceName += "-" + version;
        }
        System.out.println(serviceName);
        System.out.println("map:"+handlerMap);
        //反射调用
        //根据key获取类对象
        Object service = handlerMap.get(serviceName);

        //根据RpcRequest请求中的serviceName 如果没有找到 抛出异常。
        if (service == null) {
            throw new RuntimeException("service not found:" + serviceName);
        }
        //获取请求参数数组
        Object[] args = rpcRequest.getParameters(); //获得请求参数
        Method method = null;
        if (args != null) { //如果有参数 对参数进行处理,如果没有进行调用加载方法。
            //遍历获得每个参数的类型
            Class[] types = new Class[args.length];
            for (int i = 0; i             
关注
打赏
1648473527
查看更多评论
0.0389s