This is Storm's simplest example, taken from the examples shipped with the installation package; it is written entirely in Java.
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
/**
* This is a basic example of a Storm topology.
*/
public class ExclamationTopology {
    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;
        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }
        @Override
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
    public static void main(String[] args) throws Exception {
        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        // Set the data source; a Storm topology is a DAG (directed acyclic graph)
        builder.setSpout("word", new TestWordSpout(), 10);
        // Business processing: two bolts chained after the spout
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }
        else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
Storm's programming model:
- DAG: a topology is a directed acyclic graph
- Spout: the data source
- Bolt: the business-logic processing
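To make the three concepts concrete, here is a minimal Storm-free sketch in plain Java (Java 9+ for List.of) of the data flow in ExclamationTopology: a spout emits words, and two bolts chained in a DAG each append "!!!". The class and method names are hypothetical; there is no parallelism, grouping or acking here, only the order in which data passes through the components.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, Storm-free model of the example's DAG:
// spout "word" -> bolt "exclaim1" -> bolt "exclaim2".
class ExclamationDagSketch {

    // Same transformation as ExclamationBolt.execute: append "!!!".
    static final UnaryOperator<String> EXCLAIM = word -> word + "!!!";

    // Sends every tuple emitted by the spout through both bolts in DAG order.
    static List<String> run(List<String> spoutOutput) {
        List<String> finalOutput = new ArrayList<>();
        for (String word : spoutOutput) {
            String fromExclaim1 = EXCLAIM.apply(word);         // exclaim1 subscribes to "word"
            String fromExclaim2 = EXCLAIM.apply(fromExclaim1); // exclaim2 subscribes to "exclaim1"
            finalOutput.add(fromExclaim2);
        }
        return finalOutput;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("nathan", "mike"))); // prints [nathan!!!!!!, mike!!!!!!]
    }
}
```

Because each word passes through two ExclamationBolt instances, every word ends up with six exclamation marks.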
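1. TopologyBuilder
TopologyBuilder is the Java API for defining a topology for Storm to execute. Under the hood, topologies are Thrift structures, but Thrift structures are very verbose to build by hand, so TopologyBuilder simplifies topology creation. Building a topology follows a fixed template, starting with creating a TopologyBuilder.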
1.1 Create a topology
TopologyBuilder builder = new TopologyBuilder();
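1.2 TopologyBuilder's four member variables
Bolts and spouts are stored in Maps keyed by component id: when a bolt or spout is set, it is registered under its id, and the corresponding component is later retrieved by that id. The last two maps have no public accessor methods.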
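private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();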
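1.3 Set the spout (the data source)
The spout is set by calling setSpout on the TopologyBuilder. According to the source, setSpout adds a spout to the topology with the given parallelism (parallelism_hint). If the spout declares itself non-distributed, parallelism_hint has no effect and only one task is allocated to the component. Parameters:
- id: the identifier of this component; other components use this id to consume the spout's output.
- spout: the spout that supplies the data.
- parallelism_hint: the number of tasks that run this spout.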
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
    validateUnusedId(id); // ensure the id is unique
    initCommon(id, spout, parallelism_hint);
    _spouts.put(id, spout); // add to the spout map
    return new SpoutGetter(id);
}
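The bookkeeping described in 1.2 and the steps inside setSpout (check that the id is unused, record the common settings, store the component by id) can be mimicked in a few lines. This is a hypothetical MiniTopologyBuilder, not Storm's class: it keeps only the parallelism hint where Storm keeps a full ComponentCommon, and it returns the id instead of a SpoutDeclarer or BoltDeclarer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of TopologyBuilder-style bookkeeping (not Storm's code).
class MiniTopologyBuilder {
    // Component id -> component, like _spouts and _bolts.
    private final Map<String, Object> spouts = new HashMap<>();
    private final Map<String, Object> bolts = new HashMap<>();
    // Component id -> parallelism hint, a simplified stand-in for _commons.
    private final Map<String, Integer> commons = new HashMap<>();

    // Like validateUnusedId: one id may name only one component.
    private void validateUnusedId(String id) {
        if (spouts.containsKey(id) || bolts.containsKey(id)) {
            throw new IllegalArgumentException("Component id already in use: " + id);
        }
    }

    String setSpout(String id, Object spout, int parallelismHint) {
        validateUnusedId(id);             // keep ids unique
        commons.put(id, parallelismHint); // like initCommon
        spouts.put(id, spout);            // add to the spout map
        return id;                        // Storm returns a SpoutDeclarer here
    }

    String setBolt(String id, Object bolt, int parallelismHint) {
        validateUnusedId(id);
        commons.put(id, parallelismHint);
        bolts.put(id, bolt);
        return id;                        // Storm returns a BoltDeclarer here
    }

    int parallelismOf(String id) {
        return commons.get(id);
    }

    public static void main(String[] args) {
        MiniTopologyBuilder builder = new MiniTopologyBuilder();
        builder.setSpout("word", "spout placeholder", 10);
        builder.setBolt("exclaim1", "bolt placeholder", 3);
        try {
            builder.setBolt("word", "bolt placeholder", 2); // id already taken by the spout
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints: Component id already in use: word
        }
    }
}
```

Keying everything by id is what lets a later call such as shuffleGrouping("word") refer to the spout by name alone.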
Setting the spout in the example:
// Set the data source; a Storm topology is a DAG (directed acyclic graph)
builder.setSpout("word", new TestWordSpout(), 10);
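The example uses a spout that ships with Storm, new TestWordSpout(). It holds a SpoutOutputCollector _collector as the container for the spout's output: the spout emits tuples through this collector, and the downstream components receive those tuples as their input. The relevant part of SpoutOutputCollector: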
/**
* This output collector exposes the API for emitting tuples from an {@link backtype.storm.topology.IRichSpout}.
* The main difference between this output collector and {@link OutputCollector}
* for {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be
* acked or failed later on. This is the Spout portion of Storm's API to
* guarantee that each message is fully processed at least once.
*/
public class SpoutOutputCollector implements ISpoutOutputCollector {
    ISpoutOutputCollector _delegate;
    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        _delegate = delegate;
    }
    /**
     * Emits a new tuple to the specified output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. The emitted values must be
     * immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return _delegate.emit(streamId, tuple, messageId);
    }
    /**
     * Emits a tuple to the specified task on the specified output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be
     * immutable.
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        _delegate.emitDirect(taskId, streamId, tuple, messageId);
    }
    @Override
    public void reportError(Throwable error) {
        _delegate.reportError(error);
    }
    // ... other emit/emitDirect overloads omitted
}
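The emit javadoc above describes the reliability contract: a tuple emitted with a non-null messageId is tracked, and the spout later gets an ack or fail callback for that id. The plain-Java class below is a hypothetical sketch of the bookkeeping a spout might do to honor that contract (remember pending tuples, forget them on ack, re-queue them on fail). It is not taken from TestWordSpout; the class name, the Long message ids and the replay-first policy are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reliable-spout bookkeeping (not Storm code): values emitted
// with a message id stay in "pending" until ack or fail is called for that id.
class ReliableSpoutSketch {
    private final Deque<String> toEmit = new ArrayDeque<>();
    private final Map<Object, String> pending = new HashMap<>();
    private long nextId = 0;

    ReliableSpoutSketch(Iterable<String> source) {
        source.forEach(toEmit::add);
    }

    // Like nextTuple(): take one value and record it under a fresh message id.
    // Returns the message id, or null when there is nothing left to emit.
    Object nextTuple() {
        String value = toEmit.poll();
        if (value == null) {
            return null;
        }
        Long msgId = nextId++;
        pending.put(msgId, value);
        // A real spout would call _collector.emit(new Values(value), msgId) here.
        return msgId;
    }

    // Tuple tree fully processed: forget the value.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Processing failed or timed out: put the value back at the front for replay.
    void fail(Object msgId) {
        String value = pending.remove(msgId);
        if (value != null) {
            toEmit.addFirst(value);
        }
    }

    String valueOf(Object msgId) {
        return pending.get(msgId);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Emitting with a null messageId would skip all of this, which matches the javadoc: Storm does not track such tuples and no callback arrives.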
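1.4 Set the bolts (business processing)
exclaim1 takes the spout's output as its input (shuffleGrouping("word")), and exclaim2 in turn takes exclaim1's output as its input (shuffleGrouping("exclaim1")).
// Business processing: two bolts chained after the spout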
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
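shuffleGrouping("word") tells Storm to spread the spout's tuples randomly over the tasks of exclaim1 so that each task receives about the same number of tuples. The sketch below is a hypothetical illustration of that behaviour (shuffle the target tasks, hand them out in order, reshuffle after each full pass); Storm's internal implementation may differ between versions, and the task ids 1 to 3 are made up to match exclaim1's parallelism of 3.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of shuffle-style routing (not Storm's grouping code):
// every pass over the target tasks uses a freshly shuffled order, so tasks
// receive equal shares while the order stays random.
class ShuffleGroupingSketch {
    private final List<Integer> targetTasks;
    private final Random random;
    private int index = 0;

    ShuffleGroupingSketch(List<Integer> targetTasks, long seed) {
        this.targetTasks = new ArrayList<>(targetTasks);
        this.random = new Random(seed);
        Collections.shuffle(this.targetTasks, random);
    }

    // Picks the task that should receive the next tuple.
    int chooseTask() {
        if (index == targetTasks.size()) {
            Collections.shuffle(targetTasks, random); // start a new pass
            index = 0;
        }
        return targetTasks.get(index++);
    }

    public static void main(String[] args) {
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(List.of(1, 2, 3), 42L);
        int[] counts = new int[4];
        for (int i = 0; i < 30; i++) {
            counts[grouping.chooseTask()]++;
        }
        // 30 tuples = 10 full passes over 3 tasks
        System.out.println(counts[1] + " " + counts[2] + " " + counts[3]); // prints 10 10 10
    }
}
```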
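The custom bolt, ExclamationBolt:
Like the spout, ExclamationBolt holds an output collector (an OutputCollector) as the container for its output. execute() appends "!!!" to the first field of each input tuple, emits the result anchored to the input tuple, and then acks the input.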
public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }
    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
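In execute, emit(tuple, ...) anchors the new tuple to the input tuple and ack(tuple) reports the input as done; together these let Storm tell when the tuple tree rooted at a spout tuple is fully processed. Storm's acker does this by XORing 64-bit tuple ids into a single value per tree. The class below is a simplified, hypothetical model of that idea, not the acker's code: each id is XORed in once when the tuple is anchored and once when it is acked, so the value returns to 0 only after every tuple in the tree has been acked.

```java
import java.util.Random;

// Hypothetical model of XOR-based tuple-tree tracking (not Storm's acker code).
class AckTreeSketch {
    private long ackVal = 0;

    // A tuple joins the tree: a spout emit with a message id, or a bolt emit anchored to an input.
    void anchor(long tupleId) {
        ackVal ^= tupleId;
    }

    // collector.ack(tuple): the same id is XORed in a second time and cancels out.
    void ack(long tupleId) {
        ackVal ^= tupleId;
    }

    boolean fullyProcessed() {
        return ackVal == 0;
    }

    public static void main(String[] args) {
        Random random = new Random();
        AckTreeSketch tree = new AckTreeSketch();
        long root = random.nextLong();  // spout "word" emits a tuple
        tree.anchor(root);
        long child = random.nextLong(); // exclaim1: emit(tuple, new Values(...)) anchors a child
        tree.anchor(child);
        tree.ack(root);                 // exclaim1: ack(tuple)
        tree.ack(child);                // exclaim2 acks the tuple it received
        System.out.println(tree.fullyProcessed()); // prints true
    }
}
```

XOR makes the order of anchors and acks irrelevant and needs only one long per tree, which is why a single value can stand in for an arbitrarily large tuple tree.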
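Load the configuration and run. With a command-line argument, the topology is submitted to a cluster under the name args[0] with 3 workers; without one, it runs in an in-process LocalCluster for 10 seconds and is then killed.
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
    conf.setNumWorkers(3);
    // Cluster mode
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
    // Local mode: submit to an in-process local cluster
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();
}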