您当前的位置: 首页 >  flink

阿里云开发者

暂无认证

  • 0浏览

    0关注

    2379博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink 1.12 资源管理新特性

阿里云开发者 发布时间:2021-07-22 17:09:41 ,浏览量:0

简介:介绍 Flink 1.12 资源管理的一些特性,包括内存管理、资源调度、扩展资源框架。

本文由社区志愿者陈政羽整理,Apache Flink Committer、阿里巴巴技术专家宋辛童,Apache Flink Contributor、阿里巴巴高级开发工程师郭旸泽分享,主要介绍 Flink 1.12 资源管理的一些特性。内容主要分为 4 部分:

  1. 内存管理
  2. 资源调度
  3. 扩展资源框架
  4. 未来规划

GitHub 地址 https://github.com/apache/flink欢迎大家给 Flink 点赞送 star~

一、内存管理

首先回顾 Flink 的内存模型变迁。下图左边分别为 Flink 1.10、Flink 1.11 引入的新的内存模型。尽管涉及的模块较多,但 80% - 90% 的用户仅需关注真正用于任务执行的 Task Heap Memory、Task Off-Heap Memory、Network Memory、Managed Memory 四部分。

其它模块大部分是 Flink 的框架内存,正常不需要调整,即使遇到问题也可以通过社区文档来解决。除此之外,“一个作业究竟需要多少内存才能满足实际生产需求” 也是大家不得不面临的问题,比如其他指标的功能使用、作业是否因为内存不足影响了性能,是否存在资源浪费等。

image-20210101153149434

针对上述内容,社区在 Flink 1.12 版本提供了一个全新的, 关于 Task manager 和 Job manager 的 Web UI。

image-20210101152844186

在新的 Web UI 中,可以直接将每一项监控指标配置值、实际使用情况对应到内存模型中进行直观的展示。在此基础上,可以更清楚的了解到作业的运行情况、该如何调整、用哪些配置参数调整等 (社区也有相应的文档提供支持)。通过新的 Web UI,大家能更好的了解作业的使用情况,内存管理也更方便。

1. 本地内存(Managed Memory)

Flink 托管内存实际上是 Flink 特有的一种本地内存,不受 JVM 和 GC 的管理,而是由 Flink 自行进行管理。

本地内存的特点主要体现在两方面:

  • 一方面是 slot 级别的预算规划,它可以保证作业运行过程中不会因为内存不足,造成某些算子或者任务无法运行;也不会因为预留了过多的内存没有使用造成资源浪费。 同时 Flink 能保证当任务运行结束时准确将内存释放,确保 Task Manager 执行新任务时有足够的内存可用。
  • 另一方面,资源适应性也是托管内存很重要的特性之一,指算子对于内存的需求是动态可调整的。具备了适应性,算子就不会因为给予任务过多的内存造成资源使用上的浪费,也不会因为提供的内存相对较少导致整个作业无法运行,使内存的运用保持在一定的合理范围内。

    当然,在内存分配相对比较少情况下,作业会受到一定限制,例如需要通过频繁的落盘保证作业的运行,这样可能会影响性能。

当前,针对托管内存,Flink 的使用场景如下:

  • RocksDB 状态后端:在流计算的场景中,每个 Slot 会使用 State 的 Operator,从而共享同一底层 的 RocksDB 缓存;
  • Flink 内置算子:包含批处理、Table SQL、DataSet API 等算子,每个算子有独立的资源预算,不会相互共享;
  • Python 进程:用户使用 PyFlink,使用 Python 语言定义 UDF 时需要启动 Python 的虚拟机进程。
2. Job Graph 编译阶段

Flink 对于 management memory 的管理主要分为两个阶段。

2.1 作业的 Job Graph 编译阶段

