- 01 引言
- 02 源码分析
- 2.1 源码入口
- 2.2 IOMetricsInfo
- 2.3 MutableIOMetrics
- 2.3 MetricFetcher
- 2.3.1 MetricFetcherImpl
- 2.4 MetricQueryServiceGateway
- 2.5 RpcEndpoint
- 2.6 MetricQueryService
- 2.7 MiniCluster
- 2.8 LocalExecutor
- 2.8 StreamExecutionEnviroment
- 03 小结
附:Flink源码下载地址
在Flink的Web页面,细心的话可以看到监控页面里,有任务的详情,其中里面有详细的监控指标,如下图(发送记录数、接收记录数、发送字节数,接收字节数等):
很多时候,我们都需要 “取出这些数据”,并用在我们的需求上,那么该如何取出这些数据呢?本文来分析下源码。
02 源码分析 2.1 源码入口在Flink
的web
页面,按F12
查看源码,可以看到:
- 接口地址:http://域名/jobs/ad75bbaaa624e41a249825a9820a65cc
- 响应内容:
{
"jid": "ad75bbaaa624e41a249825a9820a65cc",
"name": "insert-into_default_catalog.default_database.t_student_copy",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1650352652357,
"end-time": -1,
"duration": 64629227,
"maxParallelism": -1,
"now": 1650417281584,
"timestamps": {
"INITIALIZING": 1650352652357,
"FAILED": 0,
"CREATED": 1650352652449,
"RESTARTING": 0,
"FAILING": 0,
"FINISHED": 0,
"SUSPENDED": 0,
"RECONCILING": 0,
"CANCELLING": 0,
"CANCELED": 0,
"RUNNING": 1650352653087
},
"vertices": [
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1650352658363,
"end-time": -1,
"duration": 64623221,
"tasks": {
"CREATED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"RECONCILING": 0,
"CANCELED": 0,
"RUNNING": 1,
"DEPLOYING": 0,
"FINISHED": 0,
"FAILED": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true
}
}
],
"status-counts": {
"CREATED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"RECONCILING": 0,
"CANCELED": 0,
"RUNNING": 1,
"DEPLOYING": 0,
"FINISHED": 0,
"FAILED": 0,
"SCHEDULED": 0
},
"plan": {
"jid": "ad75bbaaa624e41a249825a9820a65cc",
"name": "insert-into_default_catalog.default_database.t_student_copy",
"nodes": [
{
"id": "cbc357ccb763df2852fee8c4fc7d55f2",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
"optimizer_properties": {
}
}
]
}
}
可以看到,vertices.[0].metrics
下的内容就是本文要读取的内容,如下图:
Ctrl+H
全局搜索Flink
源码,我们可能会想到先查看接口 “/jobs/{jobId}
”,其实这样效率很低,最好的方法就是使用其 “特殊性”,比如,我们可以从返回的字段read-bytes
入手,发现定义的地方在org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo
这个类: 好了,我们可以把
IOMetricsInfo
这个类当做我们的源码分析入口。
在IOMetricsInfo
,Ctrl+G查看,可以看到这个类有多个地方被调用,其实真正的是被JobDetailsHandler
调用了,其它的类后缀都是Test
测试类,所以不作为分析的下一步,下面看看JobDetailsHandler
。
在JobDetailsHandler
,可以看到指标值是总counts
里获取的,继续看counts
counts
在这里赋值了: 接下来,我们看看
MutableIOMetrics
这个类。
进入上一步指定的MutableIOMetrics
里的addIOMetrics
方法,可以看到代码根据程序的运行状态,从不同的地方获取指标值了:
- 终止状态:从
AccessExecution
获取了指标值 - 运行状态:从
MetricFetcher
获取了指标值
因为我们的程序是运行的,当然,我们需要研究MetricFetcher
这个类里面的值是怎么拿到的。
fetcher:是抓取的意思,可以理解为取数据
我翻译了MetricFetcher
这个类的注释,内容如下:
package org.apache.flink.runtime.rest.handler.legacy.metrics;
/**
* MetricFetcher可用于从JobManager和所有注册的taskmanager中获取指标。
*
* 只有在调用{@link MetricFetcher#update()}时指标才会被获取,前提是自上次调用传递之后有足够的时间。
*
* @author : YangLinWei
* @createTime: 2022/4/20 10:30 上午
* @version: 1.0.0
*/
public interface MetricFetcher {
/**
* 获取{@link MetricStore},其中包含当前获取的所有指标。
*
* @return {@link MetricStore} 包含的所有获取的指标
*/
MetricStore getMetricStore();
/**
* 触发获取指标
*/
void update();
/**
* @return 最近一次更新的时间戳。
*/
long getLastUpdateTime();
}
继续Ctrl+T查看其实现: 可以看到有几个实现类,毋庸置疑,
MetricFetcherImpl
是它真正的实现类,看看里面的代码。
MetricFetcherImpl
里面有几个方法,如下: 我们需要知道这些指标从何而来?里面的代码不多,大部分都不是我们需要的,经一番阅读,可以知道,指标是从
queryMetrics
这个方法里获取。看看这个方法的代码:
/**
* Query the metrics from the given QueryServiceGateway.
*
* @param queryServiceGateway to query for metrics
*/
private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
queryServiceGateway
.queryMetrics(timeout)
.whenCompleteAsync(
(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
if (t != null) {
LOG.debug("Fetching metrics failed.", t);
} else {
metrics.addAll(deserializer.deserialize(result));
}
},
executor);
}
所以,代码追踪了这么久,发现指标是从网关(MetricQueryServiceGateway
)里调接口去获取的,所以我们需要看源码这个网关接口(queryMetrics
)的代码实现。
从上一步,可以知道调用了MetricQueryServiceGateway
的queryMetrics
接口,具体的实现MetricQueryService
类的queryMetrics
方法,代码如下:
@Override
public CompletableFuture queryMetrics(
Time timeout) {
return callAsync(
() -> enforceSizeLimit(serializer.serialize(counters, gauges, histograms, meters)),
timeout);
}
再看看callAsync
方法: 可以得知,本质就是使用了
rpcServer
去远程调用了接口获取指标了(具体调用了哪里呢?)。
我们看看RpcEndpoint
这个类。
我们看看RpcEndpoint
这个类的方法结构: 从这些方法名,我们可以知道,它类似于一个
HTTP
服务器,从而我们也可以知道,原来Flink
的Web
页面访问的服务器就是这个了。在看看其构造方法: 看看里面是怎么开启服务的:
可以知道,是调用了
AkkaRpcService
的startServer
方法去开启了服务。
好了,这里暂时该停止了,因为偏离了本文的中心,我们需要知道的是这些指标具体从哪里来的?那该如何进行下一步呢?
我们再回到2.4
里面的MetricQueryService
类,看看这个类是如何构造的?(这里前后连贯性很强)。
可以看到MetricQueryService
这个类里面有一个createMetricQueryService
方法,这个方法指的就是创建指标查询服务: 看看在哪里调用了这个方法:
可以在指标服务注册中心(
MetricRegistryImpl
)里面的startQueryService
方法调用了,再看看哪里调用了startQueryService
这个方法: 可以看到有3个地方开启了这个指标的服务,分别是:
- ClusterEntrypoint:
Flink
集群入口点的基类 - MiniCluster:
MiniCluster
在本地执行Flink
任务 - TaskManagerRunner:在
yarn
或standalone
模式下,这个类是任务管理器的可执行入口点。它构建相关组件(网络、I/O
管理器、内存管理器、RPC
服务、HA
服务)并启动。
为了方便理解,这里解读本地执行Flink
任务的模式就好了,即继续研读MiniCluster
。
在MiniCluster
的start()
方法,可以看到了调用了startQueryService
方法 继续看看里面的
metricQueryServiceRpcService
入参,可以知道,metricQueryServiceRpcService
(指标查询服务)是从配置里初始化来的。 继续看看
configuration
配置: 可以得知,配置是从
miniClusterConfiguration
里获取的,继续深入: 发现,配置是从构造函数里获取的,继续看看哪里调用了
MiniCluster
这个类的构造函数方法: 调用这个方法的类有很多,根据命名,可以得知较为合理的是
LocalExecutor
这个类。
我对LocalExecutor的理解:一个用于执行本地Pipelines
(例如:多条FlinkSQL
)的执行器。
看看哪里调用了MiniCluster
的构造方法: 继续看看哪里调用了
create
方法,可以得知在LocalExecutorFactory
里的getExecutor方法调用了:
Ctrl+T
,可以看到在ExecutionEnviroment
和StreamExecutionEnviroment
里调用了:
哦豁,这不是我们日常做Flink
开发常用的两个类了么。随便打开StreamExecutionEnviroment
这个类看看。
可以看到,在里面的executeAsync
方法代用了:
到这里,我们知道了配置是从用户初始化StreamExecutionEnviroment
传入的。
具体指标的参数从哪里获取,我们有了一个很好的分析思路了,我们可以自己编写一个Flink的程序,使用的是StreamExecutionEnviroment
,然后断点本文的源码,就知道来龙去脉了。
本文由于篇幅原因,在下一篇博客继续讲解。