基于Storm 1.0.6
版本
本文查阅了许多资料并反复琢磨,应该能做到一篇明白Storm的worker进程、线程、任务关系
一、概念-
进程:即worker进程,工作进程,Java进程,或称JVM进程
-
线程:即executor,由进程产生,用于执行任务
-
任务:即task,任务就是被线程执行的东西,数据会流过task,线程作用于task将数据处理,可以理解task就是数据的处理工具,或者是数据的处理逻辑的工具
-
进程数:即worker数
-
线程数:即executor数
-
任务数:即task数
-
component:
spout
和bolt
的统称,后续也会用bolt的prepare/execute方法统称spout的open/nextTuple方法
worker进程可以比喻为工厂
,executor线程可以比喻为工人
,task任务可以比喻为车床
工厂里有很多工人,每个工人操作一到多台车床,车床有原料流过,原料从车床处理完毕流到下一个车床。
进程、线程是比较好理解的,任务(task)就比较难理解,什么是task,task应该是数据的处理逻辑的集合,线程只负责执行,至于怎么执行,是task规定的,而task怎么知道如何处理数据,它应该是委托给Bolt/Spount的,这就是我们书写的Bolt/Spount里头的处理逻辑,而task其实就是包装了bolt/spout实例,线程执行了task的run方法,run方法里转而调用execute/nextTuple方法
三者关系以下知识点细节都是正确的 (目前的知识认为是对的)
- bolt/spout 实例数一定等于task数
- 每个task都有自己的 bolt 或 spout 的实例,所以 task数等于 bolt/spout 实例数
- 线程数 内部component特定的配置项>拓扑特定的配置项>storm.yaml>defaults.yaml
一、非运行时方式指定
用途描述配置文件方式进行设置代码方式进行设置worker进程数量拓扑在集群机器上运行时需要的worker进程数据量Config#TOPOLOGY_WORKERSConfig#setNumWorkers每个组件需要创建的executor数量executor线程的数量没有单独配置项TopologyBuilder#setSpout() 和 TopologyBuidler#setBolt() Storm 0.8之后使用 parallelism_hint参数来指定executor的初始数量(不是task数,源码中的描述可能由于未及时更新,是错的,详细参考文章末尾的
参考资料
)task数量每个组件需要创建的task数量Config#TOPOLOGY_TASKSComponentConfigurationDeclarer#setNumTasks() 二、运行时方式指定 1、使用命令行即拓扑已经在storm平台跑起来了, 如何动态改
- task数,改不了
- executor数,使用
-e
- worker数,使用
-n
例子:
2、使用 StormUI# 重新配置“mytopology”拓扑使用5个worker进程[原来是2个] # "blue-spout"这个spout使用3个[原来有2个]executor # "yellow-bolt"使用10个[原来有6个]executor $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
(略)
参考资料- 官方1.0.6版本文档 (本文的storm基于
1.0.1
版本,因官方没这版本的文档,查看1.0.6
) - Understanding the Parallelism of a Storm Topology (这里是中文版) 非常推荐
- Thread safe of storm bolt (stack overflow)
- Thread safety of bolts 似乎是谷歌的论坛,回复问题的人Nathan正是Storm作者!!!
- 应该是官方文档的翻译 非常全面的参考资料
- Storm消息可靠性保证 如何保证消息的可靠性,有代码例子
- How is this word count bolt thread safe?
- 源码中的一些注释
// setBolt带上的并行度的解释 /** * Define a new bolt in this topology with the specified amount of parallelism. * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the bolt * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster. * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {} // parallelism_hint解释:分配用于执行该bolt的任务数,感觉有点奇怪,怎么是"任务数",不是线程数吗?为什么用了task这个单词 // setNumTasks,代码如下解释 /** * How many instances to create for a spout/bolt. A task runs on a thread with zero or more * other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always * the same throughout the lifetime of a topology, but the number of executors (threads) for * a spout/bolt can change over time. This allows a topology to scale to more or less resources * without redeploying the topology or violating the constraints of Storm (such as a fields grouping * guaranteeing that the same value goes to the same task). */