在这个阶段需要注意三个问题:

  • 第一个问题是:slot 当中到底有哪些算子或者任务会同时执行。这个问题关系到在一个查询作业中如何对内存进行规划,是否还有其他的任务需要使用 management memory,从而把相应的内存留出来。 在流式的作业中,这个问题是比较简单的,因为我们需要所有的算子同时执行,才能保证上游产出的数据能被下游及时的消费掉,这个数据才能够在整个 job grep 当中流动起来。 但是如果我们是在批处理的一些场景当中,实际上我们会存在两种数据 shuffle 的模式,

    • 一种是 pipeline 的模式,这种模式跟流式是一样的,也就是我们前面说到的 bounded stream 处理方式,同样需要上游和下游的算子同时运行,上游随时产出,下游随时消费。

      image-20210101152756380

    • 另外一种是所谓的 batch 的 blocking的方式,它要求上游把数据全部产出,并且落盘结束之后,下游才能开始读数据。

    这两种模式会影响到哪些任务可以同时执行。目前在 Flink 当中,根据作业拓扑图中的一个边的类型 (如图上)。我们划分出了定义的一个概念叫做 pipelined region,也就是全部都由 pipeline 的边锁连通起来的一个子图,我们把这个子图识别出来,用来判断哪些 task 会同时执行。

  • 第二个问题是:slot 当中到底有哪些使用场景?我们刚才介绍了三种 manage memory 的使用场景。在这个阶段,对于流式作业,可能会出现 Python UDF 以及 Stateful Operator。这个阶段当中我们需要注意的是,这里并不能肯定 State Operator 一定会用到 management memory,因为这跟它的状态类型是相关的。

    • 如果它使用了 RocksDB State Operator,是需要使用 manage memory 的;
    • 但是如果它使用的是 Heap State Backend,则并不需要。

    然而,作业在编译的阶段,其实并不知道状态的类型,这里是需要去注意的地方。

  • 第三个问题:对于 batch 的作业,我们除了需要清楚有哪些使用场景,还需要清楚一件事情,就是前面提到过 batch 的 operator。它使用 management memory 是以一种算子独享的方式,而不是以 slot 为单位去进行共享。我们需要知道不同的算子应该分别分配多少内存,这个事情目前是由 Flink 的计划作业来自动进行设置的。
2.2 执行阶段

image-20210101152432770

第一个步骤是根据 State Backend 的类型去判断是否有 RocksDB。如上图所示,比如一个 slot,有 ABC 三个算子,B 跟 C 都用到了 Python,C 还用到了 Stateful 的 Operator。这种情况下,如果是在 heap 的情况下,我们走上面的分支,整个 slot 当中只有一种在使用,就是Python。之后会存在两种使用方式:

  • 其中一个是 RocksDB State Backend,有了第一步的判断之后,第二步我们会根据用户的配置,去决定不同使用方式之间怎么样去共享 slot 的 management memory。

    在这个 Steaming 的例子当中,我们定义的 Python 的权重是 30%,State Backend 的权重是 70%。在这样的情况下,如果只有 Python,Python 的部分自然是使用 100% 的内存(Streaming 的 Heap State Backend 分支);

  • 而对于第二种情况(Streaming 的 RocksDB State Backend 分支),B、C 的这两个 Operator 共用 30% 的内存用于 Python 的 UDF,另外 C 再独享 70% 的内存用于 RocksDB State Backend。最后 Flink 会根据 Task manager 的资源配置,一个 slot 当中有多少 manager memory 来决定每个 operator 实际可以用的内存的数量。

image-20210101152324593

批处理的情况跟流的情况有两个不同的地方,首先它不需要去判断 State Backend 的类型,这是一个简化; 其次对于 batch 的算子,上文提到每一个算子有自己独享的资源的预算,这种情况下我们会去根据使用率算出不同的使用场景需要多少的 Shared 之后,还要把比例进一步的细分到每个 Operator。

3. 参数配置 配置参数默认值备注大小taskmanager.memory.managed.size/绝对大小权重taskmanager.memory.managed.fraction0.4相对大小(占用Flink)总内存比例taskmanager.memory.managed.consumer-weightDATAPROC:70,PYTHON:30多种用途并存时候分配权重

上方图表展示了我们需要的是 manager,memory 大小有两种配置方式:

  • 一种是绝对值的配置方式,
  • 还有一种是作为 Task Manager 总内存的一个相对值的配置方式。

taskmanager.memory.managed.consumer-weight 是一个新加的配置项,它的数据类型是 map 的类型,也就是说我们在这里面实际上是给了一个 key 冒号 value,然后逗号再加上下一组 key 冒号 value 的这样的一个数据的结构。这里面我们目前支持两种 consumer 的 key:

  • 一个是 DATAPROC, DATAPROC 既包含了流处理当中的状态后端 State Backend 的内存,也包含了批处理当中的 Batch Operator;
  • 另外一种是 Python。
二、 资源调度

