Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。(这句话很重要,资源计算,资源的分配不是Flink当前强项cuiyaonan2000@163.com)
它集成了所有常见的集群资源管理器,例如Hadoop YARN、Apache Mesos和Kubernetes,但也可以设置作为独立集群甚至库运行。-----------------------Flink推荐YARN,K8S,Mesos的资源管理器,同时自己也提供自己的资源管理器
Flink的集群环境根据有三种形式:
- yarn:第三方的计算资源调度框架(ResourceManager,NodeManager)
- standalone : 独立模式,Flink自带集群,开发测试环境使用。(Flink 官方开发的计算资源调度框架(JobManager,TaskManager))
- local:本地模式,一般不使用
参考官方文档:Apache Flink 1.12 Documentation: 本地模式安装
#下载相关文件包 [root@cuiyaonan2000 soft] wget https://mirrors.bfsu.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz [root@cuiyaonan2000 soft] tar -zxvf flink-1.12.2-bin-scala_2.12.tgz [root@cuiyaonan2000 soft] cd flink-1.12.2 #启动服务 [root@cuiyaonan2000 soft] ./start-cluster.sh #关闭服务 [root@cuiyaonan2000 soft] ./stop-cluster.sh
启动成功后使用jps查看相关的进程
其中里面的含义如下所示:
- StandaloneSessionClusterEntrypoint —> JobManager
- TaskManagerRunner —> TaskManager
访问该服务器的8081服务器:http://10.1.80.187:8081/#/overview 显示内容如下。
具体可以参考官网:Apache Flink 1.12 Documentation: Standalone
这里使用两台机器搭建环境。一个机器做JobMaster,一个机器做WorkerMaster
首先在master服务器上操作,然后在把设置好的flink文件夹传递到worker服务器上。
[root@cuiyaonan2000 soft] vi /etc/profile #增加如下内容 export FLINK_HOME=/soft/flink/flink-1.12.2 export PATH=$FLINK_HOME/bin:$PATH [root@cuiyaonan2000 soft] source /etc/profile [root@cuiyaonan2000 soft] cd flink-1.12.2 #编辑配置文件 [root@cuiyaonan2000 soft] vi ./conf/flink-conf.yaml #设置为主节点的IP或者主机名 jobmanager.rpc.address: 10.1.80.187 #主节点默认端口 jobmanager.rpc.port: 6123 #定义 Flink 允许在每个节点上分配的最大内存值 #如下区分了master内存和task内存 #这些值的单位是 MB jobmanager.memory.process.size taskmanager.memory.process.size #设置master服务器 [root@cuiyaonan2000 soft] vi ./conf/master #如下的8081是默认的可以在flink-conf.yaml中修改 #rest.port: 8081 10.1.80.187:8081 #设置work服务器 [root@cuiyaonan2000 soft] vi ./conf/worker #增加worker服务器 10.1.80.190 #将flink的文件夹传递到worker的服务器上 [root@cuiyaonan2000 soft] scp -r flink-1.12.2 root@10.1.80.190:/soft
在worker服务器中同时要设置环境变量。然后就可以启动了
#该命令在master服务器上执行就行了,它会去worker自动执行, #所以要做好这几台机器间的免密登录 [root@cuiyaonan2000 soft] start-cluster.sh [root@cuiyaonan2000 soft] stop-cluster.sh #可以使用jps来查看master 和 worker服务器上是否已经启动了相关的服务。 [root@cuiyaonan2000 soft] jpsflink-conf.yaml 一些比较重要的配置
- 每个 JobManager 的可用内存值(jobmanager.memory.process.size),
- 每个 TaskManager 的可用内存值 (taskmanager.memory.process.size,并检查 内存调优指南),
- 每台机器的可用 CPU 数(taskmanager.numberOfTaskSlots),
- 集群中所有 CPU 数(parallelism.default)和
- 临时目录(io.tmp.dirs)
点击1,在选择2,然后在选择需要上传的jar文件
主要就是配置2个地方(参照下图很容易理解):具体参考官网:Apache Flink 1.12 Documentation: Standalone
- flink-conf.yaml ---------------------这里是使用zookeeper作为高可用的监控
- master --------------这里需要增加另外一个master的ip和端口
flink-conf.yaml的修改点如下:
# 开启HA,使用文件系统作为快照存储 state.backend: filesystem # 启用检查点,可以将快照保存到HDFS state.backend.fs.checkpointdir: hdfs://node-1:9000/flink-checkpoints # 使用zookeeper搭建高可用 high-availability: zookeeper # 存储JobManager的元数据到HDFS # 这个可以不指定的,同时也可以指定为文件夹 # 官网的注释如下 # Default target directory for savepoints, optional. high-availability.storageDir: hdfs://node-1:9000/flink/ha/ # zookeepre服务器 high-availability.zookeeper.quorum: node-1:2181,node-2:2181,node-3:2181