Storm Study 01: The ExclamationTopology Example

宝哥大数据, published 2017-03-08 10:28:47

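Example code

This is the simplest Storm example. It is taken from the examples shipped with the Storm installation package and is written entirely in Java.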

package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }

  public static void main(String[] args) throws Exception {
    // Create a topology
    TopologyBuilder builder = new TopologyBuilder();

    // Set the data source (spout); a Storm topology is a DAG (directed acyclic graph)
    builder.setSpout("word", new TestWordSpout(), 10);
    // Processing logic (bolts)
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      // Cluster mode: the first argument is the topology name
      conf.setNumWorkers(3);

      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      // Local mode: run in an in-process LocalCluster for 10 seconds, then stop
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}
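To see what the topology does to a single word without a Storm installation, here is a plain-Java sketch. It uses no Storm classes; the class name ExclamationFlow and its methods are hypothetical. It applies the same transformation as ExclamationBolt.execute twice, once per bolt in the chain word → exclaim1 → exclaim2, so each word ends up with six exclamation marks.

```java
import java.util.Arrays;

// Plain-Java sketch (hypothetical names, no Storm dependency) of the data flow
// word -> exclaim1 -> exclaim2 in ExclamationTopology.
class ExclamationFlow {

    // Same logic as ExclamationBolt.execute: append "!!!" to the first field.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // A word emitted by the spout passes through two ExclamationBolt stages.
    static String throughTopology(String word) {
        String afterExclaim1 = exclaim(word);   // exclaim1 subscribes to "word"
        return exclaim(afterExclaim1);          // exclaim2 subscribes to "exclaim1"
    }

    public static void main(String[] args) {
        // Illustrative input words
        for (String w : Arrays.asList("nathan", "mike", "golda")) {
            System.out.println(w + " -> " + throughTopology(w));  // e.g. nathan -> nathan!!!!!!
        }
    }
}
```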
Storm's programming model:

- DAG: the topology is a directed acyclic graph of components
- Spout: the data source
- Bolt: the processing logic
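The "directed acyclic graph" point can be made concrete with a small sketch. Below, the example topology is modeled as a map from each component id to the ids it subscribes to (a hypothetical representation, not how Storm stores it), and Kahn's algorithm checks that the graph is acyclic while producing an order in which every component comes after its inputs. Every referenced component must appear as a key in the map.

```java
import java.util.*;

// Hypothetical sketch: the example topology as a DAG of component ids.
// Each entry maps a component to the components it subscribes to.
class TopologyDag {

    // Kahn's algorithm: returns components ordered so that each one follows
    // its inputs, or throws if the graph contains a cycle.
    static List<String> topologicalOrder(Map<String, List<String>> inputs) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> downstream = new HashMap<>();
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            inDegree.put(e.getKey(), e.getValue().size());
            for (String src : e.getValue()) {
                downstream.computeIfAbsent(src, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());  // spouts have no inputs
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String c = ready.poll();
            order.add(c);
            for (String next : downstream.getOrDefault(c, Collections.emptyList())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        if (order.size() != inputs.size()) {
            throw new IllegalStateException("cycle detected: not a DAG");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        inputs.put("word", Collections.emptyList());                   // spout
        inputs.put("exclaim1", Collections.singletonList("word"));     // bolt
        inputs.put("exclaim2", Collections.singletonList("exclaim1")); // bolt
        System.out.println(topologicalOrder(inputs));  // [word, exclaim1, exclaim2]
    }
}
```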

1. TopologyBuilder

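TopologyBuilder is the Java API for defining a topology for Storm to execute. Topologies are ultimately Thrift structures, but because working with those Thrift structures directly is very verbose, TopologyBuilder simplifies creating a topology. Building one follows a regular pattern, starting with creating a TopologyBuilder.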

1.1 Create a topology

    TopologyBuilder builder = new TopologyBuilder();
1.2 TopologyBuilder's four member fields

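Spouts and bolts are stored in Maps keyed by component id: when a spout or bolt is set, it is put into the corresponding map under its id, and the component is later looked up by that id. The last two fields have no public accessor methods.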

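    private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
    private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();

    private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
    private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();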
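The idea of keeping components in maps keyed by id, and refusing to reuse an id, can be sketched in plain Java. MiniTopologyBuilder is a hypothetical class, not Storm's TopologyBuilder; components are just Strings here, whereas Storm stores IRichSpout and IRichBolt objects. The validateUnusedId method imitates the role of the call of the same name in setSpout, under the assumption that spouts and bolts share one id namespace.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature builder: components kept in maps keyed by id, with
// duplicate ids rejected. Not Storm's actual TopologyBuilder code.
class MiniTopologyBuilder {
    // Components are plain Strings in this sketch.
    private final Map<String, String> spouts = new HashMap<>();
    private final Map<String, String> bolts = new HashMap<>();

    // Assumption: spouts and bolts share one id namespace.
    private void validateUnusedId(String id) {
        if (spouts.containsKey(id) || bolts.containsKey(id)) {
            throw new IllegalArgumentException("Component already declared for id " + id);
        }
    }

    void setSpout(String id, String spout) {
        validateUnusedId(id);
        spouts.put(id, spout);
    }

    void setBolt(String id, String bolt) {
        validateUnusedId(id);
        bolts.put(id, bolt);
    }

    int componentCount() {
        return spouts.size() + bolts.size();
    }

    public static void main(String[] args) {
        MiniTopologyBuilder b = new MiniTopologyBuilder();
        b.setSpout("word", "TestWordSpout");
        b.setBolt("exclaim1", "ExclamationBolt");
        b.setBolt("exclaim2", "ExclamationBolt");
        try {
            b.setBolt("word", "ExclamationBolt");  // duplicate id is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(b.componentCount() + " components");  // 3 components
    }
}
```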
1.3 Setting the spout (the data source)

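A spout is added by calling setSpout on the TopologyBuilder. According to the javadoc in the source, this defines a new spout in the topology with the given parallelism (parallelism_hint). If the spout declares itself non-distributed, parallelism_hint has no effect and only one task is allocated to the component. The parameters are:

- id: the identifier of this component; other components use this id when they want to consume this spout's output.
- spout: the spout that produces the data.
- parallelism_hint: the number of tasks that should be assigned to execute this spout.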

    public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
        validateUnusedId(id);                      // ensure the id is unique
        initCommon(id, spout, parallelism_hint);
        _spouts.put(id, spout);                    // add it to the spout map
        return new SpoutGetter(id);
    }
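How parallelism_hint relates to the number of tasks can be illustrated with a hypothetical sketch (ParallelismSketch is not Storm code, and the consecutive task ids are purely illustrative). Each component receives as many tasks as its hint, except that a component declared non-distributed receives exactly one, as the javadoc above describes. With the example's hints of 10, 3 and 2, that gives 15 tasks in total.

```java
import java.util.*;

// Hypothetical sketch: turning parallelism hints into task instances.
// A non-distributed component gets one task regardless of its hint.
class ParallelismSketch {

    static int effectiveTasks(int parallelismHint, boolean nonDistributed) {
        return nonDistributed ? 1 : parallelismHint;
    }

    // Assign consecutive (illustrative) task ids to each component in order.
    static Map<String, List<Integer>> assignTasks(LinkedHashMap<String, Integer> hints,
                                                  Set<String> nonDistributed) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int nextId = 1;
        for (Map.Entry<String, Integer> e : hints.entrySet()) {
            int n = effectiveTasks(e.getValue(), nonDistributed.contains(e.getKey()));
            List<Integer> ids = new ArrayList<>();
            for (int i = 0; i < n; i++) ids.add(nextId++);
            result.put(e.getKey(), ids);
        }
        return result;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> hints = new LinkedHashMap<>();
        hints.put("word", 10);
        hints.put("exclaim1", 3);
        hints.put("exclaim2", 2);
        // Normal case: 10 + 3 + 2 = 15 tasks
        System.out.println(assignTasks(hints, Collections.emptySet()));
        // If "word" were non-distributed, it would get a single task
        System.out.println(assignTasks(hints, Collections.singleton("word")));
    }
}
```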
Setting the spout in the example:
    // Set the data source; a Storm topology is a DAG (directed acyclic graph)
    builder.setSpout("word", new TestWordSpout(), 10);

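The example uses TestWordSpout, a spout that ships with Storm. It holds a SpoutOutputCollector _collector as the container for the spout's output; the tuples it emits through this collector are delivered to the downstream component.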
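The spout side of this arrangement can be sketched without Storm. In the hypothetical WordSpoutSketch below, a java.util.function.Consumer stands in for SpoutOutputCollector, and each nextTuple() call picks a word and hands it to the collector. The word list and the seeded Random are assumptions for illustration, not TestWordSpout's exact code.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical spout sketch: nextTuple() picks a word and passes it to an
// output collector. The Consumer stands in for SpoutOutputCollector.
class WordSpoutSketch {
    // Assumed word list, for illustration only
    static final String[] WORDS = {"nathan", "mike", "jackson", "golda", "bertels"};

    private final Random rand;
    private final Consumer<String> collector;

    WordSpoutSketch(Consumer<String> collector, long seed) {
        this.collector = collector;
        this.rand = new Random(seed);   // seeded so runs are repeatable
    }

    // Storm calls nextTuple() repeatedly; here each call emits one word.
    void nextTuple() {
        collector.accept(WORDS[rand.nextInt(WORDS.length)]);
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        WordSpoutSketch spout = new WordSpoutSketch(emitted::add, 42L);
        for (int i = 0; i < 5; i++) spout.nextTuple();
        System.out.println(emitted);
    }
}
```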

SpoutOutputCollector (source excerpt):
/**
 * This output collector exposes the API for emitting tuples from an {@link backtype.storm.topology.IRichSpout}.
 * The main difference between this output collector and {@link OutputCollector}
 * for {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be
 * acked or failed later on. This is the Spout portion of Storm's API to
 * guarantee that each message is fully processed at least once.
 */
public class SpoutOutputCollector implements ISpoutOutputCollector {
    ISpoutOutputCollector _delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        _delegate = delegate;
    }
    /**
     * Emits a new tuple to the specified output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. The emitted values must be
     * immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return _delegate.emit(streamId, tuple, messageId);
    }

    /**
     * Emits a tuple to the specified task on the specified output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be
     * immutable.
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        _delegate.emitDirect(taskId, streamId, tuple, messageId);
    }

    // ... other emit/emitDirect overloads omitted ...

    @Override
    public void reportError(Throwable error) {
        _delegate.reportError(error);
    }
}
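The reliability contract in the emit javadoc (tuples with a non-null messageId are tracked until Storm calls ack or fail; tuples with a null messageId are not tracked) can be illustrated with a hypothetical bookkeeping sketch. ReliableSpoutSketch is not Storm's implementation, and replaying failed values from a queue is one possible policy chosen for illustration.

```java
import java.util.*;

// Hypothetical sketch of spout-side bookkeeping for the ack/fail contract.
// Tuples with a non-null messageId are tracked until acked or failed.
class ReliableSpoutSketch {
    private final Map<Object, String> pending = new HashMap<>();
    private final Deque<String> retryQueue = new ArrayDeque<>();

    void emit(String value, Object messageId) {
        if (messageId != null) {
            pending.put(messageId, value);  // remember it for ack/fail
        }
        // ... the value would be sent downstream here ...
    }

    // Called when the tuple rooted at messageId was fully processed.
    void ack(Object messageId) {
        pending.remove(messageId);
    }

    // Called on failure; this sketch queues the value for replay.
    void fail(Object messageId) {
        String value = pending.remove(messageId);
        if (value != null) retryQueue.add(value);
    }

    int pendingCount() { return pending.size(); }
    int retryCount() { return retryQueue.size(); }

    public static void main(String[] args) {
        ReliableSpoutSketch spout = new ReliableSpoutSketch();
        spout.emit("nathan", 1);
        spout.emit("mike", 2);
        spout.emit("golda", null);   // untracked: no callback will come
        spout.ack(1);
        spout.fail(2);
        // prints "0 pending, 1 to replay"
        System.out.println(spout.pendingCount() + " pending, " + spout.retryCount() + " to replay");
    }
}
```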
1.4 Setting the bolts (the processing logic)

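exclaim1 takes the spout's output as its input, and exclaim2 in turn takes exclaim1's output; both subscribe with shuffleGrouping.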

    // Processing logic (bolts)
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
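Shuffle grouping spreads tuples across the tasks of the subscribing bolt so that each task receives roughly the same number. The hypothetical sketch below shows one way to get that effect: shuffle the list of target tasks, deal tuples out in that order, and reshuffle after every full pass. It illustrates the idea and is not Storm's source code; the three task ids simply mirror exclaim1's parallelism hint of 3.

```java
import java.util.*;

// Hypothetical sketch of shuffle-grouping behavior: random order, even load.
// Each full pass over the shuffled task list gives every task one tuple.
class ShuffleGroupingSketch {
    private final List<Integer> targetTasks;
    private final Random rand;
    private int index = 0;

    ShuffleGroupingSketch(List<Integer> targetTasks, long seed) {
        this.targetTasks = new ArrayList<>(targetTasks);
        this.rand = new Random(seed);
        Collections.shuffle(this.targetTasks, rand);
    }

    // Returns the task that should receive the next tuple.
    int chooseTask() {
        if (index == targetTasks.size()) {
            Collections.shuffle(targetTasks, rand);  // start a new pass
            index = 0;
        }
        return targetTasks.get(index++);
    }

    public static void main(String[] args) {
        // exclaim1 has a parallelism hint of 3, so assume 3 target tasks.
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(Arrays.asList(0, 1, 2), 7L);
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int i = 0; i < 30; i++) {
            counts.merge(grouping.chooseTask(), 1, Integer::sum);
        }
        System.out.println(counts);  // {0=10, 1=10, 2=10}
    }
}
```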
The custom bolt ExclamationBolt

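Like the spout, ExclamationBolt holds an OutputCollector as the container for its output. Its execute method appends "!!!" to the first field of the incoming tuple, emits the result anchored to that input tuple, and then acks the input.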

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }
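Anchoring (emit(tuple, ...)) followed by ack(tuple) is what lets Storm tell when the tree of tuples started by one spout tuple is complete. Storm's documentation describes the acker tracking this with an XOR of 64-bit tuple ids: each id is XORed in once when the tuple is created and once when it is acked, so the running value returns to 0 exactly when everything has been acked. The sketch below models only that bookkeeping; AckerXorSketch and the seeded random ids are hypothetical, and it assumes exclaim2's output has no subscriber, so no further id is created for it.

```java
import java.util.Random;

// Hypothetical sketch of XOR-based tuple-tree tracking for one spout tuple.
// Each id is XORed in when the tuple is created and again when it is acked.
class AckerXorSketch {
    private long ackVal = 0L;
    private final Random rand = new Random(3L);

    // A new tuple joins the tree (spout emit or anchored bolt emit).
    long anchor() {
        long id = rand.nextLong();
        ackVal ^= id;
        return id;
    }

    // Acking XORs the same id in again, cancelling it out.
    void ack(long id) {
        ackVal ^= id;
    }

    boolean fullyProcessed() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        AckerXorSketch tree = new AckerXorSketch();
        long root = tree.anchor();   // spout emits "nathan"
        long t1 = tree.anchor();     // exclaim1 emits "nathan!!!" anchored to root
        tree.ack(root);              // exclaim1 acks its input
        System.out.println("done? " + tree.fullyProcessed());  // false: t1 pending
        tree.ack(t1);                // exclaim2 acks its input (output unsubscribed)
        System.out.println("done? " + tree.fullyProcessed());  // true
    }
}
```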
Loading the configuration and running the topology
    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      // Cluster mode: the first argument is the topology name
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      // Local mode: submit to an in-process LocalCluster
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
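backtype.storm.Config is a HashMap<String, Object> of settings, and setDebug and setNumWorkers store values under the keys that Config.TOPOLOGY_DEBUG ("topology.debug") and Config.TOPOLOGY_WORKERS ("topology.workers") name. The hypothetical ConfigSketch below mimics that shape without Storm, together with the args check that picks cluster mode or local mode; describeLaunch is an invented helper used only to make the branch testable.

```java
import java.util.HashMap;

// Hypothetical stand-in for backtype.storm.Config, which is a map of settings.
class ConfigSketch extends HashMap<String, Object> {
    void setDebug(boolean debug) { put("topology.debug", debug); }
    void setNumWorkers(int workers) { put("topology.workers", workers); }

    // Mirrors the branch in main(): a name argument means cluster mode with
    // 3 workers; otherwise the topology runs locally under the name "test".
    static String describeLaunch(String[] args, ConfigSketch conf) {
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            return "cluster mode, topology name " + args[0];
        }
        return "local mode, topology name test";
    }

    public static void main(String[] args) {
        ConfigSketch conf = new ConfigSketch();
        conf.setDebug(true);
        System.out.println(describeLaunch(new String[] {"exclamation"}, conf));
        System.out.println(conf);
    }
}
```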