RPC(Remote Procedure Call):远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络的技术。 一个 RPC 的核心功能主要有 5 个部分组成,分别是:客户端、客户端 Stub、网络传输模块、服务端 Stub、服务端等。
Avro除了数据序列化功能外也提供了RPC功能。之所以Hadoop的创始人Doug Cutting会在已有许多现成的RPC系统的情况下,再开发Avro,个人认为原因主要如下:
- 原有的RPC系统在数据序列时必须要先生成数据的定义原件,然后再根据定义文件生成相应的代码,显得不够灵活。
- 原有的RPC系统都不是为大数据应用场景而设计的,在远程传输海量数据时的性能不能让人满意,而Avro在设计时就采用了一些例如列式存储这样的优化手段。
- 重新设计一套RPC系统,更利于与Hadoop生态圈各类框架的集成
在Avro用于RPC时,RPC的客户端和服务端会在建立握手连接时交换模式文件(可以对其进行优化,因为对于大多数远程调用而言,实际上不需要传输任何模式文件)。建立连接后,由于客户端和服务器都拥有对方的完整模式,因此可以轻松解决相同命名字段,缺失字段,多余字段等之间的对应关系。
Avro RPC提供了如下几种服务端和客户端实现:
- 基于jetty的http实现:HttpServer 和HttpTransceiver
- 基于netty的实现:NettyServer和NettyTransceiver
- 基于TCP的实现:SocketServer和SocketTransceiver
- 基于UDP的实现:DatagramServer和DatagramTransceiver
- 基于加密的TCP实现:SaslSocketServer和SaslSocketTransceiver
- 添加依赖:
junit
junit
4.12
test
org.apache.avro
avro
1.9.1
org.apache.avro
avro-ipc
1.9.1
org.apache.avro
avro-ipc-netty
1.9.1
com.fasterxml.jackson.core
jackson-core
2.10.1
com.fasterxml.jackson.core
jackson-databind
2.10.1
com.fasterxml.jackson.core
jackson-annotations
2.10.1
org.jboss.netty
netty
3.2.10.Final
org.slf4j
slf4j-api
1.7.25
org.slf4j
slf4j-log4j12
1.7.25
- 添加插件:
org.apache.avro
avro-maven-plugin
1.9.1
generate-sources
schema
idl-protocol
${project.basedir}/src/main/resources/
${project.basedir}/src/main/java/
org.apache.maven.plugins
maven-compiler-plugin
utf-8
1.8
1.8
- 在resources目录下编写日志配置文件log4j.properties,代码如下:
log4j.rootLogger = debug,stdout,D,E
### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### 输出DEBUG 级别以上的日志到=E://logs/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = E://logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### 输出ERROR 级别以上的日志到=E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =E://logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
第二步:在resources目录下创建schema文件dept.avsc:
{
"namespace":"com.hc.bean",
"type":"record",
"name":"Dept",
"fields":[
{"name":"deptno","type":"int"},
{"name":"dname","type":"string"},
{"name":"loc","type":"string"}
]
}
第三步:编写服务API契约(avdl文件)
一个服务API契约在Avro中称之为 Avro Protocol ,Avro 提供了一种IDL语言来简化这个Avro Protocol的编写,如下:
@namespace("com.hc.service")
protocol DeptService {
import schema "dept.avsc"; //导入Schema
boolean addDept(com.hc.bean.Dept dept); //定义一个远程服务
}
在实际运行时,上述的avdl文件会转成avpr的文件形式,我们可以通过 avroj-tools.jar来进行查看并输出avpr文件,命令为:java -jar avro-tools-1.9.1.jar idl dept.avdl dept.avpr 最终生成的Avro protocol模式文件内容如下:
{
"protocol" : "DeptService",
"namespace" : "com.hc.service",
"types" : [ {
"type" : "record",
"name" : "Dept",
"namespace" : "com.hc.bean",
"fields" : [ {
"name" : "deptno",
"type" : "int"
}, {
"name" : "dname",
"type" : "string"
}, {
"name" : "loc",
"type" : "string"
} ]
} ],
"messages" : {
"addDept" : {
"request" : [ {
"name" : "dept",
"type" : "com.hc.bean.Dept"
} ],
"response" : "boolean"
}
}
}
其中定义了1种类型叫做DeptService,有3个成员deptno、dname、loc。还定义了1个消息服务叫做addDept,输入有一个参数,类型是Dept,返回boolean。
第四步:生成服务接口类使用 mvn clean install -DskipTests=true 对项目进行打包,就会触发maven插件根据编写的avdl文件生成服务契约接口。 最终生成的代码:
@org.apache.avro.specific.AvroGenerated
public interface DeptService {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"DeptService\",\"namespace\":\"com.hc.service\",\"types\":[{\"type\":\"record\",\"name\":\"Dept\",\"namespace\":\"com.hc.bean\",\"fields\":[{\"name\":\"deptno\",\"type\":\"int\"},{\"name\":\"dname\",\"type\":\"string\"},{\"name\":\"loc\",\"type\":\"string\"}]}],\"messages\":{\"addDept\":{\"request\":[{\"name\":\"dept\",\"type\":\"com.hc.bean.Dept\"}],\"response\":\"boolean\"}}}");
/**
*/
boolean addDept(com.hc.bean.Dept dept);
@SuppressWarnings("all")
public interface Callback extends DeptService {
public static final org.apache.avro.Protocol PROTOCOL = com.hc.service.DeptService.PROTOCOL;
/**
* @throws java.io.IOException The async call could not be completed.
*/
void addDept(com.hc.bean.Dept dept, org.apache.avro.ipc.Callback callback) throws java.io.IOException;
}
}
第五步:编写服务接口实现类:
public class DeptServiceImpl implements DeptService {
@Override
public boolean addDept(Dept dept) {
System.out.println("服务器端: "+dept);
return true;
}
}
第六步:提供服务端程序执行入口方法
public class AvroRpcServer {
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer(
new SpecificResponder(DeptService.class, new DeptServiceImpl()),
new InetSocketAddress(7890)
);
nettyServer.start();
System.out.println("服务器端已经启动");
}
}
客户端
第一步:拷贝文件
- 拷贝服务端pom.xml文件中的依赖信息、plugins信息
- 拷贝服务器端的Dept.java和DeptService.java文件
在resources目录下编写日志配置文件log4j.properties,代码如下:
log4j.rootLogger = debug,stdout,D,E
### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### 输出DEBUG 级别以上的日志到=E://logs/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = E://logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### 输出ERROR 级别以上的日志到=E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =E://logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
第三步:编写客户端程序执行入口类:
public class AvroClient {
public static void main(String[] args) throws Exception{
NettyTransceiver client = new NettyTransceiver(new InetSocketAddress("127.0.0.1",7890));
DeptService proxy = SpecificRequestor.getClient(DeptService.class, client);
Dept dept = new Dept(10,"sales","boston");
boolean result = proxy.addDept(dept);
if(result){
System.out.println("服务调用成功");
}else {
System.out.println("服务调用失败");
}
}
}
运行程序
- 运行服务端的AvroRpcServer.javayywr
- 运行客户端的AvroRpcClient.java 最终程序,运行结果:
- 服务端:
- 客户端:
Avro的RPC实现不需要定义服务接口,但需要从.avpr文件中解析协议,协议中定义了消息结构和消息服务。 具体示例请参看博客https://blog.csdn.net/zhu_tianwei/article/details/44042955(未测试)https://www.jianshu.com/p/ecbb607809c4(未测试)