[Flink Source Code] The Flink Submission Process: Reading logEnvironmentInfo

Bulut0907, published 2022-08-29 09:27:48

Contents
  • 1. Reading EnvironmentInformation.logEnvironmentInfo
    • 1.1 Recap of the previous article
    • 1.2 Goal
    • 1.3 The logEnvironmentInfo function
    • 1.4 Running a Flink application
    • 1.5 How the log4j2 logging framework is loaded
    • 1.6 How the log4j2 configuration file and log path are specified
    • 1.7 Examining the log file contents

1. Reading EnvironmentInformation.logEnvironmentInfo

1.1 Recap of the previous article

In the previous article, we located the entry point invoked by the flink script: the main method of flink-clients/src/main/java/org.apache.flink.client.cli.CliFrontend. Now let's look at what the call EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args) actually does.

1.2 Goal

This article explains how the Flink client loads the log4j2 logging framework, how the log4j2 configuration file is specified, and where the log file is ultimately written.

1.3 The logEnvironmentInfo function

The logEnvironmentInfo function is shown below:

    public static void logEnvironmentInfo(
            Logger log, String componentName, String[] commandLineArgs) {
        if (log.isInfoEnabled()) {
            RevisionInformation rev = getRevisionInformation();
            String version = getVersion();
            String scalaVersion = getScalaVersion();

            String jvmVersion = getJvmVersion();
            String[] options = getJvmStartupOptionsArray();

            String javaHome = System.getenv("JAVA_HOME");

            String inheritedLogs = System.getenv("FLINK_INHERITED_LOGS");

            String arch = System.getProperty("os.arch");

            long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;

            if (inheritedLogs != null) {
                log.info(
                        "--------------------------------------------------------------------------------");
                log.info(" Preconfiguration: ");
                log.info(inheritedLogs);
            }

            log.info(
                    "--------------------------------------------------------------------------------");
            log.info(
                    " Starting "
                            + componentName
                            + " (Version: "
                            + version
                            + ", Scala: "
                            + scalaVersion
                            + ", "
                            + "Rev:"
                            + rev.commitId
                            + ", "
                            + "Date:"
                            + rev.commitDate
                            + ")");
            log.info(" OS current user: " + System.getProperty("user.name"));
            log.info(" Current Hadoop/Kerberos user: " + getHadoopUser());
            log.info(" JVM: " + jvmVersion);
            log.info(" Arch: " + arch);
            log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
            log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));

            String hadoopVersionString = getHadoopVersionString();
            if (hadoopVersionString != null) {
                log.info(" Hadoop version: " + hadoopVersionString);
            } else {
                log.info(" No Hadoop Dependency available");
            }

            if (options.length == 0) {
                log.info(" JVM Options: (none)");
            } else {
                log.info(" JVM Options:");
                for (String s : options) {
                    log.info("    " + s);
                }
            }

            if (commandLineArgs == null || commandLineArgs.length == 0) {
                log.info(" Program Arguments: (none)");
            } else {
                log.info(" Program Arguments:");
                for (String s : commandLineArgs) {
                    if (GlobalConfiguration.isSensitive(s)) {
                        log.info(
                                "    "
                                        + GlobalConfiguration.HIDDEN_CONTENT
                                        + " (sensitive information)");
                    } else {
                        log.info("    " + s);
                    }
                }
            }

            log.info(" Classpath: " + System.getProperty("java.class.path"));

            log.info(
                    "--------------------------------------------------------------------------------");
        }
    }

The function mainly gathers information about the build and the runtime environment and prints it through the logger. Two details worth noting: getMaxJvmHeapMemory() returns bytes, so >>> 20 (an unsigned right shift by 20 bits, i.e. division by 2^20) converts the value to MiB; and any command-line argument flagged by GlobalConfiguration.isSensitive is masked with a placeholder before being logged.
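
Most of these helpers are thin wrappers around standard JDK APIs. As a rough, self-contained sketch (not Flink's actual implementation; the class name EnvInfoSketch is made up for illustration), the same information can be gathered like this:

    import java.lang.management.ManagementFactory;
    import java.util.List;

    public class EnvInfoSketch {
        public static void main(String[] args) {
            // JVM name/vendor/version, similar to what getJvmVersion() reports
            String jvmVersion = System.getProperty("java.vm.name")
                    + " - " + System.getProperty("java.vm.vendor")
                    + " - " + System.getProperty("java.specification.version")
                    + "/" + System.getProperty("java.vm.version");

            // JVM startup options, similar to getJvmStartupOptionsArray()
            List<String> jvmOptions = ManagementFactory.getRuntimeMXBean().getInputArguments();

            // Max heap in MiB: maxMemory() returns bytes, ">>> 20" divides by 2^20
            long maxHeapMegabytes = Runtime.getRuntime().maxMemory() >>> 20;

            System.out.println(" JVM: " + jvmVersion);
            System.out.println(" Arch: " + System.getProperty("os.arch"));
            System.out.println(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
            System.out.println(" JAVA_HOME: " + System.getenv("JAVA_HOME"));
            for (String opt : jvmOptions) {
                System.out.println("    " + opt);
            }
        }
    }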

1.4 Running a Flink application

Let's run a Flink example application, as shown below:

[root@bigdata001 ~]# /root/flink-1.15.0/bin/flink run /root/flink-1.15.0/examples/batch/WordCount.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flink-1.15.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID f6cf5b1bcd6cabb58d9a0f2a602c24a3
Program execution finished
Job with JobID f6cf5b1bcd6cabb58d9a0f2a602c24a3 has finished.
Job Runtime: 779 ms
Accumulator Results: 
- 8500790cdaa5777dec3831217b796b95 (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
...(omitted)...
(would,2)
(wrong,1)
(you,1)
[root@bigdata001 ~]#

All of this output is printed via System.out; none of it goes through slf4j.

1.5 How the log4j2 logging framework is loaded

Notice the two slf4j bindings: one is the log4j-slf4j-impl jar under FLINK_HOME/lib, and the other sits under the Hadoop installation. Both are added to the classpath when the flink script runs.

So how did FLINK_HOME/lib/log4j-slf4j-impl-2.17.1.jar get there?

First, the flink-dist module declares a dependency on log4j-slf4j-impl, as shown below:

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<scope>compile</scope>
		</dependency>

Then flink-dist/src/main/assemblies/bin.xml specifies that, at packaging time, log4j-slf4j-impl is copied into FLINK_HOME/lib. The packaging is done by the maven-assembly-plugin. As shown below:

	<dependencySets>
		<dependencySet>
			<outputDirectory>lib</outputDirectory>
			<unpack>false</unpack>
			<useTransitiveDependencies>false</useTransitiveDependencies>
			<useProjectArtifact>false</useProjectArtifact>
			<useProjectAttachments>true</useProjectAttachments>
			<useStrictFiltering>true</useStrictFiltering>

			<includes>
				<include>org.apache.logging.log4j:log4j-api</include>
				<include>org.apache.logging.log4j:log4j-core</include>
				<include>org.apache.logging.log4j:log4j-slf4j-impl</include>
				<include>org.apache.logging.log4j:log4j-1.2-api</include>
			</includes>
		</dependencySet>
		<dependencySet>
			<outputDirectory>lib</outputDirectory>
			<unpack>false</unpack>
			<includes>
				<include>org.apache.flink:flink-shaded-zookeeper-3:jar:${zookeeper.version}-${flink.shaded.version}</include>
			</includes>
			<outputFileNameMapping>flink-shaded-zookeeper-${zookeeper.version}.jar</outputFileNameMapping>
		</dependencySet>
	</dependencySets>

This is how slf4j ends up bound to log4j2 in a Flink distribution.
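
To confirm at runtime which binding SLF4J actually selected, you can ask the LoggerFactory directly. A minimal sketch (the class name BindingCheck is made up; getILoggerFactory() is standard SLF4J API):

    import org.slf4j.LoggerFactory;

    public class BindingCheck {
        public static void main(String[] args) {
            // With log4j-slf4j-impl on the classpath this should print
            // org.apache.logging.slf4j.Log4jLoggerFactory, matching the
            // "Actual binding is of type ..." line in the SLF4J messages
            // shown earlier
            System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
        }
    }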

1.6 How the log4j2 configuration file and log path are specified

Both the log4j2 configuration file and the log output path are specified in the flink shell script, and the shipped configuration file sets the log level to INFO. The log4j2 framework reads these JVM parameters, as shown below:

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
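
log4j2 locates its configuration through the log4j.configurationFile system property, and Flink's shipped configuration files reference the log.file property to decide where the file appender writes. Here is a minimal sketch of the same mechanism (the class name LoggingDemo and the /tmp path are assumptions for illustration; in Flink these properties arrive as the -D options above):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingDemo {
        public static void main(String[] args) {
            // Must be set before the first LoggerFactory.getLogger() call,
            // i.e. before log4j2 initializes
            System.setProperty("log4j.configurationFile",
                    "file:/root/flink-1.15.0/conf/log4j-cli.properties");
            System.setProperty("log.file", "/tmp/flink-demo-client.log");

            Logger log = LoggerFactory.getLogger(LoggingDemo.class);
            log.info("written to the file referenced via the log.file property");
        }
    }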
1.7 Examining the log file contents

By inspecting the contents of flink-root-client-bigdata001.log, we can see what logEnvironmentInfo wrote, as shown below:

[root@bigdata001 ~]# cat /root/flink-1.15.0/log/flink-root-client-bigdata001.log
2022-07-13 15:59:07,439 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------
2022-07-13 15:59:07,460 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Starting Command Line Client (Version: 1.15.0, Scala: 2.12, Rev:DeadD0d0, Date:1970-01-01T01:00:00+01:00)
2022-07-13 15:59:07,460 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  OS current user: root
2022-07-13 15:59:07,789 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Current Hadoop/Kerberos user: root
2022-07-13 15:59:07,789 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM: OpenJDK 64-Bit Server VM - Azul Systems, Inc. - 11/11.0.15+10-LTS
2022-07-13 15:59:07,789 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Arch: amd64
2022-07-13 15:59:07,794 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Maximum heap size: 3972 MiBytes
2022-07-13 15:59:07,794 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JAVA_HOME: /opt/zulu11.56.19-ca-jdk11.0.15-linux_x64
2022-07-13 15:59:07,799 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Hadoop version: 3.3.1
2022-07-13 15:59:07,800 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM Options:
2022-07-13 15:59:07,800 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog.file=/root/flink-1.15.0/log/flink-root-client-bigdata001.log
2022-07-13 15:59:07,800 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configuration=file:/root/flink-1.15.0/conf/log4j-cli.properties
2022-07-13 15:59:07,800 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configurationFile=file:/root/flink-1.15.0/conf/log4j-cli.properties
2022-07-13 15:59:07,800 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlogback.configurationFile=file:/root/flink-1.15.0/conf/logback.xml
2022-07-13 15:59:07,800 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Program Arguments:
2022-07-13 15:59:07,802 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     run
2022-07-13 15:59:07,803 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -t
2022-07-13 15:59:07,803 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     local
2022-07-13 15:59:07,803 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     /root/flink-1.15.0/examples/batch/WordCount.jar
2022-07-13 15:59:07,803 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Classpath: /root/flink-1.15.0/lib/flink-cep-1.15.0.jar:/root/flink-1.15.0/lib/flink-connector-files-1.15.0.jar:...(omitted)...:/opt/hadoop-3.3.1/share/hadoop/yarn/timelineservice/lib/hbase-annotations-1.4.8.jar:/opt/hadoop-3.3.1/etc/hadoop:
2022-07-13 15:59:07,804 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------
...(omitted)...
[root@bigdata001 ~]#