第2步:捉迷藏,kafka的main入口在哪里?
1、看一下启动kafka的脚本 nohup /usr/local/kafka_2.10-0.9.0.1/bin/kafka-server-start.sh /usr/local/kafka_2.10-0.9.0.1/config/server.properties &
我们看一下bin目录下的kafka-server-start.sh脚本: if [ $# -lt 1 ]; //参数的个数小于1 then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
EXTRA_ARGS="-name kafkaServer -loggc"
COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
//要看一下kafka-run-class.sh kafka.Kafka类
2、看一下kafka创建topic的脚本kafka-topics.sh
kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 -- topic SparkStreamingDirected kafka-topics.sh 只有一句话
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" linux说明: $0:Shell本身的文件名 $@:表示所有脚本参数的内容 $#:表示返回所有脚本参数的个数。
要看kafka-run-class.sh脚本 kafka.admin.TopicCommand
4、kafka-run-class.sh 探秘
if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]" exit 1 fi
base_dir=$(dirname $0)/..
if [ -z "$SCALA_VERSION" ]; then SCALA_VERSION=2.10.5 fi
if [ -z "$SCALA_BINARY_VERSION" ]; then SCALA_BINARY_VERSION=2.10 fi
# run ./gradlew copyDependantLibs to get all dependant jars in a local dir shopt -s nullglob for dir in $base_dir/core/build/dependant-libs-${SCALA_VERSION}*; do CLASSPATH=$CLASSPATH:$dir/* done
for file in $base_dir/examples/build/libs//kafka-examples*.jar; do CLASSPATH=$CLASSPATH:$file done
for file in $base_dir/clients/build/libs/kafka-clients*.jar; do CLASSPATH=$CLASSPATH:$file done
for file in $base_dir/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file done
for dir in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*; do CLASSPATH=$CLASSPATH:$dir/* done
for cc_pkg in "api" "runtime" "file" "json" do for file in $base_dir/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar; do CLASSPATH=$CLASSPATH:$file done if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then CLASSPATH=$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/* fi done
# classpath addition for release CLASSPATH=$CLASSPATH:$base_dir/libs/*
for file in $base_dir/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar; do CLASSPATH=$CLASSPATH:$file done shopt -u nullglob
# JMX settings if [ -z "$KAFKA_JMX_OPTS" ]; then KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " fi
# JMX port to use if [ $JMX_PORT ]; then KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT " fi
# Log directory to use if [ "x$LOG_DIR" = "x" ]; then LOG_DIR="$base_dir/logs" fi
# Log4j settings if [ -z "$KAFKA_LOG4J_OPTS" ]; then # Log to console. This is a tool. KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/tools-log4j.properties" else # create logs directory if [ ! -d "$LOG_DIR" ]; then mkdir -p "$LOG_DIR" fi fi
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
# Generic jvm settings you want to add if [ -z "$KAFKA_OPTS" ]; then KAFKA_OPTS="" fi
# Which java to use if [ -z "$JAVA_HOME" ]; then JAVA="java" else JAVA="$JAVA_HOME/bin/java" fi
# Memory options if [ -z "$KAFKA_HEAP_OPTS" ]; then KAFKA_HEAP_OPTS="-Xmx256M" fi
# JVM performance options if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true" fi
while [ $# -gt 0 ]; do COMMAND=$1 case $COMMAND in -name) DAEMON_NAME=$2 CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out shift 2 ;; -loggc) if [ -z "$KAFKA_GC_LOG_OPTS" ]; then GC_LOG_ENABLED="true" fi shift ;; -daemon) DAEMON_MODE="true" shift ;; *) break ;; esac done
# GC options GC_FILE_SUFFIX='-gc.log' GC_LOG_FILE_NAME='' if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps " fi
# Launch mode if [ "x$DAEMON_MODE" = "xtrue" ]; then nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null & else exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" fi
前面一大堆参数略过了,最后的一行启动模式 Launch mode。
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
5、启动后就到了kafka的类入口了。
启动kafka的main入口:kafka.kafka
kafka建立topic的入口:kafka.admin.TopicCommand