Storm 说的是 语言无关性: Storm的topology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.
这里将说明下StormI在Java中的使用,java程序是基于Springboot.这最重要的是storm如何拆解任务.其它的都是流程化的东西.(cuiyaonan2000@163.com)
参考资料:
- https://www.cnblogs.com/gouyg/p/storm-springboot.html
- https://www.cnblogs.com/xuwujing/archive/2018/05/10/9021561.html
- https://www.cnblogs.com/xidianzxm/p/10764546.html
- https://blog.csdn.net/wjlzx/article/details/90638507
整合思路
Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。
根据如上的描述我们要在代码中创建 3个东西:
- Topology : 主进程,控制Spout和Bolt
- Spout : 用于从Kafka,Mysql,Redis,Oracle 等等地方获取要处理的数据,并发送给Bolt
- Bolt : 接收Spout发送来的Tuple并进行处理------------这里可以得出一个很重要的结论,就是Storm不负责插接任务,需要你自己拆解任务.并封装到Tuple中cuiyaonan2000@163.com
Storm程序就是创建如上的内容
还有就是要连接数据库,连接kafka,连接Redis等等的东西,这里就需要跟我们传统的框架进行整合了.
在SpringBoot程序中如何提交storm的Topolgy?
storm是通过提交Topolgy来确定如何启动的,一般使用过运行main方法来启动,但是SpringBoot启动方式一般也是通过main方法启动的。所以应该怎么样解决呢?
- 解决思路:将storm的Topology写在SpringBoot启动的主类中,随着SpringBoot启动而启动。
- 实验结果:可以一起启动(按理来说也是可以的)。但是随之而来的是下一个问题,bolt和spout类无法使用spring注解。
2 如何让bolt和spout类使用spring注解?
- 解决思路:在了解到spout和bolt类是由nimbus端实例化,然后通过序列化传输到supervisor,再反向序列化,因此无法使用注解,所以这里可以换个思路,既然不能使用注解,那么就动态获取Spring的bean就好了。
- 实验结果:使用动态获取bean的方法之后,可以成功启动storm了。
代码贴图 Topology
package cui.yao.nan.storm.demo;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
*
* @author cuiyaonan2000@163.com
*
*/
@Component
public class CreateTopology {
private static final Logger logger = LoggerFactory.getLogger(CreateTopology.class);
public void createTopology(String[] args) {
// 编写bolt,storm提供了两种bolt,BasicBolt和RichBolt,
// RichBolt在执行execute后要手动提交ack或者fail,
// BasicBolt在execute执行后会自动提交ack,但是只对FailedException异常捕获并自动执行fail方法,其他异常需自己处理。
// 定义一个拓扑
//Storm框架支持多语言,在Java环境下创建一个拓扑,需要使用TopologyBuilder进行构建
TopologyBuilder builder = new TopologyBuilder();
// 设置1个Executeor(线程),默认一个
builder.setSpout("theNameForSpout",new SpoutOjbect(), 1);
//parallelism_hint 执行线程数
//setNumTasks 所有线程运行任务总数,以下配置是2个spout线程各自运行一个任务
//topologyBuilder.setSpout("worldCountSpout", new WorldCountSpout(), 2).setNumTasks(2);
// shuffleGrouping:表示是随机分组
// 设置1个Executeor(线程),和两个task
builder.setBolt("theNameForBolt", new BoltObject(), 1).shuffleGrouping("theNameForSpout");
//tuple随机分发给下一阶段的bolt ; parallelism_hint 执行线程数 ; setNumTasks 所有线程运行任务总数,以下配置是2个线程各自运行一个Bolt任务
// topologyBuilder.setBolt("worldCutBolt", new WorldCutBasicBolt(), 2)
// .setNumTasks(2)
// .shuffleGrouping("worldCountSpout");
//启动topology的配置信息
Config conf = new Config();
//设置一个应答者
//conf.setNumAckers(1);
//设置一个work
//conf.setNumWorkers(1);
//TOPOLOGY_DEBUG(setDebug),当他被设置成true的话,storm会记录下每个组件所发射的每条消息
//这在本地环境调试topology很有用。但是在线上这么做的话,会影响性能
conf.setDebug(false);
try {
// 有参数时,表示向集群提交作业,并把第一个参数当做topology名称
// 没有参数时,本地提交
if (args != null && args.length > 0) {
logger.info("运行远程模式");
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
// 启动本地模式
logger.info("运行本地模式");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopologyApp", conf, builder.createTopology());
}
} catch (Exception e) {
logger.error("storm启动失败!程序退出!",e);
System.exit(1);
}
logger.info("storm启动成功...");
}
}
Spout
package cui.yao.nan.storm.demo;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
*
* @author cuiyaonan2000@163.com
*
*/
@Component
public class SpoutOjbect extends BaseRichSpout {
Map conf;
TopologyContext context;
SpoutOutputCollector collector;
int num = 0 ;
private static final Logger logger = LoggerFactory.getLogger(SpoutOjbect.class);
/**
* 该方法调用一次,主要由storm框架传入SpoutOutputCollector
* @param stormConf
* @param context
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// TODO Auto-generated method stub
this.conf = conf;
this.context = context;
this.collector = collector;
}
/**
* nextTuple()方法是Spout实现的核心。 也就是主要执行方法,用于输出信息,通过collector.emit方法发射。
*/
@Override
public void nextTuple() {
// TODO Auto-generated method stub
//发送到bolt中
this.collector.emit(new Values(" i am the values"));
// logger.info("----------------------nextTuple:" + ++num);
}
/**
* declareOutputFields是在IComponent接口中定义,用于声明数据格式。 即输出的一个Tuple中,包含几个字段。
* 这个也只执行一次
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
Fields fields = new Fields("char");
// declarer.declareStream( fields);
declarer.declare(fields);
logger.info("----------------------declareOutputFields:" );
}
/**
** 当一个Tuple处理成功时,会调用这个方法 param obj emit方法中的msgId
*/
// @Override
// public void ack(Object obj) {
// System.out.println("成功:" + obj);
// }
/**
** 当Topology停止时,会调用这个方法
*/
// @Override
// public void close() {
// }
/**
** 当一个Tuple处理失败时,会调用这个方法
*/
// @Override
// public void fail(Object obj) {
//
// }
}
Bolt
package cui.yao.nan.storm.demo;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
*
* @author cuiyaonan2000@163.com
*
*/
@Component
public class BoltObject extends BaseBasicBolt {
Map conf;
TopologyContext context;
SpoutOutputCollector collector;
private int num = 0 ;
private static final Logger logger = LoggerFactory.getLogger(BoltObject.class);
/**
** 在Bolt启动前执行,提供Bolt启动环境配置的入口 一般对于不可序列化的对象进行实例化。
*/
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// TODO Auto-generated method stub
String str = (String)input.getValueByField("char");
logger.info("----------------------execute:" + ++num);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
运行模式
- 本地模式(Local Mode): 即Topology运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
- 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。
本地模式
本地模式类似storm集群是一个进程,用来编写和测试topology。在本地模式上运行topology类似在一个集群上运行topology。创建一个本地集群: import backtype.storm.LocalCluster; LocalCluster cluster = new LocalCluster();
- 提交集群使用submitTopology,
- 杀死集群使用killTopology
- 关闭一个本地集群使用cluster.shutdown();
- 提交topology : 为jar包指定参数: storm jar storm.jar 启动类 arg1 arg2 arg3
- 杀死topology : storm kill stormname