PRC(Remote Procedute Call):远程过程调用,Remote Procedure Call Protocol它是一个计算机通信协议。它允许像***调用本地方法一样调用远程服务***。由于不在同一个内存空间,不能直接调用,需要通过网络表达调用的语义和传达调用的数据。
RPC作用- 屏蔽组包解包
- 屏蔽数据发送/接收
- 提高开发效率
- 业务发展的必然产物
- 远程方法对象代理 》本地的方法使用就是远程方法的代理。
- 连接管理》预先将连接都准备完毕
- 序列化/反序列化》转为字节流进行网络传输
- 寻址与负载均衡
寻址与负载均衡:微服务系统以及分布式系统最基本的一点就是高可用,冗余部署就是解决办法之一,
冗余部署:
- 至少部署两个节点
- 当其中一个节点挂了,我们还有其他节点可用(最大意义)
- 提高更高的吞吐量(单节点1000qps以此累加)
客户端找到节点1和节点2的过程并建立连接叫做寻址,负载均衡不明白的伙计可以上网搜搜,网上资料大把。
- 同步调用:在链式编程中,需等待前节点执行完毕才会继续向下执行,否则进行等待执行完毕,同步调用所需要的时间是累加的,在复杂的业务场景中效率略低。
- 异步调用:无需等待。
- 远程代理
- 序列化
- 网络传输
- 反序列化
Consumer生成本地的get(Object obj)远程代理方法,将数据等信息进行序列化信息网络传输到Provider 代理进行反序列化解析后在将得到的结果进行序列化在进行网络传输返回给的Consumer端,Consumer端拿到得到的结果进行反序列化后得到最终结果。
乞版RPC调用代码实现这里抛出一个问题:如果没有RPC框架的支持,我们实现远程调用需要做哪些事情?
Client端工作- 建立与Server的连接
- 组装数据
- 发送数据包
- 接收处理结果数据包
- 解析返回数据包
- 监听端口
- 响应连接请求
- 接收数据包
- 解析数据包,调用相应方法
- 组装请求处理结果数据包
- 发送结果数据包
此处 设计一个 用户服务的简单版
需求:用户信息管理-CRUD
调用方式:TCP长连接同步交互
协议:自定义协议
接口设计- 注册 boolean addUser(User user)
- 更新 boolean updateUser(long uid,User user)
- 注销 boolean deleteUser(long uid)
- 查询 User Info getUser(long uid)
远程调用设计数据的传输,就会涉及组包和解包,需要调用方和服务方约定数据格式-----序列化
version:进行版本控制,目的为了兼容老版本 cmd:【0,1,2,3】分别注册、更新、注销、查询 magic:例如(0x11223344)没有实际上意义,但是需要每次请求携带,避免无效连接(例如使用telnet进行测试连接)。 bodylen:指定请求body长度 body:实际数据
我们按照此协议进行虚拟Request以及Response的数据。
实际请求过程:
- ConsumerA 通过网络协议进行请求。
- 我们将各个请求的方法放入一个队列当中,通过协议中的cmd来进行具体方法定位。
- 处理完毕后,我们在找到返回的队列定位到具体的返回方法进行返回。
代码截图:
public class RpcClient {
public static void main(String[] args) throws Exception {
UserService proxyUserService = new UserService();
User user = new User();
user.setAge((short) 26);
user.setSex((short) 1);
int ret = proxyUserService.addUser(user);
if(ret == 0)
System.out.println("调用远程服务创建用户成功!!!");
else
System.out.println("调用远程服务创建用户失败!!!");
}
}
public class UserService {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public int addUser (User userinfo) throws Exception {
//初始化客户端连接
TcpClient client = TcpClient.GetInstance();
try {
client.init();
} catch (Exception e) {
e.printStackTrace();
logger.error("init rpc client error");
}
//构造请求数据
RpcProtocol rpcReq = new RpcProtocol();
rpcReq.setCmd(RpcProtocol.CMD_CREATE_USER);
rpcReq.setVersion(0x01);
rpcReq.setMagicNum(0x55660711);
byte[] body = rpcReq.userInfoTobyteArray(userinfo);
rpcReq.setBodyLen(body.length);
rpcReq.setBody(body);
//序列化
byte[] reqData = rpcReq.generateByteArray();
//发送请求
client.sendData(reqData);
//接收请求结果
byte[] recvData = client.recvData();
//反序列化结果
RpcProtocol rpcResp = new RpcProtocol();
rpcResp.byteArrayToRpcHeader(recvData);
int ret = ByteConverter.bytesToInt(rpcResp.getBody(), 0);
return ret;
}
}
public class TcpClient {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private static int MAX_PACKAGE_SIZE = 1024 * 4;
private static String SERVER_IP = "127.0.0.1";
private static int SERVER_PORT = 58885;
private static TcpClient instance = null;
private boolean isInit = false;
//private ChannelFuture channelFuture = null;
SocketChannel client = null;
private final static int CONNECT_TIMEOUT_MILLIS = 2000;
//private Bootstrap bootstrap = new Bootstrap();
public TcpClient() {}
public static TcpClient GetInstance() {
if (instance == null) {
instance = new TcpClient();
}
return instance;
}
public void init() throws Exception{
if(!isInit) {
client = SocketChannel.open(new InetSocketAddress(SERVER_IP, SERVER_PORT));
client.configureBlocking(true);
}
isInit = true;
}
public boolean sendData(byte[] data){
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
byteBuffer.put(data);
byteBuffer.flip();
int ret = 0;
try {
ret = client.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public byte[] recvData() {
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_PACKAGE_SIZE);
try {
int rs = client.read(byteBuffer);
byte[] bytes = new byte[rs];
byteBuffer.flip();
byteBuffer.get(bytes);
return bytes;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
public class RpcServer {
private static Logger logger = LoggerFactory.getLogger(RpcServer.class);
private static int SERVER_LISTEN_PORT = 58885;
public static void main(String[] args) throws Exception {
Thread tcpServerThread = new Thread("tcpServer") {
public void run() {
TcpServer tcpServer = new TcpServer(SERVER_LISTEN_PORT);
try {
tcpServer.start();
} catch (Exception e) {
logger.info("TcpServer start exception: " + e.getMessage());
}
}
};
tcpServerThread.start();
tcpServerThread.join();
}
}
public class TcpServer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private int port;
private final EventLoopGroup bossGroup; //处理Accept连接事件的线程
private final EventLoopGroup workerGroup; //处理handler的工作线程
public TcpServer(int port) {
this.port = port;
this.bossGroup = new NioEventLoopGroup(1);
int cores = Runtime.getRuntime().availableProcessors();
this.workerGroup = new NioEventLoopGroup(cores);
}
public void start() throws Exception {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); //连接数
serverBootstrap.localAddress(this.port);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new PkgDecoder());
pipeline.addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
if (channelFuture.isSuccess()) {
logger.info("rpc server start success!");
} else {
logger.info("rpc server start fail!");
}
channelFuture.channel().closeFuture().sync();
} catch (Exception ex) {
logger.error("exception occurred exception=" + ex.getMessage());
} finally {
bossGroup.shutdownGracefully().sync(); // 释放线程池资源
workerGroup.shutdownGracefully().sync();
}
}
}