- RPC (Remote Procedure Call Protocol)——远程过程调用协议 - Distributed RPC:rpc请求流式、并进行处理 - RPC请求参数当做输入流,结果当做输出流 - 利用storm的分布式进行处理机制和能力 - 借助DRPC server接收请求、返回响应
二、实时请求应答服务(同步)在Storm中使用drpc server进行实时请求应答服务: 实时请求应答服务(同步),往往不是一个很简单的操作,而且大量 的操作,用DAG模型来提高请求处理速度 实时请求处理
如:发送图片, 或图片的地址, 进行图片的特征提取,流程图如下:
这里DRPC Server的好处是什么呢?这样看起来就像是一个Server,经过 Spout,然后经过Bolt,不是更麻烦了吗?DRPC Server其实适用于分布 式,可以应用分布式处理这个单个请求,来加速处理的过程
Storm只能获取数据,不能接请求和发响应,所以这里借助一 个DRPC Server来帮助完成。DRPC服务器协调: ① 接收一个RPC请求 ② 发送请求到 storm topology ③ 从storm topology接收结果 ④ 把结果 发回给等待的客户端
官网上图片更加形象添加如下drpc的配置:
启动drpc server,一定是你配置文件配置的server,启动命令
/bin/storm drpc >> ./logs/drpc.out 2>&1 &
4、 启动完以后可以通过jps查看
在集群上使用命令行提交任务:
bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.BasicDRPCTopology basicDrpc
6、在监控界面看到提交的任务。
public class Test {
public static void main(String[] args) {
DRPCClient client = new DRPCClient("master", 3772);
try {
String result = client.execute("exclamation", "sdfsdfasdf");
System.out.println("result================="+result);
} catch (TException e) {
e.printStackTrace();
} catch (DRPCExecutionException e) {
e.printStackTrace();
}
}
}
二、创建DRPCServer服务
注意它使用内置的DRPCSpout, 改spout从server获取函数的调用流, 每个流都有一个唯一的id,在topology最后有一个bolt用于返回计算的结果(ReturnResults), 将结果通过这个id发送给client
public static class MyBolt extends BaseRichBolt {
OutputCollector collector = null;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
/**
* tuple中会传递过来两个参数
* 第一个表示是请求的ID,第二个表示是请求的参数
*/
public void execute(Tuple input) {
System.out.println(input.getLong(0) + "->" + input.getString(1));
String value = input.getString(1);
value = "hello2 " + value;
/**
* 需要发送两个field
* java.lang.RuntimeException: java.lang.IllegalArgumentException:
* Tuple created with wrong number of fields. Expected 2 fields but got 1 fields
*/
collector.emit(new Values(input.getLong(0), value));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "value"));
}
2.1、客户端
package com.chb.StormTest01.drpc;
import backtype.storm.utils.DRPCClient;
public class Client {
public static void main(String[] args) {
DRPCClient drpcClient = new DRPCClient("localhost", 3772);
try {
//函数方法为hello, 参数chb
String result = drpcClient.execute("hello", "chb");
System.out.println("客户端获取结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}