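- 1. Encapsulating the effective configuration and the program
- 1.1 Recap of the previous article
- 1.2 Building the ProgramOptions needed to run the application
- 1.3 Getting the URLs of the application jar and all of its dependency jars
- 1.4 Merging the various configurations into the effective configuration
- 1.5 Building the packaged program and executing it
- 1.6 How PackagedProgram's userCodeClassLoader and mainClass fields are assigned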
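1.1 Recap of the previous article
In the previous article we covered how Flink parses custom command-line parameters, how `flink run --help` prints its help message, and how the Flink command-line client is selected. This article covers how the effective configuration and the user program are encapsulated.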
1.2 Building the ProgramOptions needed to run the application
In flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run, ProgramOptions is created as follows:
```java
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    // ... (omitted) ...
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    final List<URL> jobJars = getJobJarAndDependencies(programOptions);
    // ... (omitted) ...
}
```
flink-clients/src/main/java/org.apache.flink.client.cli.ProgramOptions.create is implemented as follows:
```java
public static ProgramOptions create(CommandLine line) throws CliArgsException {
    if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
        return createPythonProgramOptions(line);
    } else {
        return new ProgramOptions(line);
    }
}
```
We are not running a Python program here, so `new ProgramOptions(line)` is called directly. The ProgramOptions class looks like this:
```java
/** Base class for command line options that refer to a JAR file program. */
public class ProgramOptions extends CommandLineOptions {

    // Value of the jarfile option. If it is not given, the first argument without a
    // leading "-" is used as the jar file.
    private String jarFilePath;

    // Value of the class option.
    protected String entryPointClass;

    // The classpath options from the command line, converted into a list of URLs.
    private final List<URL> classpaths;

    // All arguments without a leading "-" (excluding the first one if it was taken as the jar file).
    private final String[] programArgs;

    // Value of the parallelism option. If it is absent, a "use the default" marker is stored
    // and the configured default parallelism applies.
    private final int parallelism;

    // Detached (background) mode if the command line contains detached
    // or the deprecated yarndetached option.
    private final boolean detachedMode;

    // Whether the shutdownOnAttachedExit option was given.
    private final boolean shutdownOnAttachedExit;

    // SavepointRestoreSettings built from the fromSavepoint, allowNonRestoredState
    // and restoreMode options.
    private final SavepointRestoreSettings savepointSettings;

    protected ProgramOptions(CommandLine line) throws CliArgsException {
        // ... (omitted) ...
    }

    // ... (omitted) ...
}
```
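In its constructor, ProgramOptions reads the values of the various options from the CommandLine, converts them where necessary, and assigns them to its fields. The outline is shown above. The conversion details are not covered here.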
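The field comments above describe a few concrete rules: the jar path is either given explicitly or taken from the first positional argument, and parallelism is optional. The sketch below is hypothetical and is not Flink's code. It takes already-separated option values instead of an Apache Commons CLI `CommandLine`, so it runs with the JDK alone. The class name, the `parse` method and the `-1` marker (assumed to play the role of Flink's "use the default parallelism" value) are illustrative assumptions.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration (not Flink code) of the argument rules described above:
 * jar path vs. program arguments, and the "no parallelism given" marker.
 */
final class ProgramArgsSketch {
    /** Assumed marker meaning "use the configured default parallelism". */
    static final int PARALLELISM_DEFAULT = -1;

    final String jarFilePath;
    final String[] programArgs;
    final int parallelism;

    private ProgramArgsSketch(String jarFilePath, String[] programArgs, int parallelism) {
        this.jarFilePath = jarFilePath;
        this.programArgs = programArgs;
        this.parallelism = parallelism;
    }

    /**
     * @param jarOption value of the jarfile option, or null if it was not given
     * @param positional arguments without a leading "-", in command-line order
     * @param parallelismOption value of the parallelism option, or null if it was not given
     */
    static ProgramArgsSketch parse(String jarOption, String[] positional, String parallelismOption) {
        String jar = jarOption;
        String[] args = positional;
        // Without an explicit jarfile option, the first positional argument is the jar.
        if (jar == null && positional.length > 0) {
            jar = positional[0];
            args = Arrays.copyOfRange(positional, 1, positional.length);
        }

        int parallelism = PARALLELISM_DEFAULT;
        if (parallelismOption != null) {
            // Throws NumberFormatException if the value is not an integer.
            parallelism = Integer.parseInt(parallelismOption);
            if (parallelism <= 0) {
                throw new IllegalArgumentException("Parallelism must be positive: " + parallelismOption);
            }
        }
        return new ProgramArgsSketch(jar, args, parallelism);
    }
}
```

For example, `flink run app.jar --input x` would roughly correspond to `parse(null, new String[] {"app.jar", "--input", "x"}, null)`, which yields the jar `app.jar`, the program arguments `[--input, x]` and the default-parallelism marker.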
1.3 Getting the URLs of the application jar and all of its dependency jars
In flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run, getJobJarAndDependencies is called as follows:
```java
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    // ... (omitted) ...
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
    // ... (omitted) ...
}
```
getJobJarAndDependencies is implemented as follows:
```java
/** Get all provided libraries needed to run the program from the ProgramOptions. */
private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
        throws CliArgsException {
    String entryPointClass = programOptions.getEntryPointClassName();
    String jarFilePath = programOptions.getJarFilePath();

    try {
        File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
        return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
    } catch (FileNotFoundException | ProgramInvocationException e) {
        throw new CliArgsException(
                "Could not get job jar and dependencies from JAR file: " + e.getMessage(), e);
    }
}
```
This step takes the path of the application jar and turns it into a jarFile (java.io.File). It then passes that jarFile, together with the fully qualified name of the application's main class, to PackagedProgram.getJobJarAndDependencies.
flink-clients/src/main/java/org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies is implemented as follows:
```java
/** Returns all provided libraries needed to run the program. */
public static List<URL> getJobJarAndDependencies(
        File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
    // Get the URL of the java.io.File jarFile
    URL jarFileUrl = loadJarFile(jarFile);

    // Use java.util.jar.JarFile to find every dependency jar under lib/ inside the
    // application jar, copy each one into a temporary jar created with
    // java.io.File.createTempFile, and return those temporary files
    List<File> extractedTempLibraries =
            jarFileUrl == null
                    ? Collections.emptyList()
                    : extractContainedLibraries(jarFileUrl);

    List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);

    // The result is the URL list of the application jar plus the dependency jars it contains
    if (jarFileUrl != null) {
        libs.add(jarFileUrl);
    }
    for (File tmpLib : extractedTempLibraries) {
        try {
            libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
        } catch (MalformedURLException e) {
            throw new RuntimeException("URL is invalid. This should not happen.", e);
        }
    }

    if (isPython(entryPointClassName)) {
        libs.add(PackagedProgramUtils.getPythonJar());
    }

    return libs;
}
```
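In short, this step reads the application jar and extracts the dependency jars it contains. It then returns the URLs of all these jars, plus the Python jar when the entry point is a Python program.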
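The extraction step described in the comments can be reproduced with nothing but the JDK. This is a simplified, hypothetical stand-in for extractContainedLibraries and getJobJarAndDependencies, not Flink's implementation. It assumes dependency jars sit under `lib/` in the application jar. It also uses `java.nio.file.Files.createTempFile` instead of `java.io.File.createTempFile`. The class and method names are illustrative.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical sketch: extract lib/*.jar from an application jar and collect URLs. */
final class JarLibsSketch {

    /** Copies every lib/*.jar entry of the given jar into its own temporary file. */
    static List<File> extractContainedLibraries(File jar) throws IOException {
        List<File> extracted = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar)) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (!entry.isDirectory() && name.startsWith("lib/") && name.endsWith(".jar")) {
                    Path tmp = Files.createTempFile("flink-sketch-lib-", ".jar");
                    tmp.toFile().deleteOnExit();
                    // Copy the nested jar's bytes into the temporary file.
                    try (InputStream in = jarFile.getInputStream(entry)) {
                        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
                    }
                    extracted.add(tmp.toFile());
                }
            }
        }
        return extracted;
    }

    /** The application jar's URL first, then one URL per extracted dependency jar. */
    static List<URL> jobJarAndDependencies(File jar) throws IOException {
        List<URL> urls = new ArrayList<>();
        urls.add(jar.getAbsoluteFile().toURI().toURL());
        for (File lib : extractContainedLibraries(jar)) {
            urls.add(lib.getAbsoluteFile().toURI().toURL());
        }
        return urls;
    }
}
```

Extracting the nested jars is necessary because a standard `URLClassLoader` cannot load classes from a jar nested inside another jar. Each nested jar must exist as its own file before its URL is useful on a classpath.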
1.4 Merging the various configurations into the effective configuration
In flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run, getEffectiveConfiguration is called as follows:
```java
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    // ... (omitted) ...
    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
    // ... (omitted) ...
}
```
getEffectiveConfiguration is implemented as follows:
```java
private Configuration getEffectiveConfiguration(
        final CustomCommandLine activeCustomCommandLine,
        final CommandLine commandLine,
        final ProgramOptions programOptions,
        final List<URL> jobJars)
        throws FlinkException {

    // Start from the configuration loaded from flink-conf.yaml and let activeCustomCommandLine
    // apply its key-value pairs: a key is set only if the corresponding option is present
    // in commandLine
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCustomCommandLine, commandLine);

    // Create a Configuration populated from programOptions; the key pipeline.jars is set to
    // the URL list of the application jar and the dependency jars it contains
    final ExecutionConfigAccessor executionParameters =
            ExecutionConfigAccessor.fromProgramOptions(
                    checkNotNull(programOptions), checkNotNull(jobJars));

    // Merge the two Configuration objects: the program options are written into
    // effectiveConfiguration
    executionParameters.applyToConfiguration(effectiveConfiguration);

    LOG.debug(
            "Effective configuration after Flink conf, custom commandline, and program options: {}",
            effectiveConfiguration);
    return effectiveConfiguration;
}
```
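The method merges three layers into the effective configuration:

- the settings from flink-conf.yaml,
- the options that activeCustomCommandLine extracts from commandLine,
- the values in programOptions.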
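The merge order can be modeled with plain maps. In this hypothetical sketch, `Map<String, String>` stands in for Flink's `Configuration`. Each later layer overrides the earlier ones: flink-conf.yaml first, then the active command line, then the program options. The sketch assumes overriding is a simple key replacement. Writing pipeline.jars as a `;`-joined string is only this sketch's representation, not necessarily how Flink stores the list.

```java
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical model of the configuration layering; maps stand in for Configuration. */
final class ConfigMergeSketch {

    static Map<String, String> effectiveConfiguration(
            Map<String, String> flinkConf,
            Map<String, String> fromCommandLine,
            Map<String, String> fromProgramOptions,
            List<URL> jobJars) {
        // 1. Start from a copy of flink-conf.yaml so the loaded configuration is not mutated.
        Map<String, String> effective = new HashMap<>(flinkConf);

        // 2. Keys found by the active custom command line override the file.
        effective.putAll(fromCommandLine);

        // 3. Program options, plus pipeline.jars, are applied last and win on conflicts.
        Map<String, String> execution = new HashMap<>(fromProgramOptions);
        execution.put(
                "pipeline.jars",
                jobJars.stream().map(URL::toString).collect(Collectors.joining(";")));
        effective.putAll(execution);
        return effective;
    }
}
```

With this ordering, a value given on the command line for a key also present in flink-conf.yaml wins, and program options such as parallelism win over both.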
1.5 Building the packaged program and executing it
In flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend.run, the packaged program is built and then executed as follows:
```java
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    // ... (omitted) ...
    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}
```
getPackagedProgram is implemented as follows:
```java
private PackagedProgram getPackagedProgram(
        ProgramOptions programOptions, Configuration effectiveConfiguration)
        throws ProgramInvocationException, CliArgsException {
    PackagedProgram program;
    try {
        LOG.info("Building program from JAR file");
        // Create the PackagedProgram object from programOptions and effectiveConfiguration
        program = buildProgram(programOptions, effectiveConfiguration);
    } catch (FileNotFoundException e) {
        throw new CliArgsException(
                "Could not build the program from JAR file: " + e.getMessage(), e);
    }
    return program;
}
```
This step creates the PackagedProgram object from programOptions and effectiveConfiguration.
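`run` opens the PackagedProgram in a try-with-resources block, so its `close()` runs once `executeProgram` returns or throws. One plausible job for `close()` is deleting the temporary jars extracted from `lib/` in section 1.3. This is an assumption; verify it against `PackagedProgram.close` in your Flink version. A minimal hypothetical sketch of that ownership pattern, where `ExtractedLibs` is an illustrative name:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical holder that owns extracted temporary jars and deletes them on close(). */
final class ExtractedLibs implements AutoCloseable {
    private final List<File> tempFiles;

    ExtractedLibs(List<File> tempFiles) {
        this.tempFiles = new ArrayList<>(tempFiles);
    }

    List<File> files() {
        return Collections.unmodifiableList(tempFiles);
    }

    @Override
    public void close() {
        for (File f : tempFiles) {
            // Best effort: report files that could not be removed instead of throwing.
            if (!f.delete() && f.exists()) {
                System.err.println("Could not delete temporary file " + f);
            }
        }
        tempFiles.clear();
    }
}
```

Because `close()` declares no checked exception, callers can use the holder in try-with-resources without extra handling. The cleanup also runs when the user program fails.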
executeProgram is implemented as follows:
```java
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
```
It ends up calling flink-clients/src/main/java/org.apache.flink.client.ClientUtils.executeProgram, which is implemented as follows:
```java
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    // Get the application's ClassLoader
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // Make the application's ClassLoader the context class loader of the current thread
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));

        // Both calls install a context environment (ContextEnvironment for
        // ExecutionEnvironment, StreamContextEnvironment for StreamExecutionEnvironment),
        // so StreamExecutionEnvironment.getExecutionEnvironment() in the application returns
        // an environment built from this context; the application can then adjust the
        // configuration through that environment
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            // Finally, run the application program
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        // Restore the thread's original context class loader
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
```
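This method does three things:

- it installs the application's ClassLoader as the thread context class loader,
- it sets up the context environments,
- it finally runs the application program.

How the context is set and how the application obtains it will be examined later.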
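The class-loader handling in ClientUtils.executeProgram follows a standard save/set/restore pattern around the thread context class loader. The sketch below isolates just that pattern. The helper name `runWithContextClassLoader` is hypothetical and is not a Flink API.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper isolating the save/set/restore pattern used in executeProgram. */
final class ContextClassLoaderSketch {

    static <T> T runWithContextClassLoader(ClassLoader loader, Callable<T> action)
            throws Exception {
        Thread current = Thread.currentThread();
        // Remember the caller's context class loader.
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            // Code that looks up classes or resources via the context loader sees `loader`.
            return action.call();
        } finally {
            // Always restore, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }
}
```

Restoring in `finally` matters because the client thread may go on to do other work after the user program fails. Without the restore, that work would keep resolving classes through the user's class loader.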
flink-clients/src/main/java/org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution is implemented as follows:
```java
/**
 * This method assumes that the context environment is prepared, or the execution will be a
 * local execution by default.
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        callMainMethod(mainClass, args);
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}
```
It then calls callMainMethod, which is implemented as follows:
```java
private static void callMainMethod(Class<?> entryClass, String[] args)
        throws ProgramInvocationException {
    Method mainMethod;
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " must be public.");
    }

    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        // ... (omitted) ...
    }

    if (!Modifier.isStatic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-static main method.");
    }
    if (!Modifier.isPublic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-public main method.");
    }

    try {
        mainMethod.invoke(null, (Object) args);
    } catch (IllegalArgumentException e) {
        // ... (omitted) ...
    }
}
```
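callMainMethod uses reflection to look up the main method of entryClass. It then invokes that method reflectively with args, which runs the application program.
Two fields of the PackagedProgram object are especially important: userCodeClassLoader and mainClass. They are covered in the next subsection.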
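The reflective call in callMainMethod comes down to a few JDK calls. This hypothetical sketch performs similar checks on the entry class and its `main` method. It also unwraps `InvocationTargetException` so that the exception thrown by the user's own code surfaces. It throws `IllegalArgumentException` where Flink throws `ProgramInvocationException`. `DemoJob` is an illustrative entry point, not part of Flink.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical sketch of invoking a program's static main(String[]) via reflection. */
final class MainInvokerSketch {

    static void callMainMethod(Class<?> entryClass, String[] args) throws Exception {
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new IllegalArgumentException("The class " + entryClass.getName() + " must be public.");
        }
        // getMethod only returns public methods; NoSuchMethodException if there is no main(String[]).
        Method mainMethod = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException(
                    "The class " + entryClass.getName() + " declares a non-static main method.");
        }
        try {
            // The (Object) cast stops varargs invoke from spreading the array into separate arguments;
            // the null receiver is fine because main is static.
            mainMethod.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // Rethrow what the user's main actually threw.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        }
    }

    /** Illustrative entry point that records the arguments it received. */
    public static final class DemoJob {
        static String[] lastArgs;

        public static void main(String[] args) {
            lastArgs = args;
        }
    }
}
```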
1.6 How PackagedProgram's userCodeClassLoader and mainClass fields are assigned
In flink-clients/src/main/java/org.apache.flink.client.program.PackagedProgram, userCodeClassLoader is assigned as follows:
```java
this.userCodeClassLoader =
        ClientUtils.buildUserCodeClassLoader(
                getJobJarAndDependencies(),
                classpaths,
                getClass().getClassLoader(),
                configuration);
```
flink-clients/src/main/java/org.apache.flink.client.ClientUtils.buildUserCodeClassLoader is implemented as follows:
```java
public static URLClassLoader buildUserCodeClassLoader(
        List<URL> jars, List<URL> classpaths, ClassLoader parent, Configuration configuration) {
    URL[] urls = new URL[jars.size() + classpaths.size()];
    // Add the URLs of the application jar and the dependency jars it contains
    for (int i = 0; i < jars.size(); i++) {
        urls[i] = jars.get(i);
    }
    // Add the URLs of the classpaths passed on the command line
    for (int i = 0; i < classpaths.size(); i++) {
        urls[i + jars.size()] = classpaths.get(i);
    }
    // Read classloader.parent-first-patterns.default and
    // classloader.parent-first-patterns.additional from the configuration
    // and merge the two lists
    final String[] alwaysParentFirstLoaderPatterns =
            CoreOptions.getParentFirstLoaderPatterns(configuration);
    // Value of classloader.resolve-order
    final String classLoaderResolveOrder =
            configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
    // Decide from classloader.resolve-order whether to use parent-first or child-first
    FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
    // Value of classloader.check-leaked-classloader
    final boolean checkClassloaderLeak =
            configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
    // parent-first and child-first produce different ClassLoader implementations; either way
    // the returned loader covers all the URLs above and has `parent` as its parent loader
    return FlinkUserCodeClassLoaders.create(
            resolveOrder,
            urls,
            parent,
            alwaysParentFirstLoaderPatterns,
            NOOP_EXCEPTION_HANDLER,
            checkClassloaderLeak);
}
```
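The method reads from the effective configuration whether classes are resolved parent-first or child-first. It then creates a ClassLoader over the URLs of the application jar, the dependency jars it contains, and the classpaths given on the command line.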
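To see what parent-first versus child-first means in practice, here is a hypothetical, simplified pair of class loaders. It is not FlinkUserCodeClassLoaders. It handles only classes, not resources. It treats the always-parent-first patterns as plain class-name prefixes, which is an assumption about how such patterns are matched. The `ResolveOrder` enum and `create` factory here are illustrative stand-ins.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical sketch of parent-first vs. child-first user-code class loading. */
final class UserCodeClassLoaderSketch {

    enum ResolveOrder {
        CHILD_FIRST, PARENT_FIRST;

        static ResolveOrder fromString(String value) {
            if ("child-first".equalsIgnoreCase(value)) return CHILD_FIRST;
            if ("parent-first".equalsIgnoreCase(value)) return PARENT_FIRST;
            throw new IllegalArgumentException("Unknown resolve order: " + value);
        }
    }

    static URLClassLoader create(
            ResolveOrder order, URL[] urls, ClassLoader parent, String[] alwaysParentFirst) {
        return order == ResolveOrder.CHILD_FIRST
                ? new ChildFirst(urls, parent, alwaysParentFirst)
                : new URLClassLoader(urls, parent); // standard JDK delegation: parent first
    }

    /** Looks in its own URLs before asking the parent, except for parent-first prefixes. */
    static final class ChildFirst extends URLClassLoader {
        private final String[] alwaysParentFirst;

        ChildFirst(URL[] urls, ClassLoader parent, String[] alwaysParentFirst) {
            super(urls, parent);
            this.alwaysParentFirst = alwaysParentFirst.clone();
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    if (isAlwaysParentFirst(name)) {
                        return super.loadClass(name, resolve);
                    }
                    try {
                        c = findClass(name); // user jars first
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, resolve); // fall back to normal delegation
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        private boolean isAlwaysParentFirst(String name) {
            for (String prefix : alwaysParentFirst) {
                if (name.startsWith(prefix)) return true;
            }
            return false;
        }
    }
}
```

Child-first lets a job bundle its own version of a library that also exists on the Flink classpath. The parent-first prefixes keep core classes such as `java.` ones shared, so the job and the framework see the same class objects.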
In flink-clients/src/main/java/org.apache.flink.client.program.PackagedProgram, mainClass is assigned as follows:
```java
// load the entry point class
this.mainClass =
        loadMainClass(
                // if no entryPointClassName name was given, we try and look one up through
                // the manifest
                entryPointClassName != null
                        ? entryPointClassName
                        : getEntryPointClassNameFromJar(this.jarFile),
                userCodeClassLoader);
```
loadMainClass is implemented as follows:
```java
private static Class<?> loadMainClass(String className, ClassLoader cl)
        throws ProgramInvocationException {
    ClassLoader contextCl = null;
    try {
        contextCl = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(cl);
        return Class.forName(className, false, cl);
    } catch (ClassNotFoundException e) {
        // ... (omitted) ...
    }
}
```
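In other words, the application's main class is loaded with Class.forName through userCodeClassLoader. The second argument, false, means the class is loaded but not yet initialized.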
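When no entry class is passed on the command line, the main class name comes from the jar's manifest instead. The sketch below uses hypothetical names. It reads only the standard `Main-Class` manifest attribute; getEntryPointClassNameFromJar may check other attributes as well. It also shows what `initialize=false` in `Class.forName` means: the class's static initializer does not run at load time.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Hypothetical sketch: resolve the entry class from a manifest and load it lazily. */
final class EntryPointSketch {
    /** Set by Lazy's static initializer, to observe when initialization happens. */
    static boolean initialized = false;

    /** Reads the standard Main-Class attribute from the jar's manifest. */
    static String mainClassFromManifest(File jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                throw new IOException("No manifest in " + jar);
            }
            String name = manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
            if (name == null) {
                throw new IOException("No Main-Class attribute in " + jar);
            }
            return name.trim();
        }
    }

    /** initialize=false: the class is loaded and linked, but static initializers do not run yet. */
    static Class<?> loadMainClass(String className, ClassLoader cl) throws ClassNotFoundException {
        return Class.forName(className, false, cl);
    }

    /** Illustrative entry class whose static initializer flips the flag. */
    static final class Lazy {
        static {
            EntryPointSketch.initialized = true;
        }

        public static void main(String[] args) {}
    }
}
```

Deferring initialization means static blocks in the user's main class run only when `main` is actually invoked. At that point the context class loader and context environments are already in place.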