部分资源调度相关的 Feature 是其他版本或者邮件列表里面大家询问较多的,这里我们也做对应的介绍。

1. 最大 Slot 数

image-20210102194947808

Flink 在 1.12 支持了最大 slot 数的一个限制(slotmanager.number-of-slots.max),在之前我们也有提到过对于流式作业我们要求所有的 operator 同时执行起来,才能够保证数据的顺畅的运行。在这种情况下,作业的并发度决定了我们的任务需要多少个 slot 和资源去执行作业。

然而对于批处理其实并不是这样的,批处理作业往往可以有一个很大的并发度,但实际并不需要这么多的资源,批处理用很少的资源,跑完前面的任务腾出 Slot 给后续的任务使用。通过这种串行的方式去执行任务能避免 YARN/K8s 集群的资源过多的占用。目前这个参数支持在 yarn/mesos/native k8 使用。

2. TaskManager 容错

在我们实际生产中有可能会有程序的错误、网络的抖动、硬件的故障等问题造成 TaskManager 无法连接,甚至直接挂掉。我们在日志中常见的就是 TaskManagerLost 这样的报错。对于这种情况需要进行作业重启,在重启的过程中需要重新申请资源和重启 TaskManager 进程,这种性能消耗代价是非常高昂的。

对于稳定性要求相对比较高的作业,Flink1.12 提供了一个新的 feature,能够支持在 Flink 集群当中始终持有少量的冗余的 TaskManager,这些冗余的 TaskManager 可以用于在单点故障的时候快速的去恢复,而不需要等待一个重新的资源申请的过程。

image-20210102210054861

通过配置 slotmanager.redundant-taskmanager-num 可以实现冗余 TaskManager。这里所谓的冗余 TaskManager 并不是完完全全有两个 TaskManager 是空负载运行的,而是说相比于我所需要的总共的资源数量,会多出两个 TaskManager。

任务可能是相对比较均匀的分布在上面,在能够在利用空闲 TaskManager 的同时,也能够达到一个相对比较好的负载。 一旦发生故障的时候,可以去先把任务快速的调度到现有的还存活的 TaskManager 当中,然后再去进行新一轮的资源申请。目前这个参数支持在 yarn/mesos/native k8 使用。

3. 任务平铺分布

任务平铺问题主要出现在 Flink Standalone 模式下或者是比较旧版本的 k8s 模式部署下的。在这种模式下因为事先定义好了有多少个 TaskManager,每个 TaskManager 上有多少 slot,这样会导致经常出现调度不均的问题,可能部分 manager 放的任务很满,有的则放的比较松散。

在 1.11 的版本当中引入了参数 cluster.evenly-spread-out-slots,这样的参数能够控制它,去进行一个相对比较均衡的调度。

image-20210102212126391

注意:

  • 第一,这个参数我们只针对 Standalone 模式,因为在 yarn 跟 k8s 的模式下,实际上是根据你作业的需求来决定起多少 task manager 的,所以是先有了需求再有 TaskManager,而不是先有 task manager,再有 slot 的调度需求。

    在每次调度任务的时候,实际上只能看到当前注册上来的那一个 TaskManager,Flink 没办法全局的知道后面还有多少 TaskManager 会注册上来,这也是很多人在问的一个问题,就是为什么特性打开了之后好像并没有起到一个很好的效果,这是第一件事情。

  • 第二个需要注意的点是,这里面我们只能决定每一个 TaskManager 上有多少空闲 slot,然而并不能够决定每个 operator 有不同的并发数,Flink 并不能决定说每个 operator 是否在 TaskManager 上是一个均匀的分布,因为在 flink 的资源调度逻辑当中,在整个 slot 的 allocation 这一层是完全看不到 task 的。
三、扩展资源框架 1. 背景

近年来,随着人工智能领域的不断发展,深度学习模型已经被应用到了各种各样的生产需求中,比较典型的场景如推荐系统,广告推送,智能风险控制。这些也是 Flink 一直以来被广泛使用的场景,因此,支持人工智能一直以来都是 Flink 社区的长远目标之一。针对这个目标,目前已经有了很多第三方的开源扩展工作。由阿里巴巴开源的工作主要有两个:

  • 一个是 Flink AI Extended 的项目,是基于 Flink 的深度学习扩展框架,目前支持 TensorFlow、PyTorch 等框架的集成,它使用户可以将 TensorFlow 当做一个算子,放在 Flink 任务中。
  • 另一个是 Alink,它是一个基于 Flink 的通用算法平台,里面也内置了很多常用的机器学习算法。

