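- 1. Parsing flink-conf.yaml and the custom options of the flink command
- 1.1 Recap of the previous article
- 1.2 Getting the path of Flink's conf directory
- 1.3 Loading the flink-conf.yaml configuration file
- 1.4 Adding the three flink command-line clients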
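1.1 Recap of the previous article
In the previous article we covered the EnvironmentInformation.logEnvironmentInfo function, mainly how the log4j2 logging framework is bound to Flink and how the log4j2 configuration file and log path are defined.
In this article we look at how Flink parses flink-conf.yaml and the custom options passed to the flink command.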
1.2 Getting the path of Flink's conf directory
The main method of the flink-clients/src/org.apache.flink.client.cli.CliFrontend class starts by obtaining the path of Flink's conf directory:
```java
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();
    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);
    // ...... (parts omitted) ......
}
```
This calls getConfigurationDirectoryFromEnv, a method of CliFrontend itself, which is implemented as follows:
```java
// --------------------------------------------------------------------------------------------
//  Miscellaneous Utilities
// --------------------------------------------------------------------------------------------
public static String getConfigurationDirectoryFromEnv() {
    String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
    if (location != null) {
        if (new File(location).exists()) {
            return location;
        } else {
            throw new RuntimeException(
                    "The configuration directory '"
                            + location
                            + "', specified in the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR
                            + "' environment variable, does not exist.");
        }
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
        location = CONFIG_DIRECTORY_FALLBACK_1;
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
        location = CONFIG_DIRECTORY_FALLBACK_2;
    } else {
        throw new RuntimeException(
                "The configuration directory was not specified. "
                        + "Please specify the directory containing the configuration file through the '"
                        + ConfigConstants.ENV_FLINK_CONF_DIR
                        + "' environment variable.");
    }
    return location;
}
```
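When we ran the flink script earlier, it called the config.sh script, which runs export FLINK_CONF_DIR to set the Flink conf directory variable. Here, System.getenv reads that variable to obtain the conf directory path. If FLINK_CONF_DIR is set but points to a path that does not exist, a RuntimeException is thrown. If it is not set at all, the method falls back to CONFIG_DIRECTORY_FALLBACK_1 and then CONFIG_DIRECTORY_FALLBACK_2, and throws only when neither of them exists.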
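To make the lookup order concrete, here is a minimal, hypothetical sketch of the same decision logic. ConfDirResolver and resolve are illustrative names, not Flink code. The existence check is passed in as a predicate so the logic can be exercised without a real file system, and the fallback values "../conf" and "conf" are assumptions; compare them with the CONFIG_DIRECTORY_FALLBACK_1/2 constants in your Flink version.

```java
import java.io.File;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical sketch of the lookup order in getConfigurationDirectoryFromEnv (not Flink code). */
final class ConfDirResolver {
    // Assumed values; compare with CONFIG_DIRECTORY_FALLBACK_1/2 in your Flink version.
    static final String FALLBACK_1 = "../conf";
    static final String FALLBACK_2 = "conf";

    /**
     * @param envValue value of FLINK_CONF_DIR, or null when the variable is unset
     * @param exists   tells whether a path exists; injected so the logic can be tested
     */
    static String resolve(String envValue, Predicate<String> exists) {
        if (envValue != null) {
            // An explicitly set variable must point to an existing path; no fallback is tried.
            if (exists.test(envValue)) {
                return envValue;
            }
            throw new RuntimeException("FLINK_CONF_DIR points to a missing path: " + envValue);
        }
        // Variable unset: try the two fallback locations in order.
        if (exists.test(FALLBACK_1)) {
            return FALLBACK_1;
        }
        if (exists.test(FALLBACK_2)) {
            return FALLBACK_2;
        }
        throw new RuntimeException("The configuration directory was not specified.");
    }

    public static void main(String[] args) {
        // Real lookup: the actual environment variable and file system.
        try {
            System.out.println(resolve(System.getenv("FLINK_CONF_DIR"), p -> new File(p).exists()));
        } catch (RuntimeException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
        // Simulated: variable unset and only "conf" exists, so the second fallback is chosen.
        System.out.println(resolve(null, Set.of("conf")::contains));
    }
}
```

The asymmetry is deliberate in the original code: an explicitly set FLINK_CONF_DIR that is wrong fails fast instead of silently falling back to another directory.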
1.3 Loading the flink-conf.yaml configuration file
Next, the main method of the flink-clients/src/org.apache.flink.client.cli.CliFrontend class loads the flink-conf.yaml configuration file:
```java
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();
    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);
    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);
    // ...... (parts omitted) ......
}
```
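This calls GlobalConfiguration.loadConfiguration, passing in the path of the Flink conf directory. Following the call through (the single-argument version delegates with dynamicProperties set to null), the loadConfiguration overload that does the actual work is: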
```java
/**
 * Loads the configuration files from the specified directory. If the dynamic properties
 * configuration is not null, then it is added to the loaded configuration.
 *
 * @param configDir directory to load the configuration from
 * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
 * @return The configuration loaded from the given configuration directory
 */
public static Configuration loadConfiguration(
        final String configDir, @Nullable final Configuration dynamicProperties) {
    if (configDir == null) {
        throw new IllegalArgumentException(
                "Given configuration directory is null, cannot load configuration");
    }
    final File confDirFile = new File(configDir);
    if (!(confDirFile.exists())) {
        throw new IllegalConfigurationException(
                "The given configuration directory name '"
                        + configDir
                        + "' ("
                        + confDirFile.getAbsolutePath()
                        + ") does not describe an existing directory.");
    }
    // get Flink yaml configuration file
    final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
    if (!yamlConfigFile.exists()) {
        throw new IllegalConfigurationException(
                "The Flink config file '"
                        + yamlConfigFile
                        + "' ("
                        + yamlConfigFile.getAbsolutePath()
                        + ") does not exist.");
    }
    Configuration configuration = loadYAMLResource(yamlConfigFile);
    if (dynamicProperties != null) {
        configuration.addAll(dynamicProperties);
    }
    return configuration;
}
```
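The method reads the flink-conf.yaml file in the Flink conf directory and parses it line by line: comment text is stripped, and each remaining line is split at the first ": " (a colon followed by a space) into a key and a value, which are stored in the configuration object. If dynamicProperties is not null, its entries are then added with addAll, overriding any values from the file that have the same key.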
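A simplified sketch of the parsing rules just described, assuming that text after '#' is a comment, that key and value are separated by the first ": ", and that malformed lines are skipped. It uses a plain Map<String, String> instead of Flink's Configuration, and SimpleFlinkConfParser, parse and merge are hypothetical names; merge imitates the addAll step in which dynamic properties override values read from the file.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of the flink-conf.yaml line parsing described above. */
final class SimpleFlinkConfParser {

    /** Parses "key: value" lines; '#' starts a comment, malformed lines are skipped. */
    static Map<String, String> parse(String content) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : content.split("\\r?\\n")) {
            // Everything after the first '#' is treated as a comment.
            String withoutComment = line.split("#", 2)[0].trim();
            if (withoutComment.isEmpty()) {
                continue;
            }
            // Split on the first ": " only, so a value such as a URL may still contain ':'.
            String[] kv = withoutComment.split(": ", 2);
            if (kv.length < 2) {
                continue; // no ": " separator: skipped in this sketch
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                continue;
            }
            conf.put(key, value);
        }
        return conf;
    }

    /** Imitates configuration.addAll(dynamicProperties): dynamic entries override file entries. */
    static Map<String, String> merge(Map<String, String> fromFile, Map<String, String> dynamic) {
        Map<String, String> merged = new LinkedHashMap<>(fromFile);
        if (dynamic != null) {
            merged.putAll(dynamic);
        }
        return merged;
    }

    public static void main(String[] args) {
        String yaml = "# JobManager settings\n"
                + "jobmanager.rpc.port: 6123\n"
                + "rest.address: http://localhost:8081  # inline comment\n"
                + "this line has no separator\n";
        Map<String, String> conf = parse(yaml);
        System.out.println(conf);
        System.out.println(merge(conf, Map.of("jobmanager.rpc.port", "7000")));
    }
}
```

One consequence of the "everything after '#' is a comment" rule in this sketch is that a value containing '#' gets truncated, so such values need care.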
1.4 Adding the three flink command-line clients
The main method of the flink-clients/src/org.apache.flink.client.cli.CliFrontend class then adds three flink command-line clients:
```java
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();
    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);
    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);
    int retCode = 31;
    // ...... (parts omitted) ......
}
```
This calls loadCustomCommandLines of the same class, passing in the configuration read from flink-conf.yaml and the conf directory.
loadCustomCommandLines is implemented as follows:
```java
public static List<CustomCommandLine> loadCustomCommandLines(
        Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine> customCommandLines = new ArrayList<>();
    customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
    // Command line interface of the YARN session, with a special initialization here
    // to prefix all options with y/yarn.
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        customCommandLines.add(
                loadCustomCommandLine(
                        flinkYarnSessionCLI,
                        configuration,
                        configurationDirectory,
                        "y",
                        "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
        try {
            LOG.info("Loading FallbackYarnSessionCli");
            customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
        } catch (Exception exception) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }
    }
    // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
    // active CustomCommandLine in order and DefaultCLI isActive always return true.
    customCommandLines.add(new DefaultCLI());
    return customCommandLines;
}
```
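As we can see, this function creates a customCommandLines list and adds three command-line clients to it in order: GenericCLI, FlinkYarnSessionCli (or FallbackYarnSessionCli if FlinkYarnSessionCli cannot be loaded), and DefaultCLI. The three clients are described below.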
flink-clients/src/main/java/org.apache.flink.client.cli.GenericCLI.isActive is implemented as follows:
```java
@Override
public boolean isActive(CommandLine commandLine) {
    // Does flink-conf.yaml set execution.target: remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session?
    return configuration.getOptional(DeploymentOptions.TARGET).isPresent()
            // Does the flink command line specify -e remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session?
            // The -e option has been deprecated
            || commandLine.hasOption(executorOption.getOpt())
            // Does the flink command line specify -t remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session | kubernetes-application | yarn-application?
            // -t is equivalent to execution.target in flink-conf.yaml, but takes precedence over it
            || commandLine.hasOption(targetOption.getOpt());
}
```
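For GenericCLI we only need to look at the isActive method. Later, Flink uses this method to decide which command-line client is active and then reads only the options that belong to that client; the options of the other clients are not read.
GenericCLI specifies the mode our application runs in: a local standalone cluster, a remote cluster, YARN, or Kubernetes. As the comments in the code show, GenericCLI is active if flink-conf.yaml sets the execution.target option, or if the flink command specifies the -e or -t option.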
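The "first active client wins" rule, which the Tips comment in loadCustomCommandLines relies on, can be sketched as follows. This is a hypothetical model, not Flink's API: each client is reduced to a name plus an isActive check over the loaded configuration (a Map) and the set of short option names given on the command line. The GenericCLI stand-in uses the three conditions from the code above, the YARN client is left out for brevity, and the DefaultCLI stand-in is always active, which is why it has to be last in the list.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

/** Hypothetical model of picking the first active command-line client (not Flink code). */
final class ActiveCliSelector {

    /** Stand-in for a CustomCommandLine: a name plus an isActive check. */
    static final class Cli {
        final String name;
        final BiPredicate<Map<String, String>, Set<String>> isActive;

        Cli(String name, BiPredicate<Map<String, String>, Set<String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    // Same three conditions as GenericCLI.isActive: execution.target in the config,
    // or the deprecated -e option, or the -t option on the command line.
    static final Cli GENERIC = new Cli("GenericCLI",
            (conf, opts) -> conf.containsKey("execution.target")
                    || opts.contains("e")
                    || opts.contains("t"));

    // Like DefaultCLI: always active, so it must come last.
    static final Cli DEFAULT = new Cli("DefaultCLI", (conf, opts) -> true);

    /** Scans the clients in list order and returns the first one that is active. */
    static Cli firstActive(List<Cli> clis, Map<String, String> conf, Set<String> opts) {
        for (Cli cli : clis) {
            if (cli.isActive.test(conf, opts)) {
                return cli;
            }
        }
        throw new IllegalStateException("No active command-line client found");
    }

    public static void main(String[] args) {
        List<Cli> clis = List.of(GENERIC, DEFAULT);
        System.out.println(firstActive(clis, Map.of(), Set.of()).name);
        System.out.println(firstActive(clis, Map.of(), Set.of("t")).name);
        System.out.println(firstActive(clis, Map.of("execution.target", "remote"), Set.of()).name);
    }
}
```

If DEFAULT were placed first in the list, it would always win and GenericCLI would never be selected, which is exactly what the Tips comment warns against.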
FlinkYarnSessionCli is instantiated via reflection, using the following method:
```java
/**
 * Loads a class from the classpath that implements the CustomCommandLine interface.
 *
 * @param className The fully-qualified class name to load.
 * @param params The constructor parameters
 */
private static CustomCommandLine loadCustomCommandLine(String className, Object... params)
        throws Exception {
    Class<?>[] types = new Class<?>[params.length];
    for (int i = 0; i < params.length; i++) {
        checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
        types[i] = params[i].getClass();
    }
    // ...... (remaining code omitted) ......
}
```
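The reflective loading, together with the fallback used in loadCustomCommandLines, can be sketched with plain JDK reflection. SketchCli, PrefixedCli, FallbackCli and ReflectiveCliLoader are hypothetical stand-ins for CustomCommandLine, FlinkYarnSessionCli and FallbackYarnSessionCli, not Flink classes. Note that Class.getConstructor(types) only finds a public constructor whose parameter types exactly match the runtime classes of the arguments, which is why the classes are collected from params first.

```java
import java.lang.reflect.Constructor;
import java.util.Optional;

/** Hypothetical stand-in for Flink's CustomCommandLine interface. */
interface SketchCli {
    String describe();
}

/** Hypothetical CLI with a public (String, String) constructor, loaded only by name. */
class PrefixedCli implements SketchCli {
    private final String shortPrefix;
    private final String longPrefix;

    public PrefixedCli(String shortPrefix, String longPrefix) {
        this.shortPrefix = shortPrefix;
        this.longPrefix = longPrefix;
    }

    @Override
    public String describe() {
        return "PrefixedCli(" + shortPrefix + "/" + longPrefix + ")";
    }
}

/** Hypothetical fallback; the constructor must be public for getConstructor() to find it. */
class FallbackCli implements SketchCli {
    public FallbackCli() {}

    @Override
    public String describe() {
        return "FallbackCli";
    }
}

final class ReflectiveCliLoader {

    /** Loads className and calls the constructor matching the runtime classes of params. */
    static SketchCli load(String className, Object... params) throws Exception {
        Class<? extends SketchCli> cliClass =
                Class.forName(className).asSubclass(SketchCli.class);
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            types[i] = params[i].getClass();
        }
        Constructor<? extends SketchCli> constructor = cliClass.getConstructor(types);
        return constructor.newInstance(params);
    }

    /** Tries the primary class; on any failure, tries the no-arg fallback class instead. */
    static Optional<SketchCli> loadWithFallback(
            String className, String fallbackName, Object... params) {
        try {
            return Optional.of(load(className, params));
        } catch (NoClassDefFoundError | Exception e) {
            try {
                return Optional.of(load(fallbackName));
            } catch (Exception ignored) {
                // Neither class could be loaded: this client is simply not added.
                return Optional.empty();
            }
        }
    }

    public static void main(String[] args) {
        // Class names are plain strings, so the caller needs no compile-time dependency on them.
        System.out.println(loadWithFallback("PrefixedCli", "FallbackCli", "y", "yarn")
                .map(SketchCli::describe).orElse("none"));
        // "com.example.MissingCli" is a made-up name that does not exist, so the fallback is used.
        System.out.println(loadWithFallback("com.example.MissingCli", "FallbackCli", "y", "yarn")
                .map(SketchCli::describe).orElse("none"));
    }
}
```

Loading by class name is what lets the Flink client start even when the YARN classes are not on the classpath: the lookup fails at runtime, the catch block switches to the fallback class, and the remaining clients are still added.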