您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Strorm学习05--DRPC实时请求应答服务

宝哥大数据 发布时间:2017-03-11 11:26:22 ,浏览量:0

一、什么事DPRC

- RPC (Remote Procedure Call Protocol)——远程过程调用协议 - Distributed RPC:rpc请求流式、并进行处理 - RPC请求参数当做输入流,结果当做输出流 - 利用storm的分布式进行处理机制和能力 - 借助DRPC server接收请求、返回响应

二、实时请求应答服务(同步)

  在Storm中使用drpc server进行实时请求应答服务: 实时请求应答服务(同步),往往不是一个很简单的操作,而且大量 的操作,用DAG模型来提高请求处理速度 实时请求处理

如:发送图片, 或图片的地址, 进行图片的特征提取,流程图如下: 这里写图片描述

这里DRPC Server的好处是什么呢?这样看起来就像是一个Server,经过 Spout,然后经过Bolt,不是更麻烦了吗?DRPC Server其实适用于分布 式,可以应用分布式处理这个单个请求,来加速处理的过程

Storm只能获取数据,不能接请求和发响应,所以这里借助一 个DRPC Server来帮助完成。

DRPC服务器协调: ① 接收一个RPC请求 ② 发送请求到 storm topology ③ 从storm topology接收结果 ④ 把结果 发回给等待的客户端

官网上图片更加形象

这里写图片描述

这里写图片描述

DRPC分布是部署 1、 Storm集群启动 2、 修改storm配置文件conf/storm.yaml

添加如下drpc的配置: 这里写图片描述

3、启动drpc server

启动drpc server,一定是你配置文件配置的server,启动命令

/bin/storm drpc >> ./logs/drpc.out 2>&1 &

4、 启动完以后可以通过jps查看

这里写图片描述

5. 提交drpc的任务

在集群上使用命令行提交任务:

bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.4.jar storm.starter.BasicDRPCTopology basicDrpc
6、在监控界面看到提交的任务。

这里写图片描述

7、 使用javaAPI提交
public class Test {
    public static void main(String[] args) {
        DRPCClient client = new DRPCClient("master", 3772);
        try {
            String result = client.execute("exclamation", "sdfsdfasdf");
            System.out.println("result================="+result);
        } catch (TException e) {
            e.printStackTrace();
        } catch (DRPCExecutionException e) {
            e.printStackTrace();
        }
    }
}
二、创建DRPCServer服务

注意它使用内置的DRPCSpout, 改spout从server获取函数的调用流, 每个流都有一个唯一的id,在topology最后有一个bolt用于返回计算的结果(ReturnResults), 将结果通过这个id发送给client 这里写图片描述

最后一个Bolt必须是两个字段

这里写图片描述

    public static  class MyBolt extends BaseRichBolt {
        OutputCollector collector = null;
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }
        /**
         * tuple中会传递过来两个参数
         * 第一个表示是请求的ID,第二个表示是请求的参数
         */
        public void execute(Tuple input) {
            System.out.println(input.getLong(0) + "->" + input.getString(1));
            String value = input.getString(1);
            value = "hello2 " + value;

            /**
             * 需要发送两个field
             * java.lang.RuntimeException: java.lang.IllegalArgumentException: 
             * Tuple created with wrong number of fields. Expected 2 fields but got 1 fields
             */
            collector.emit(new Values(input.getLong(0), value));
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "value"));
        }
2.1、客户端
package com.chb.StormTest01.drpc;
import backtype.storm.utils.DRPCClient;
public class Client {
    public static void main(String[] args) {
        DRPCClient drpcClient = new DRPCClient("localhost", 3772);
        try {
            //函数方法为hello, 参数chb
            String result = drpcClient.execute("hello", "chb");
            System.out.println("客户端获取结果:" + result);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}   
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0397s