Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。(这句话很重要,资源计算,资源的分配不是Flink当前强项cuiyaonan2000@163.com)
它集成了所有常见的集群资源管理器,例如Hadoop YARN、Apache Mesos和Kubernetes,但也可以设置作为独立集群甚至库运行。-----------------------Flink推荐YARN,K8S,Mesos的资源管理器,同时自己也提供自己的资源管理器
Flink的集群环境根据有三种形式:
- yarn:第三方的计算资源调度框架(ResourceManager,NodeManager)
- standalone : 独立模式,Flink自带集群,开发测试环境使用。(Flink 官方开发的计算资源调度框架(JobManager,TaskManager))
- local:本地模式,一般不使用
参考官方文档:Apache Flink 1.12 Documentation: 本地模式安装
#下载相关文件包
[root@cuiyaonan2000 soft] wget https://mirrors.bfsu.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz
[root@cuiyaonan2000 soft] tar -zxvf flink-1.12.2-bin-scala_2.12.tgz
[root@cuiyaonan2000 soft] cd flink-1.12.2
#启动服务
[root@cuiyaonan2000 soft] ./start-cluster.sh
#关闭服务
[root@cuiyaonan2000 soft] ./stop-cluster.sh
启动成功后使用jps查看相关的进程
其中里面的含义如下所示:
- StandaloneSessionClusterEntrypoint —> JobManager
- TaskManagerRunner —> TaskManager
访问该服务器的8081服务器:http://10.1.80.187:8081/#/overview 显示内容如下。
具体可以参考官网:Apache Flink 1.12 Documentation: Standalone
这里使用两台机器搭建环境。一个机器做JobMaster,一个机器做WorkerMaster
首先在master服务器上操作,然后在把设置好的flink文件夹传递到worker服务器上。
[root@cuiyaonan2000 soft] vi /etc/profile
#增加如下内容
export FLINK_HOME=/soft/flink/flink-1.12.2
export PATH=$FLINK_HOME/bin:$PATH
[root@cuiyaonan2000 soft] source /etc/profile
[root@cuiyaonan2000 soft] cd flink-1.12.2
#编辑配置文件
[root@cuiyaonan2000 soft] vi ./conf/flink-conf.yaml
#设置为主节点的IP或者主机名
jobmanager.rpc.address: 10.1.80.187
#主节点默认端口
jobmanager.rpc.port: 6123
#定义 Flink 允许在每个节点上分配的最大内存值
#如下区分了master内存和task内存
#这些值的单位是 MB
jobmanager.memory.process.size
taskmanager.memory.process.size
#设置master服务器
[root@cuiyaonan2000 soft] vi ./conf/master
#如下的8081是默认的可以在flink-conf.yaml中修改
#rest.port: 8081
10.1.80.187:8081
#设置work服务器
[root@cuiyaonan2000 soft] vi ./conf/worker
#增加worker服务器
10.1.80.190
#将flink的文件夹传递到worker的服务器上
[root@cuiyaonan2000 soft] scp -r flink-1.12.2 root@10.1.80.190:/soft
在worker服务器中同时要设置环境变量。然后就可以启动了
#该命令在master服务器上执行就行了,它会去worker自动执行,
#所以要做好这几台机器间的免密登录
[root@cuiyaonan2000 soft] start-cluster.sh
[root@cuiyaonan2000 soft] stop-cluster.sh
#可以使用jps来查看master 和 worker服务器上是否已经启动了相关的服务。
[root@cuiyaonan2000 soft] jps
flink-conf.yaml 一些比较重要的配置
- 每个 JobManager 的可用内存值(
jobmanager.memory.process.size
), - 每个 TaskManager 的可用内存值 (
taskmanager.memory.process.size
,并检查 内存调优指南), - 每台机器的可用 CPU 数(
taskmanager.numberOfTaskSlots
), - 集群中所有 CPU 数(
parallelism.default
)和 - 临时目录(
io.tmp.dirs
)
点击1,在选择2,然后在选择需要上传的jar文件
主要就是配置2个地方(参照下图很容易理解):具体参考官网:Apache Flink 1.12 Documentation: Standalone
- flink-conf.yaml ---------------------这里是使用zookeeper作为高可用的监控
- master --------------这里需要增加另外一个master的ip和端口
flink-conf.yaml的修改点如下:
# 开启HA,使用文件系统作为快照存储
state.backend: filesystem
# 启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://node-1:9000/flink-checkpoints
# 使用zookeeper搭建高可用
high-availability: zookeeper
# 存储JobManager的元数据到HDFS
# 这个可以不指定的,同时也可以指定为文件夹
# 官网的注释如下
# Default target directory for savepoints, optional.
high-availability.storageDir: hdfs://node-1:9000/flink/ha/
# zookeepre服务器
high-availability.zookeeper.quorum: node-1:2181,node-2:2181,node-3:2181