以上的两个工作都是从功能性上对 Flink 进行扩展,然而从算力的角度上讲,深度学习模型亦或机器学习算法,通常都是整个任务的计算瓶颈所在。GPU 则是这个领域被广泛使用用来加速训练或者预测的资源。因此,支持 GPU 资源来加速计算是 Flink 在 AI 领域的发展过程中必不可少的功能。

2. 使用扩展资源

目前 Flink 支持用户配置的资源维度只有 CPU 与内存,而在实际使用中,不仅是 GPU,我们还会遇到其他资源需求,如 SSD 或 RDMA 等网络加速设备。因此,我们希望提供一个通用的扩展资源框架,任何扩展资源都可以以插件的形式来加入这个框架,GPU 只是其中的一种扩展资源。

对于扩展资源的使用,可以抽象出两个通用需求:

  • 需要支持该类扩展资源的配置与调度。用户可以在配置中指明对这类扩展资源的需求,如每个 TaskManager 上需要有一块 GPU 卡,并且当 Flink 被部署在 Kubernetes/Yarn 这类资源底座上时,需要将用户对扩展资源的需求进行转发,以保证申请到的 Container/Pod 中存在对应的扩展资源。
  • 需要向算子提供运行时的扩展资源信息。用户在自定义算子中可能需要一些运行时的信息才能使用扩展资源,以 GPU 为例,算子需要知道它内部的模型可以部署在那一块 GPU 卡上,因此,需要向算子提供这些信息。
3. 扩展资源框架使用方法

使用资源框架我们可以分为以下这 3 个步骤:

  • 首先为该扩展资源设置相关配置;
  • 然后为所需的扩展资源准备扩展资源框架中的插件;
  • 最后在算子中,从 RuntimeContext 来获取扩展资源的信息并使用这些资源
3.1 配置参数
# 定义扩展资源名称,“gpu”
external-resources: gpu
# 定义每个 TaskManager 所需的 GPU 数量
external-resource.gpu.amount: 1 
# 定义Yarn或Kubernetes中扩展资源的配置键
external-resource.gpu.yarn.config-key: yarn.io/gpu
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
# 定义插件 GPUDriver 的工厂类。
external-resource.gpu.driver-factory.class: 
org.apache.flink.externalresource.gpu.GPUDriverFactory

以上是使用 GPU 资源的配置示例:

  • 对于任何扩展资源,用户首先需要将它的名称加入 "external-resources" 中,这个名称也会被用作该扩展资源其他相关配置的前缀来使用。示例中,我们定义了一种名为 "gpu" 的资源。
  • 在调度层,目前支持用户在 TaskManager 的粒度来配置扩展资源需求。示例中,我们定义每个 TaskManager 上的 GPU 设备数为 1。
  • 将 Flink 部署在 Kubernetes 或是 Yarn 上时,我们需要配置扩展资源在对应的资源底座上的配置键,以便 Flink 对资源需求进行转发。示例中展示了 GPU 对应的配置。
  • 如果提供了插件,则需要将插件的工厂类名放入配置中。
3.2 前置准备

在实际使用扩展资源前,还需要做一些前置准备工作,以 GPU 为例:

  • 在 Standalone 模式下,集群管理员需要保证 GPU 资源对 TaskManager 进程可见。
  • 在 Kubernetes 模式下,需要集群支持 Device Plugin[6],对应的 Kubernetes 版本为 1.10,并且在集群中安装了 GPU 对应的插件。
  • 在 Yarn 模式下,GPU 调度需要集群 Hadoop 版本在 2.10 或 3.1 以上,并正确配置了 resource-types.xml 等文件。
3.3 扩展资源框架插件

完成了对扩展资源的调度后,用户自定义算子可能还需要运行时扩展资源的信息才能使用它。扩展资源框架中的插件负责完成该信息的获取,它的接口如下:

public interface ExternalResourceDriverFactory {
  /**
  * 根据提供的设置创建扩展资源的Driver
  */
  ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}

public interface ExternalResourceDriver {
  /**
  * 获取所需数量的扩展资源信息
  */
  Set            
关注
打赏
1660801899
查看更多评论
0.0478s