[Flink Source Code] The Flink submission flow: parsing flink-conf.yaml and adding the three flink command-line clients

Bulut0907 · Published: 2022-09-07 09:18:58

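Contents
  • 1. Parsing flink-conf.yaml and the custom flink command-line options
    • 1.1 Recap
    • 1.2 Getting the path of Flink's conf directory
    • 1.3 Loading the flink-conf.yaml configuration file
    • 1.4 Adding the three flink command-line clients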

1. Parsing flink-conf.yaml and the custom flink command-line options

1.1 Recap

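The previous post covered the EnvironmentInformation.logEnvironmentInfo function: how the log4j2 logging framework is bound to Flink, and how the log4j2 configuration file and log path are defined.

This post covers how Flink parses flink-conf.yaml and the custom flink command-line options.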

1.2 Getting the path of Flink's conf directory

The main method of the CliFrontend class (org.apache.flink.client.cli.CliFrontend, in the flink-clients module) first looks up the path of Flink's conf directory:

    /** Submits the job based on the arguments. */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // ...... (part omitted) ......
    }

This calls getConfigurationDirectoryFromEnv, a method of the same CliFrontend class, implemented as follows:

    // --------------------------------------------------------------------------------------------
    //  Miscellaneous Utilities
    // --------------------------------------------------------------------------------------------

    public static String getConfigurationDirectoryFromEnv() {
        String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

        if (location != null) {
            if (new File(location).exists()) {
                return location;
            } else {
                throw new RuntimeException(
                        "The configuration directory '"
                                + location
                                + "', specified in the '"
                                + ConfigConstants.ENV_FLINK_CONF_DIR
                                + "' environment variable, does not exist.");
            }
        } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
            location = CONFIG_DIRECTORY_FALLBACK_1;
        } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
            location = CONFIG_DIRECTORY_FALLBACK_2;
        } else {
            throw new RuntimeException(
                    "The configuration directory was not specified. "
                            + "Please specify the directory containing the configuration file through the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR
                            + "' environment variable.");
        }
        return location;
    }

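When we ran the flink script earlier, it invoked the config.sh script, which ran export FLINK_CONF_DIR to set the environment variable for Flink's conf directory. Here, System.getenv reads that variable back to obtain the conf directory path. If the variable is not set, the method tries CONFIG_DIRECTORY_FALLBACK_1 and then CONFIG_DIRECTORY_FALLBACK_2, and it throws a RuntimeException when none of the candidates exists.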
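The lookup order can be sketched outside Flink as a small standalone class. This is only an illustration: ConfDirResolver and resolve are hypothetical names, and the fallback values "../conf" and "conf" used in main are an assumption about what the two CONFIG_DIRECTORY_FALLBACK constants contain.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that mirrors the lookup order of getConfigurationDirectoryFromEnv. */
final class ConfDirResolver {

    /**
     * Returns envValue if it is set and exists; fails if it is set but missing;
     * otherwise returns the first existing fallback directory.
     */
    static String resolve(String envValue, List<String> fallbacks) {
        if (envValue != null) {
            if (new File(envValue).exists()) {
                return envValue;
            }
            // An explicitly configured but missing directory is an error, not a reason to fall back.
            throw new RuntimeException(
                    "The configuration directory '" + envValue + "' does not exist.");
        }
        for (String candidate : fallbacks) {
            if (new File(candidate).exists()) {
                return candidate;
            }
        }
        throw new RuntimeException("The configuration directory was not specified.");
    }

    public static void main(String[] args) {
        // Assumed fallback values; check CliFrontend for the real constants.
        List<String> fallbacks = Arrays.asList("../conf", "conf");
        try {
            System.out.println(resolve(System.getenv("FLINK_CONF_DIR"), fallbacks));
        } catch (RuntimeException e) {
            System.out.println("Not resolved: " + e.getMessage());
        }
    }
}
```

Note that a value set in the environment variable is never silently replaced by a fallback: if it points to a missing directory, the lookup fails immediately.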

1.3 Loading the flink-conf.yaml configuration file

The main method of the CliFrontend class (org.apache.flink.client.cli.CliFrontend, in the flink-clients module) then loads the flink-conf.yaml configuration file:

    /** Submits the job based on the arguments. */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);
        // ...... (part omitted) ......
    }

This calls GlobalConfiguration.loadConfiguration, passing in the path of the Flink conf directory.

Following the call chain, the loadConfiguration overload that does the actual work is:

    /**
     * Loads the configuration files from the specified directory. If the dynamic properties
     * configuration is not null, then it is added to the loaded configuration.
     *
     * @param configDir directory to load the configuration from
     * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
     * @return The configuration loaded from the given configuration directory
     */
    public static Configuration loadConfiguration(
            final String configDir, @Nullable final Configuration dynamicProperties) {

        if (configDir == null) {
            throw new IllegalArgumentException(
                    "Given configuration directory is null, cannot load configuration");
        }

        final File confDirFile = new File(configDir);
        if (!(confDirFile.exists())) {
            throw new IllegalConfigurationException(
                    "The given configuration directory name '"
                            + configDir
                            + "' ("
                            + confDirFile.getAbsolutePath()
                            + ") does not describe an existing directory.");
        }

        // get Flink yaml configuration file
        final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

        if (!yamlConfigFile.exists()) {
            throw new IllegalConfigurationException(
                    "The Flink config file '"
                            + yamlConfigFile
                            + "' ("
                            + yamlConfigFile.getAbsolutePath()
                            + ") does not exist.");
        }

        Configuration configuration = loadYAMLResource(yamlConfigFile);

        if (dynamicProperties != null) {
            configuration.addAll(dynamicProperties);
        }

        return configuration;
    }

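The method reads the flink-conf.yaml file in the Flink conf directory and parses it line by line (in loadYAMLResource): comment lines are dropped, and each remaining line is split on ": " into a key and a value, which are stored in the configuration variable. If dynamicProperties is not null, its entries are then added on top of the loaded configuration.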
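The parsing rule described above can be sketched as follows. This is a simplified stand-in, not Flink's loadYAMLResource: FlatYamlSketch, parse, and withOverrides are hypothetical names, and a plain Map takes the place of Flink's Configuration class.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of flat "key: value" parsing with comment stripping. */
final class FlatYamlSketch {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> config = new LinkedHashMap<>();
        for (String line : lines) {
            // Drop everything after '#', which also removes whole-line comments.
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue;
            }
            // Split into key and value on the first ": " only.
            String[] kv = content.split(": ", 2);
            if (kv.length < 2) {
                continue; // no separator: skip the line
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                continue;
            }
            config.put(key, value);
        }
        return config;
    }

    /** Overlays dynamic properties on top of the file-based values, like addAll above. */
    static Map<String, String> withOverrides(Map<String, String> base, Map<String, String> dynamic) {
        Map<String, String> merged = new LinkedHashMap<>(base);
        if (dynamic != null) {
            merged.putAll(dynamic);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "# a comment line",
                "jobmanager.rpc.address: localhost",
                "parallelism.default: 1 # trailing comment");
        System.out.println(parse(lines));
    }
}
```

Because the split is on ": " (colon followed by a space), a line such as `key:value` without the space is skipped in this sketch rather than parsed.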

1.4 Adding the three flink command-line clients

The main method of the CliFrontend class (org.apache.flink.client.cli.CliFrontend, in the flink-clients module) then registers the three flink command-line clients:

    /** Submits the job based on the arguments. */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = 31;
        // ...... (part omitted) ......
    }

This calls loadCustomCommandLines in the same class, passing in the configuration loaded from flink-conf.yaml and the conf directory path.

loadCustomCommandLines is implemented as follows:

    public static List<CustomCommandLine> loadCustomCommandLines(
            Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine> customCommandLines = new ArrayList<>();
        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

        //	Command line interface of the YARN session, with a special initialization here
        //	to prefix all options with y/yarn.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                    loadCustomCommandLine(
                            flinkYarnSessionCLI,
                            configuration,
                            configurationDirectory,
                            "y",
                            "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
            try {
                LOG.info("Loading FallbackYarnSessionCli");
                customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
            } catch (Exception exception) {
                LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
        }

        //	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
        // the
        //	      active CustomCommandLine in order and DefaultCLI isActive always return true.
        customCommandLines.add(new DefaultCLI());

        return customCommandLines;
    }

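As the code shows, this function creates a customCommandLines list and adds three command-line clients to it in order: GenericCLI, FlinkYarnSessionCli (or FallbackYarnSessionCli when the YARN class cannot be loaded), and DefaultCLI. The three clients are described below: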

The isActive method of org.apache.flink.client.cli.GenericCLI (under flink-clients/src/main/java) is implemented as follows:

    @Override
    public boolean isActive(CommandLine commandLine) {
        // Does flink-conf.yaml set execution.target: remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session
        return configuration.getOptional(DeploymentOptions.TARGET).isPresent()
                // Was the flink command run with -e remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session
                // The -e option itself is deprecated
                || commandLine.hasOption(executorOption.getOpt())
                // Was the flink command run with -t remote | local | yarn-per-job (deprecated) | yarn-session | kubernetes-session | kubernetes-application | yarn-application
                // -t is equivalent to execution.target in flink-conf.yaml, but takes precedence over it
                || commandLine.hasOption(targetOption.getOpt());
    }

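For GenericCLI, the isActive method is all we need to look at. Later, this method is used to decide which command-line client is activated, and the options belonging to that client are then read; the options of the other clients are not.

GenericCLI specifies the mode our application runs in: a local single-machine cluster, a remote cluster, YARN, or Kubernetes. As the comments in the code show, GenericCLI is active when flink-conf.yaml sets execution.target, or when the flink command is given the -e or -t option.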
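The effect of the registration order can be sketched with a minimal first-active-wins selection. Everything here is hypothetical: Cli, registered, and firstActive are illustrative names, the YARN activation rule is a placeholder, and the flink-conf.yaml condition of GenericCLI is left out so that only command-line options are modeled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical sketch: the first client whose isActive rule matches is chosen. */
final class ActiveCliSketch {

    /** Stand-in for a command-line client: a name plus an activation rule. */
    static final class Cli {
        final String name;
        final Predicate<Set<String>> isActive;

        Cli(String name, Predicate<Set<String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    /** Same order as loadCustomCommandLines: generic, YARN, then default. */
    static List<Cli> registered() {
        return Arrays.asList(
                new Cli("generic", opts -> opts.contains("t") || opts.contains("e")),
                // Placeholder rule; the real YARN client has its own activation logic.
                new Cli("yarn", opts -> opts.contains("yarn-placeholder")),
                // Always active, so it must stay last or it would shadow the others.
                new Cli("default", opts -> true));
    }

    static String firstActive(List<Cli> clis, Set<String> options) {
        for (Cli cli : clis) {
            if (cli.isActive.test(options)) {
                return cli.name;
            }
        }
        throw new IllegalStateException("No command line is active.");
    }

    public static void main(String[] args) {
        System.out.println(firstActive(registered(), Collections.singleton("t")));
        System.out.println(firstActive(registered(), Collections.<String>emptySet()));
    }
}
```

With the always-active client at the end of the list, it only wins when neither of the earlier clients claims the command line, which is the reason given in the source comment for adding DefaultCLI last.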

FlinkYarnSessionCli is instantiated via reflection. The reflective instantiation method is as follows:

    /**
     * Loads a class from the classpath that implements the CustomCommandLine interface.
     *
     * @param className The fully-qualified class name to load.
     * @param params The constructor parameters
     */
    private static CustomCommandLine loadCustomCommandLine(String className, Object... params)
            throws Exception {

        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
            types[i] = params[i].getClass();
        }

        Constructor            
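The general reflective pattern (derive the parameter types from the arguments, look up a matching public constructor, instantiate) can be sketched independently of Flink. This is not a completion of the truncated method above: ReflectiveLoadSketch and instantiate are hypothetical names, and JDK classes are used as targets so the example runs without Flink on the classpath.

```java
import java.lang.reflect.Constructor;

/** Hypothetical sketch of constructor lookup by runtime argument types. */
final class ReflectiveLoadSketch {

    static Object instantiate(String className, Object... params) throws Exception {
        // The runtime class of each argument becomes the expected parameter type.
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            if (params[i] == null) {
                throw new NullPointerException("Parameters may not be null.");
            }
            types[i] = params[i].getClass();
        }
        Class<?> clazz = Class.forName(className);
        // getConstructor needs an exact match on the declared parameter types.
        Constructor<?> constructor = clazz.getConstructor(types);
        return constructor.newInstance(params);
    }

    public static void main(String[] args) throws Exception {
        // StringBuilder declares a public StringBuilder(String) constructor.
        Object sb = instantiate("java.lang.StringBuilder", "abc");
        System.out.println(sb);

        try {
            // The argument is boxed to Integer, but ArrayList declares ArrayList(int).
            instantiate("java.util.ArrayList", 5);
        } catch (NoSuchMethodException e) {
            System.out.println("No constructor for the given runtime types: " + e.getMessage());
        }
    }
}
```

One consequence of taking types from getClass() is that the lookup only succeeds when the constructor declares exactly the runtime classes of the arguments; a constructor declared with a supertype, an interface, or a primitive is not found this way.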