Kafka系列 28 - 服务端源码分析 19:Kafka脚本工具

简介:主要讲解Kafka中各类脚本工具的实现

1. Kafka脚本工具概览

Kafka提供了很多脚本工具便于开发人员管理Kafka,它们都位于安装包的bin目录下,分为Linux平台(.sh结尾)和Windows平台(.bat结尾)两类,这些脚本的功能简介如下:

  • kafka-server-start脚本:启动Kafka Server。
  • kafka-server-stop脚本:停止Kafka Server。
  • kafka-topics脚本:负责Topic相关操作,例如创建Topic、查询Topic名称及详细信息、增加分区的数量并完成新增分区的副本分配等。
  • kafka-preferred-replica-election脚本:触发指定的分区进行“优先副本”选举,这样可以让分区Leader副本在集群中分布得更均匀。
  • kafka-reassign-partitions脚本:主要有三个功能,一是生成副本迁移的方案,二是触发副本迁移操作,即将迁移方案写入到ZooKeeper中,从而触发PartitionsReassignedListener处理,三是检测指定分区的副本迁移是否已完成。
  • kafka-console-producer脚本:控制台版本的生产者,我们可以在控制台中输入消息的key和value,由此脚本封装成消息并发送给服务端。
  • kafka-console-consumer脚本:控制台版本的消费者,我们可以通过参数指定订阅的Topic,此脚本会从服务端拉取消息并输出到控制台。
  • kafka-consumer-groups脚本:有两个主要的功能,一是查询当前所有Consumer Group,二是获取指定Consumer Group的详细信息。
  • DumpLogSegments:可由kafka-run-class脚本运行,主要负责解析输出指定的日志文件和索引文件中的内容,另外还可以实现索引文件的验证。
  • kafka-producer-perf-test脚本:负责测试生产者的各项性能指标。
  • kafka-consumer-perf-test脚本:负责测试消费者的各项性能指标。
  • kafka-mirror-maker脚本:实现了数据在多个集群的同步,可用于Kafka集群的镜像制作。

下面以Linux平台的脚本为例讲解这些脚本的实现原理。

2. kafka-server-start.sh脚本

kafka-server-start.sh脚本的实现原理其实是通过kafka-run-class.sh脚本调用对应的类来启动Broker,源码如下:

kafka-server-start.sh
  • #!/bin/bash
  • if [ $# -lt 1 ]; # 检查参数个数
  • then # 打印使用方法
  • echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
  • exit 1
  • fi
  • # 获取当前脚本所在的路径
  • base_dir=$(dirname $0)
  • # 设置Log4j相关的环境变量
  • if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  • export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
  • fi
  • # 设置JVM内存
  • if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  • export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
  • fi
  • EXTRA_ARGS="-name kafkaServer -loggc"
  • # 检测第一个参数是否为"-daemon"
  • COMMAND=$1
  • case $COMMAND in
  • -daemon)
  • EXTRA_ARGS="-daemon "$EXTRA_ARGS
  • # 左移参数列表,即删除"-daemon"参数
  • shift
  • ;;
  • *)
  • ;;
  • esac
  • # 调用kafka-run-class.sh脚本,传入参数以及主类
  • exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

上面的代码主要用于检查参数的合法性,设置Log4j和JVM堆内存等环境变量,并处理-daemon参数,最后通过kafka-run-class.sh脚本启动kafka.Kafka类。
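
下面给出kafka-server-start.sh的两种典型用法作为参照(假设在Kafka安装目录下执行,配置文件位于config目录,均仅为示意):

  • # 前台启动,日志直接输出到控制台
  • bin/kafka-server-start.sh config/server.properties
  • # 以守护进程方式后台启动,并通过--override覆盖配置文件中的broker.id
  • bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=1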

大部分的脚本工具都依赖于kafka-run-class.sh脚本,它的主要功能包括:设置CLASSPATH、进行JMX相关配置、配置Log4j、指定存放日志文件和索引文件的位置、检测JAVA_HOME环境变量、进行JVM相关配置,以及决定是否后台启动:

kafka-run-class.sh
  • #!/bin/bash
  • # Licensed to the Apache Software Foundation (ASF) under one or more
  • # contributor license agreements. See the NOTICE file distributed with
  • # this work for additional information regarding copyright ownership.
  • # The ASF licenses this file to You under the Apache License, Version 2.0
  • # (the "License"); you may not use this file except in compliance with
  • # the License. You may obtain a copy of the License at
  • #
  • # http://www.apache.org/licenses/LICENSE-2.0
  • #
  • # Unless required by applicable law or agreed to in writing, software
  • # distributed under the License is distributed on an "AS IS" BASIS,
  • # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  • # See the License for the specific language governing permissions and
  • # limitations under the License.
  • # 检测参数是否合法
  • if [ $# -lt 1 ];
  • then
  • # 打印使用方式
  • echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
  • exit 1
  • fi
  • # 检测INCLUDE_TEST_JARS变量是否为空,如果为空将其设置为false
  • if [ -z "$INCLUDE_TEST_JARS" ]; then
  • INCLUDE_TEST_JARS=false
  • fi
  • # Exclude jars not necessary for running commands.
  • # 定义should_include_file函数,用于检测CLASSPATH是否需要包含指定的文件
  • regex="(-(test|src|scaladoc|javadoc)\.jar|jar.asc)$"
  • should_include_file() {
  • if [ "$INCLUDE_TEST_JARS" = true ]; then
  • return 0
  • fi
  • file=$1
  • if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
  • return 0
  • else
  • return 1
  • fi
  • }
  • # 获取脚本所在目录的上层目录
  • base_dir=$(dirname $0)/..
  • # 检测并设置SCALA_VERSION
  • if [ -z "$SCALA_VERSION" ]; then
  • SCALA_VERSION=2.10.6
  • fi
  • # 检测并设置SCALA_BINARY_VERSION
  • if [ -z "$SCALA_BINARY_VERSION" ]; then
  • SCALA_BINARY_VERSION=2.10
  • fi
  • # run ./gradlew copyDependantLibs to get all dependant jars in a local dir
  • # 检测$base_dir下的多个目录,根据should_include_file函数设置CLASSPATH
  • shopt -s nullglob
  • for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
  • do
  • if [ -z "$CLASSPATH" ] ; then
  • CLASSPATH="$dir/*"
  • else
  • CLASSPATH="$CLASSPATH:$dir/*"
  • fi
  • done
  • for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
  • do
  • CLASSPATH="$CLASSPATH":"$file"
  • done
  • for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
  • do
  • CLASSPATH="$CLASSPATH:$dir/*"
  • done
  • for cc_pkg in "api" "runtime" "file" "json" "tools"
  • do
  • for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
  • CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
  • fi
  • done
  • # classpath addition for release
  • for file in "$base_dir"/libs/*;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
  • do
  • if should_include_file "$file"; then
  • CLASSPATH="$CLASSPATH":"$file"
  • fi
  • done
  • shopt -u nullglob
  • # JMX settings
  • # JMX的相关设置
  • if [ -z "$KAFKA_JMX_OPTS" ]; then
  • KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
  • fi
  • # JMX port to use
  • # 设置JMX的端口
  • if [ $JMX_PORT ]; then
  • KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
  • fi
  • # Log directory to use
  • # 指定存放日志文件和索引文件的目录,默认是$KAFKA_HOME/logs
  • if [ "x$LOG_DIR" = "x" ]; then
  • LOG_DIR="$base_dir/logs"
  • fi
  • # Log4j settings
  • # Log4j的相关设置
  • if [ -z "$KAFKA_LOG4J_OPTS" ]; then
  • # Log to console. This is a tool.
  • KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/tools-log4j.properties"
  • else
  • # create logs directory
  • if [ ! -d "$LOG_DIR" ]; then
  • mkdir -p "$LOG_DIR"
  • fi
  • fi
  • KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
  • # Generic jvm settings you want to add
  • if [ -z "$KAFKA_OPTS" ]; then
  • KAFKA_OPTS=""
  • fi
  • # Set Debug options if enabled
  • # 检测是否以Debug模式启动
  • if [ "x$KAFKA_DEBUG" != "x" ]; then
  • # Use default ports
  • DEFAULT_JAVA_DEBUG_PORT="5005"
  • if [ -z "$JAVA_DEBUG_PORT" ]; then
  • JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
  • fi
  • # Use the defaults if JAVA_DEBUG_OPTS was not set
  • DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
  • if [ -z "$JAVA_DEBUG_OPTS" ]; then
  • JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
  • fi
  • echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
  • KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
  • fi
  • # Which java to use
  • # 检测JAVA_HOME环境变量
  • if [ -z "$JAVA_HOME" ]; then
  • JAVA="java"
  • else
  • JAVA="$JAVA_HOME/bin/java"
  • fi
  • # Memory options
  • # 配置JVM内存
  • if [ -z "$KAFKA_HEAP_OPTS" ]; then
  • KAFKA_HEAP_OPTS="-Xmx256M"
  • fi
  • # JVM performance options
  • # 对JVM进行一些优化配置,默认使用G1垃圾回收器
  • if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  • KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
  • fi
  • # 处理参数name、loggc、daemon三个参数
  • while [ $# -gt 0 ]; do
  • COMMAND=$1
  • case $COMMAND in
  • -name)
  • DAEMON_NAME=$2
  • CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
  • shift 2
  • ;;
  • -loggc)
  • if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
  • GC_LOG_ENABLED="true"
  • fi
  • shift
  • ;;
  • -daemon)
  • DAEMON_MODE="true"
  • shift
  • ;;
  • *)
  • break
  • ;;
  • esac
  • done
  • # GC options
  • # 调整JVM GC相关的参数
  • GC_FILE_SUFFIX='-gc.log'
  • GC_LOG_FILE_NAME=''
  • if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
  • GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
  • KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
  • fi
  • # Launch mode
  • # 根据$DAEMON_MODE的值,决定是否后台启动
  • # 这里就是主要的执行业务代码了,会使用java命令执行传入的参数中携带的主类的main方法
  • if [ "x$DAEMON_MODE" = "xtrue" ]; then
  • nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
  • else
  • exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
  • fi

kafka-run-class.sh脚本的最后会使用java命令,带上前面拼接的一系列参数,执行传入参数中指定的主类的main(...)方法。以kafka-server-start.sh为例,最终执行的是kafka.Kafka类的main(...)方法。
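
前面提到的DumpLogSegments等工具类就可以直接通过kafka-run-class.sh运行,下面给出一个示意用法(日志文件路径仅为假设):

  • # 解析并打印指定日志分段文件中的消息内容
  • bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-topic-0/00000000000000000000.log --print-data-log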

3. kafka-server-stop.sh脚本

kafka-server-stop.sh脚本的实现则非常简单,它通过ps ax命令配合grep、awk等过滤得到Kafka进程的PID,然后使用kill命令向其发送TERM信号将其停止,源码如下:

kafka-server-stop.sh
  • #!/bin/sh
  • PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
  • if [ -z "$PIDS" ]; then
  • echo "No kafka server to stop"
  • exit 1
  • else
  • kill -s TERM $PIDS
  • fi

4. kafka-topics.sh脚本

kafka-topics.sh脚本主要负责Topic相关的操作。它的具体实现是通过上面分析的kafka-run-class.sh来直接调用kafka.admin.TopicCommand类,并根据参数执行指定的功能。
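
在分析TopicCommand的实现之前,先给出kafka-topics.sh的几个典型用法作为参照(ZooKeeper地址与Topic名称均仅为示意):

  • # 创建一个3分区、2副本的Topic
  • bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-topic --partitions 3 --replication-factor 2
  • # 列出所有Topic
  • bin/kafka-topics.sh --zookeeper localhost:2181 --list
  • # 查看Topic的详细信息
  • bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic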

TopicCommand类的main(...)方法是该脚本的入口函数,其中会使用joptsimple命令行解析器解析传入的参数,之后按照参数执行指定的行为。源码如下:

kafka-topics.sh
  • #!/bin/bash
  • exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

kafka.admin.TopicCommand类的main(...)方法源码如下:

kafka.admin.TopicCommand#main
  • def main(args: Array[String]): Unit = {
  • // 解析参数,TopicCommandOptions支持list、describe、create、alter和delete五种操作
  • val opts = new TopicCommandOptions(args)
  • // 检查参数长度
  • if(args.length == 0)
  • CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
  • // should have exactly one action
  • // 获取对应的操作
  • val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
  • if(actions != 1)
  • CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
  • // 检查参数
  • opts.checkArgs()
  • // 获取Zookeeper连接
  • val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
  • 30000,
  • 30000,
  • JaasUtils.isZkSecurityEnabled())
  • var exitCode = 0
  • try {
  • if(opts.options.has(opts.createOpt)) // --create参数,创建Topic
  • createTopic(zkUtils, opts)
  • else if(opts.options.has(opts.alterOpt)) // --alter参数,修改Topic
  • alterTopic(zkUtils, opts)
  • else if(opts.options.has(opts.listOpt)) // --list参数,列出Topic
  • listTopics(zkUtils, opts)
  • else if(opts.options.has(opts.describeOpt)) // --describe参数,查询Topic详细信息
  • describeTopic(zkUtils, opts)
  • else if(opts.options.has(opts.deleteOpt)) // --delete参数,删除Topic
  • deleteTopic(zkUtils, opts)
  • } catch {
  • case e: Throwable =>
  • println("Error while executing topic command : " + e.getMessage)
  • error(Utils.stackTrace(e))
  • exitCode = 1
  • } finally {
  • zkUtils.close()
  • System.exit(exitCode)
  • }
  • }

从该方法的源码可以看出,对Topic的各类操作其实是由几个方法分别实现的,下面就来介绍这几个方法的具体实现。

4.1. Topic的创建

创建Topic由TopicCommand的createTopic(...)方法实现,源码如下:

kafka.admin.TopicCommand#createTopic
  • def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  • // 获取--topic参数
  • val topic = opts.options.valueOf(opts.topicOpt)
  • // 将--config参数解析成Properties对象
  • val configs = parseTopicConfigsToBeAdded(opts)
  • // 读取--if-not-exists参数
  • val ifNotExists = if (opts.options.has(opts.ifNotExistsOpt)) true else false
  • // 检测Topic名称中是否包含"."或"_"字符,若包含则输出警告信息
  • if (Topic.hasCollisionChars(topic))
  • println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  • try {
  • // 检测是否有--replica-assignment参数
  • if (opts.options.has(opts.replicaAssignmentOpt)) {
  • /**
  • * --replica-assignment参数的格式类似于:0:1:2,3:4:5,6:7:8,含义如下:
  • * 1. 编号为0的分区分配在Broker-0、Broker-1和Broker-2上;
  • * 2. 编号为1的分区分配在Broker-3、Broker-4和Broker-5上;
  • * 3. 编号为2的分区分配在Broker-6、Broker-7和Broker-8上;
  • *
  • * 这里将--replica-assignment参数内容解析成Map[Int, Seq[Int]]格式,
  • * 其key为分区的编号,value是其副本所分配的BrokerId
  • */
  • val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
  • // 检测max.message.bytes配置参数并给出提示
  • warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
  • // 对Topic名称和副本分配结果进行一系列的检测,并写入Zookeeper中
  • AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
  • } else {
  • // 如果进行副本自动分配,必须指定--partitions和--replication-factor参数
  • CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
  • // 获取--partitions参数的值
  • val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
  • // 获取--replication-factor参数的值
  • val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
  • // 检测max.message.bytes配置参数并给出提示
  • warnOnMaxMessagesChange(configs, replicas)
  • // 根据--disable-rack-aware参数决定分配副本时是否考虑机架信息
  • val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
  • else RackAwareMode.Enforced
  • // 自动分配副本,并写入Zookeeper
  • AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
  • }
  • println("Created topic \"%s\".".format(topic))
  • } catch {
  • case e: TopicExistsException => if (!ifNotExists) throw e
  • }
  • }

可见,createTopic(...)方法主要是检测和提取参数,并根据自动分配或手动指定分区副本的结果,将主题的创建信息写入Zookeeper中。
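
例如,下面的命令在创建Topic时通过--replica-assignment参数手动指定3个分区的副本分布(Broker ID仅为示意),此时不需要再指定--partitions和--replication-factor参数:

  • # 分区0的副本位于Broker 0、1,分区1位于Broker 1、2,分区2位于Broker 2、0
  • bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-topic --replica-assignment 0:1,1:2,2:0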

其中,AdminUtils的createOrUpdateTopicPartitionAssignmentPathInZK(...)方法处理手动分配分区副本的情况,它会对Topic名称和副本分配结果进行一系列的检测,并写入Zookeeper中,源码如下:

kafka.admin.AdminUtils
  • // 创建并更新Zookeeper中的主题分区副本分配信息
  • def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
  • topic: String,
  • partitionReplicaAssignment: Map[Int, Seq[Int]],
  • config: Properties = new Properties,
  • update: Boolean = false) {
  • // validate arguments
  • // 检测Topic名称是否符合要求
  • Topic.validate(topic)
  • // --replica-assignment参数指定的每个分区的副本数应该相同
  • require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
  • // 得到/brokers/topics/[topic_name]主题路径
  • val topicPath = getTopicPath(topic)
  • if (!update) {
  • if (zkUtils.zkClient.exists(topicPath)) // Topic已存在
  • throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
  • else if (Topic.hasCollisionChars(topic)) { // Topic名称存在"."或"_"
  • // 获取所有的Topic名称集合,打印提示出可能与新建Topic名称产生冲突的Topic
  • val allTopics = zkUtils.getAllTopics()
  • val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t))
  • if (collidingTopics.nonEmpty) {
  • throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
  • }
  • }
  • }
  • // 检测每个分区的副本分配中是否存在重复的BrokerId
  • partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
  • // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
  • if (!update) {
  • // write out the config if there is any, this isn't transactional with the partition assignments
  • // 验证配置
  • LogConfig.validate(config)
  • // 写入配置到Zookeeper
  • writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
  • }
  • // create the partition assignment
  • // 创建分区分配
  • writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
  • }
  • // 写入主题分区副本分配信息到Zookeeper
  • private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
  • try {
  • // 得到/brokers/topics/[topic_name]路径
  • val zkPath = getTopicPath(topic)
  • // 格式化副本分配信息为JSON字符串
  • val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
  • if (!update) {
  • info("Topic creation " + jsonPartitionData.toString)
  • // 创建持久节点
  • zkUtils.createPersistentPath(zkPath, jsonPartitionData)
  • } else {
  • // 更新持久节点
  • info("Topic update " + jsonPartitionData.toString)
  • zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
  • }
  • debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
  • } catch {
  • case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
  • case e2: Throwable => throw new AdminOperationException(e2.toString)
  • }
  • }

AdminUtils的createTopic(...)方法则实现了自动分配副本的功能,源码如下:

kafka.admin.AdminUtils#createTopic
  • def createTopic(zkUtils: ZkUtils,
  • topic: String,
  • partitions: Int,
  • replicationFactor: Int,
  • topicConfig: Properties = new Properties,
  • rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  • // 获取Broker信息
  • val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
  • // 根据Broker信息、分区数和副本因子进行副本自动分配
  • val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
  • // 创建并更新Zookeeper中的主题分区副本分配信息
  • AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
  • }

createTopic(...)方法会获取所有Broker的信息,然后使用AdminUtils的assignReplicasToBrokers(...)方法进行副本的自动分配,最终会通过上面讲解的createOrUpdateTopicPartitionAssignmentPathInZK(...)方法将分配结果写入Zookeeper。AdminUtils的assignReplicasToBrokers(...)方法的源码如下:

  • def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
  • nPartitions: Int,
  • replicationFactor: Int,
  • fixedStartIndex: Int = -1,
  • startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  • // 检查参数
  • if (nPartitions <= 0)
  • throw new AdminOperationException("number of partitions must be larger than 0")
  • if (replicationFactor <= 0)
  • throw new AdminOperationException("replication factor must be larger than 0")
  • if (replicationFactor > brokerMetadatas.size)
  • throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
  • if (brokerMetadatas.forall(_.rack.isEmpty))
  • // 不需要机架感知的分配
  • assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
  • startPartitionId)
  • else {
  • if (brokerMetadatas.exists(_.rack.isEmpty))
  • throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
  • // 需要机架感知的分配
  • assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
  • startPartitionId)
  • }
  • }

对于主题副本的自动分配,需要根据是否考虑机架感知区别处理:不需要机架感知时使用assignReplicasToBrokersRackUnaware(...)方法,需要机架感知时则使用assignReplicasToBrokersRackAware(...)方法。其中assignReplicasToBrokersRackUnaware(...)方法的源码如下:

kafka.admin.AdminUtils
  • // 不需要考虑机架感知的分配
  • private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
  • replicationFactor: Int,
  • brokerList: Seq[Int],
  • fixedStartIndex: Int,
  • startPartitionId: Int): Map[Int, Seq[Int]] = {
  • // 用于记录副本分配结果
  • val ret = mutable.Map[Int, Seq[Int]]()
  • val brokerArray = brokerList.toArray
  • // 如果没有指定起始的Broker Index,则随机选择一个起始Broker进行分配
  • val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  • // 选择起始分区
  • var currentPartitionId = math.max(0, startPartitionId)
  • // nextReplicaShift指定了副本的间隔,目的是为了更均匀地将副本分配到不同的Broker上
  • var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  • // 遍历次数为分区数
  • for (_ <- 0 until nPartitions) {
  • if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
  • // 递增nextReplicaShift
  • nextReplicaShift += 1
  • // 将"优先副本"分配到startIndex指定的Broker上
  • val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
  • // 记录"优先副本"的分配结果
  • val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
  • for (j <- 0 until replicationFactor - 1) // 分配当前分区的其他副本
  • replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
  • ret.put(currentPartitionId, replicaBuffer)
  • currentPartitionId += 1 // 分配下一个分区
  • }
  • ret
  • }
  • private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  • val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  • (firstReplicaIndex + shift) % nBrokers
  • }

无需考虑机架感知的分区分配大致流程如下:

  1. 如果没有指定起始Broker索引,则随机选择一个起始Broker索引,随机值需要在Broker总个数内。
  2. 如果没有指定起始Partition ID,则以ID为0的分区作为起始分区。
  3. 如果指定了起始Broker索引,以起始Broker索引作为副本之间的间隔大小,否则随机选择一个值作为副本之间的间隔大小,随机值需要在Broker总个数内。
  4. 遍历所有分区,分配每个分区的副本,流程如下:
    1. 如果当前分区的ID大于0且恰好是Broker总数的整数倍,则将副本之间的间隔值自增1。
    2. 先将“优先副本”分配到索引为“(当前分区的ID值 + 起始Broker索引值) % Broker总数”的Broker上。
    3. 然后从“优先副本”所在的Broker索引开始,间隔分配剩余的副本,间隔大小为“1 + (第3步定义的间隔值 + 副本的索引) % (Broker总数 - 1)”。
    4. 当前分区ID值自增,继续执行第4步,为下一个分区做副本分配(流程之后给出一个分配示例)。
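
下面给出一个分配示例(假设startIndex和nextReplicaShift均为0,实际运行时这两个值是随机选取的):集群中有5个Broker(ID为0~4),为某Topic分配10个分区、每个分区3个副本,按照上述流程得到的分配结果为:

  • 分区0:[0, 1, 2]    分区5:[0, 2, 3]
  • 分区1:[1, 2, 3]    分区6:[1, 3, 4]
  • 分区2:[2, 3, 4]    分区7:[2, 4, 0]
  • 分区3:[3, 4, 0]    分区8:[3, 0, 1]
  • 分区4:[4, 0, 1]    分区9:[4, 1, 2]

可以看到,各分区的“优先副本”(列表中的第一个副本)轮询分布在5个Broker上;从分区5开始nextReplicaShift自增1,使其余副本的间隔发生变化,从而让副本整体分布更均匀。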

当需要进行机架感知时,通过assignReplicasToBrokersRackAware(...)方法实现副本分配,它尽量将每个分区的副本均匀地分配到不同的机架上,如果每个机架上已经有了此分区的副本,则尽量均匀地分配到每个Broker上,源码如下:

kafka.admin.AdminUtils
  • private def assignReplicasToBrokersRackAware(nPartitions: Int,
  • replicationFactor: Int,
  • brokerMetadatas: Seq[BrokerMetadata],
  • fixedStartIndex: Int,
  • startPartitionId: Int): Map[Int, Seq[Int]] = {
  • // 对机架信息进行转换,得到的字典中,键是Broker的ID,值为所在的机架名称
  • val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
  • id -> rack
  • }.toMap
  • // 统计机架个数
  • val numRacks = brokerRackMap.values.toSet.size
  • // 基于机架信息生成一个Broker列表,轮询每个机架上的Broker,不同机架上的Broker交替出现
  • val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  • // 所有机架上的Broker数量
  • val numBrokers = arrangedBrokerList.size
  • // 用于记录副本分配结果
  • val ret = mutable.Map[Int, Seq[Int]]()
  • // 选择起始Broker进行分配
  • val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  • // 选择起始分区
  • var currentPartitionId = math.max(0, startPartitionId)
  • // 指定副本的间隔
  • var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  • // 遍历所有分区
  • for (_ <- 0 until nPartitions) {
  • if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
  • // 递增nextReplicaShift
  • nextReplicaShift += 1
  • // 计算"优先副本"所在的Broker
  • val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
  • val leader = arrangedBrokerList(firstReplicaIndex)
  • // 记录"优先副本"所在的Broker
  • val replicaBuffer = mutable.ArrayBuffer(leader)
  • // 记录已经分配了当前分区副本的机架信息
  • val racksWithReplicas = mutable.Set(brokerRackMap(leader))
  • // 记录已经分配了当前分区的副本的Broker信息
  • val brokersWithReplicas = mutable.Set(leader)
  • var k = 0
  • // 遍历分配剩余的副本
  • for (_ <- 0 until replicationFactor - 1) {
  • var done = false
  • while (!done) {
  • // 通过replicaIndex()方法计算当前副本所在的Broker
  • val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
  • val rack = brokerRackMap(broker)
  • // Skip this broker if
  • // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
  • // that do not have any replica, or
  • // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
  • /**
  • * 检测是否跳过此Broker,满足以下之一就跳过:
  • * 1. 当前机架上已经分配过其他副本,而且存在机架还未分配副本;
  • * 2. 当前Broker上已经分配过其他副本,而且存在其他Broker还未分配副本。
  • */
  • if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
  • && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
  • // 记录分配结果
  • replicaBuffer += broker
  • // 记录此机架已经分配了当前分区的副本
  • racksWithReplicas += rack
  • // 记录此Broker已经分配了当前分区的副本
  • brokersWithReplicas += broker
  • // 标识此副本的分配完成
  • done = true
  • }
  • k += 1
  • }
  • }
  • ret.put(currentPartitionId, replicaBuffer)
  • currentPartitionId += 1
  • }
  • ret
  • }
  • /**
  • * Given broker and rack information, returns a list of brokers alternated by the rack. Assume
  • * this is the rack and its brokers:
  • *
  • * rack1: 0, 1, 2
  • * rack2: 3, 4, 5
  • * rack3: 6, 7, 8
  • *
  • * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
  • *
  • * This is essential to make sure that the assignReplicasToBrokers API can use such list and
  • * assign replicas to brokers in a simple round-robin fashion, while ensuring an even
  • * distribution of leader and replica counts on each broker and that replicas are
  • * distributed to all racks.
  • */
  • private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
  • // 得到 机架 -> 该机架上的Broker的ID集合的迭代器
  • val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
  • (rack, brokers.toIterator)
  • }
  • // 对机架进行排序
  • val racks = brokersIteratorByRack.keys.toArray.sorted
  • // 定义结果数组
  • val result = new mutable.ArrayBuffer[Int]
  • // 机架索引
  • var rackIndex = 0
  • /**
  • * brokerRackMap的大小即为Broker的数量。
  • * 这里的操作实现如下:
  • * 因为之前排过序了,所以brokersIteratorByRack中机架对应的BrokerID是有序的;
  • * 按照Broker的数量为遍历次数进行循环,轮流取每个机架上的Broker中的一个。
  • * 例如,有以下Broker分配信息:
  • * rack1: 0, 1, 2
  • * rack2: 3, 4, 5
  • * rack3: 6, 7, 8
  • * 一共9个Broker,那么会遍历9次:
  • * 第1次:轮到rack1,取rack1上的Broker-0
  • * 第2次:轮到rack2,取rack2上的Broker-3
  • * 第3次:轮到rack3,取rack3上的Broker-6
  • * 第4次:轮到rack1,取rack1上的Broker-1
  • * 第5次:轮到rack2,取rack2上的Broker-4
  • * 第6次:轮到rack3,取rack3上的Broker-7
  • * 第7次:轮到rack1,取rack1上的Broker-2
  • * 第8次:轮到rack2,取rack2上的Broker-5
  • * 第9次:轮到rack3,取rack3上的Broker-8
  • *
  • * 最终得到的Broker ID序列为:
  • * 0, 3, 6, 1, 4, 7, 2, 5, 8
  • */
  • while (result.size < brokerRackMap.size) {
  • // 获取机架对应的BrokerID集合迭代器
  • val rackIterator = brokersIteratorByRack(racks(rackIndex))
  • // 取其中一个Broker
  • if (rackIterator.hasNext)
  • result += rackIterator.next()
  • // 机架索引自增
  • rackIndex = (rackIndex + 1) % racks.length
  • }
  • result
  • }

需要考虑机架感知的分区分配大致流程如下:

  1. 基于机架信息,以轮询机架集合以及每个机架上的Broker ID集合得到均匀分布的Broker ID列表。
  2. 如果没有指定起始Broker索引,则随机选择一个起始Broker索引,随机值需要在Broker总个数内。
  3. 如果没有指定起始Partition ID,则以ID为0的分区作为起始分区。
  4. 如果指定了起始Broker索引,以起始Broker索引作为副本之间的间隔大小,否则随机选择一个值作为副本之间的间隔大小,随机值需要在Broker总个数内。
  5. 遍历所有分区,分配每个分区的副本,流程如下:
    1. 如果当前分区的ID大于0且恰好是Broker总数的整数倍,则将副本之间的间隔值自增1。
    2. 以“(当前分区的ID值 + 起始Broker索引值) % Broker总数”计算对应的索引值,以该索引值从第1步得到的Broker ID列表中取出对应的Broker作为“优先副本”所在的Broker,并记录分配了“优先副本”的机架和Broker。
    3. 从“优先副本”所在的Broker索引开始,间隔分配剩余的副本,间隔大小为“1 + (第4步定义的间隔值 * 机架总数 + 尝试次数k) % (Broker总数 - 1)”,其中k在每次尝试后自增(包括被跳过的尝试)。得到对应的Broker后,如果该Broker上已经分配过当前分区的其他副本且存在其他Broker还未分配副本,或者该Broker所在的机架上已经分配过当前分区的其他副本且存在机架还未分配副本,就跳过这次分配并重新执行当前步骤;否则将副本分配到该Broker上,并记录此Broker和此机架已经分配了当前分区的副本。
    4. 当前分区ID值自增,继续执行第5步,为下一个分区做副本分配。

经过上述两种方式分配的结果,最终都会通过前面讲解的createOrUpdateTopicPartitionAssignmentPathInZK(...)方法写入到Zookeeper中保存。

注:除了使用kafka-topics脚本以外,在KafkaApis中需要创建Topic时也会调用AdminUtils的createTopic(...)方法。

4.2. Topic的修改

修改Topic由TopicCommand的alterTopic(...)方法实现,源码如下:

kafka.admin.TopicCommand#alterTopic
  • def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  • // 从Zookeeper中获取与--topic参数正则匹配的Topic集合
  • val topics = getTopics(zkUtils, opts)
  • // 读取--if-exists参数
  • val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
  • if (topics.length == 0 && !ifExists) {
  • throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
  • opts.options.valueOf(opts.zkConnectOpt)))
  • }
  • topics.foreach { topic =>
  • // 从Zookeeper的/config/topics/[topic_name]路径获取该Topic当前的配置信息
  • val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
  • // 判断是否包含--config配置或--delete-config配置
  • if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
  • println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
  • println(" Going forward, please use kafka-configs.sh for this functionality")
  • // 得到新添加的和将删除的配置并进行更新
  • val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
  • val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
  • // compile the final set of configs
  • configs.putAll(configsToBeAdded)
  • configsToBeDeleted.foreach(config => configs.remove(config))
  • // 修改Zookeeper中的主题配置信息
  • AdminUtils.changeTopicConfig(zkUtils, topic, configs)
  • println("Updated config for topic \"%s\".".format(topic))
  • }
  • // 副本重新分配的功能
  • // 检测是否包含--partitions参数
  • if(opts.options.has(opts.partitionsOpt)) {
  • // 不可修改__consumer_offsets主题,会抛出异常
  • if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
  • throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
  • }
  • println("WARNING: If partitions are increased for a topic that has a key, the partition " +
  • "logic or ordering of the messages will be affected")
  • // 获取--partitions参数
  • val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
  • // 获取--replica-assignment参数
  • val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
  • // 完成分区数量的增加以及副本分配
  • AdminUtils.addPartitions(zkUtils, topic, nPartitions, replicaAssignmentStr)
  • println("Adding partitions succeeded!")
  • }
  • }
  • }

其中addPartitions(...)方法会根据原有分区的分配情况确定副本个数,根据是否指定--replica-assignment参数决定新增分区是否进行自动副本分配,最后将原有分区和新增的Partition的副本分配结果合并后写入ZooKeeper:

kafka.admin.AdminUtils#addPartitions
  • def addPartitions(zkUtils: ZkUtils,
  • topic: String,
  • numPartitions: Int = 1,
  • replicaAssignmentStr: String = "",
  • checkBrokerAvailable: Boolean = true,
  • rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  • // 从Zookeeper获取此Topic当前的副本分配情况
  • val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
  • if (existingPartitionsReplicaList.size == 0)
  • throw new AdminOperationException("The topic %s does not exist".format(topic))
  • // 获取ID为0的分区的副本分配情况
  • val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
  • case None => throw new AdminOperationException("the topic does not have partition with id 0, it should never happen")
  • case Some(headPartitionReplica) => headPartitionReplica._2
  • }
  • // 计算需要新增的分区数量
  • val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
  • // 只能增加分区数量,如果指定的分区数小于等于当前分区数,则抛出异常
  • if (partitionsToAdd <= 0)
  • throw new AdminOperationException("The number of partitions for a topic can only be increased")
  • // create the new partition replication list
  • // 获取Broker的元数据信息(可能包含机架信息)
  • val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
  • val newPartitionReplicaList =
  • if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
  • // 确定startIndex
  • val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head))
  • // 对新增分区进行副本自动分配,注意fixedStartIndex和startPartitionId参数的取值
  • AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size,
  • startIndex, existingPartitionsReplicaList.size)
  • }
  • else
  • // 解析replica-assignment参数,其中会进行一系列有效性检测
  • getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet,
  • existingPartitionsReplicaList.size, checkBrokerAvailable)
  • // check if manual assignment has the right replication factor
  • // 检测新增Partition的副本数是否正常
  • val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
  • if (unmatchedRepFactorList.size != 0)
  • throw new AdminOperationException("The replication factor in manual replication assignment " +
  • " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)
  • info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
  • // 将原有分区的副本分配整理为可变集合
  • val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
  • // add the new list
  • // 将最终的副本分配结果写入Zookeeper
  • partitionReplicaList ++= newPartitionReplicaList
  • // 该方法最后一个参数表示只更新副本分配情况
  • AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
  • }

4.3. Topic的列举

列出所有Topic的名称由listTopics(...)方法完成,源码如下:

kafka.admin.TopicCommand
  • def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  • // 从Zookeeper中获取Topic列表
  • val topics = getTopics(zkUtils, opts)
  • for(topic <- topics) {
  • // 打印
  • if (zkUtils.pathExists(getDeleteTopicPath(topic))) {
  • // 如果标记为删除会打印" - marked for deletion"
  • println("%s - marked for deletion".format(topic))
  • } else {
  • // 否则只打印主题名称
  • println(topic)
  • }
  • }
  • }
  • private def getTopics(zkUtils: ZkUtils, opts: TopicCommandOptions): Seq[String] = {
  • // 从Zookeeper的/brokers/topics路径读取所有主题并按名称排序
  • val allTopics = zkUtils.getAllTopics().sorted
  • // 判断是否有--topic参数,如果有可能需要通过正则匹配进行过滤
  • if (opts.options.has(opts.topicOpt)) {
  • val topicsSpec = opts.options.valueOf(opts.topicOpt)
  • // 正则白名单
  • val topicsFilter = new Whitelist(topicsSpec)
  • // 进行过滤
  • allTopics.filter(topicsFilter.isTopicAllowed(_, excludeInternalTopics = false))
  • } else
  • allTopics
  • }
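
由于getTopics(...)方法会按正则白名单过滤,--list操作可以配合--topic参数只列出匹配的Topic,例如(正则表达式仅为示意):

  • # 只列出名称以"test-"开头的Topic
  • bin/kafka-topics.sh --zookeeper localhost:2181 --list --topic "test-.*"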

4.4. Topic的查看

--describe参数用于查看Topic的详细信息,该功能由describeTopic(...)方法完成,该方法比较简单,主要是从Zookeeper中获取数据并打印,源码如下:

kafka.admin.TopicCommand#describeTopic
  • def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  • // 从Zookeeper读取指定Topic的列表
  • val topics = getTopics(zkUtils, opts)
  • // 获取--under-replicated-partitions参数
  • val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
  • // 获取--unavailable-partitions参数
  • val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
  • // 获取--topics-with-overrides参数
  • val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false
  • // 获取所有可用的Broker
  • val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
  • // 遍历需要查看的Topic
  • for (topic <- topics) {
  • // 从Zookeeper中获取每个Topic的分区分配
  • zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
  • case Some(topicPartitionAssignment) => // 能够获取到
  • // 处理配置
  • val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
  • val describePartitions: Boolean = !reportOverriddenConfigs
  • // 对分区分配进行排序
  • val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
  • if (describeConfigs) {
  • // 获取Topic的配置信息
  • val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
  • // 没有设置--topics-with-overrides参数,或者该Topic存在覆盖的配置项时才打印
  • if (!reportOverriddenConfigs || configs.size() != 0) {
  • // 分区数
  • val numPartitions = topicPartitionAssignment.size
  • // 副本因子
  • val replicationFactor = topicPartitionAssignment.head._2.size
  • // 打印主题信息
  • println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
  • .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
  • }
  • }
  • // 判断是否打印分区信息
  • if (describePartitions) {
  • // 遍历已排序的分区
  • for ((partitionId, assignedReplicas) <- sortedPartitions) {
  • // ISR副本集合
  • val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
  • // Leader副本ID
  • val leader = zkUtils.getLeaderForPartition(topic, partitionId)
  • if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
  • (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
  • (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
  • // 打印信息
  • print("\tTopic: " + topic)
  • print("\tPartition: " + partitionId)
  • print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))
  • print("\tReplicas: " + assignedReplicas.mkString(","))
  • println("\tIsr: " + inSyncReplicas.mkString(","))
  • }
  • }
  • }
  • case None => // 主题不存在
  • println("Topic " + topic + " doesn't exist!")
  • }
  • }
  • }
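
结合上面的打印逻辑,--describe的输出形如下面的示例(Topic名称、分区和Broker ID均为示意):

  • Topic:test-topic    PartitionCount:3    ReplicationFactor:2    Configs:
  •     Topic: test-topic    Partition: 0    Leader: 0    Replicas: 0,1    Isr: 0,1
  •     Topic: test-topic    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2
  •     Topic: test-topic    Partition: 2    Leader: 2    Replicas: 2,0    Isr: 2,0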

4.5. Topic的删除

Topic的删除由deleteTopic(...)方法负责处理,它将待删除的Topic名称写入到Zookeeper的/admin/delete_topics路径下,这会触发DeleteTopicsListener将待删除Topic交由TopicDeletionManager处理。源码比较简单:

kafka.admin.TopicCommand#deleteTopic
  • def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  • // 获取指定的主题列表
  • val topics = getTopics(zkUtils, opts)
  • // --if-exists参数
  • val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
  • // 根据参数进行检查
  • if (topics.length == 0 && !ifExists) {
  • throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
  • opts.options.valueOf(opts.zkConnectOpt)))
  • }
  • // 遍历指定的主题
  • topics.foreach { topic =>
  • try {
  • if (Topic.isInternal(topic)) { // 不可删除Kafka内部Topic
  • throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
  • } else {
  • /**
  • * 将待删除的Topic名称写入到Zookeeper的/admin/delete_topics路径下
  • * 这将触发DeleteTopicsListener将待删除Topic交由TopicDeletionManager处理
  • */
  • zkUtils.createPersistentPath(getDeleteTopicPath(topic))
  • println("Topic %s is marked for deletion.".format(topic))
  • println("Note: This will have no impact if delete.topic.enable is not set to true.")
  • }
  • } catch {
  • case e: ZkNodeExistsException =>
  • println("Topic %s is already marked for deletion.".format(topic))
  • case e: AdminOperationException =>
  • throw e
  • case e: Throwable =>
  • throw new AdminOperationException("Error while deleting topic %s".format(topic))
  • }
  • }
  • }
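
--delete操作的典型用法如下(仅为示意)。需要注意的是,只有当Broker端的delete.topic.enable配置为true时,被标记删除的Topic才会被真正删除:

  • bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-topic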

5. kafka-preferred-replica-election.sh脚本

前面讲到的PreferredReplicaElectionListener监听器会监听ZooKeeper中的/admin/preferred_replica_election节点,负责对指定的分区进行“优先副本”选举。这里“指定的分区”可以通过kafka-preferred-replica-election脚本写入到ZooKeeper,该脚本是通过调用PreferredReplicaLeaderElectionCommand实现的。其--path-to-json-file参数指定一个JSON格式的输入文件,在其中指定需要进行“优先副本”选举的分区;如果未指定输入文件,则认为所有分区都需要进行“优先副本”选举操作。--path-to-json-file参数指定的JSON格式输入文件示例如下:

  • {
  • "partitions":[
  • {
  • "topic": "topic-1", "partition": 1
  • },
  • {
  • "topic": "topic-2", "partition": 2
  • }
  • ]
  • }
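
将上述JSON保存为文件(例如preferred.json,文件名仅为示意)后,可以按如下方式触发“优先副本”选举;若省略--path-to-json-file参数,则对所有分区进行选举:

  • bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file preferred.json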

kafka-preferred-replica-election.sh脚本的内容也比较简单,它其实执行了kafka.admin.PreferredReplicaLeaderElectionCommand类的main(...)方法:

  • #!/bin/bash
  • exec $(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand "$@"

PreferredReplicaLeaderElectionCommand类的main(...)方法源码如下:

kafka.admin.PreferredReplicaLeaderElectionCommand#main
  • def main(args: Array[String]): Unit = {
  • val parser = new OptionParser
  • val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " +
  • "for which preferred replica leader election should be done, in the following format - \n" +
  • "{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" +
  • "Defaults to all existing partitions")
  • .withRequiredArg
  • .describedAs("list of partitions for which preferred replica leader election needs to be triggered")
  • .ofType(classOf[String])
  • val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
  • "form host:port. Multiple URLS can be given to allow fail-over.")
  • .withRequiredArg
  • .describedAs("urls")
  • .ofType(classOf[String])
  • if(args.length == 0)
  • CommandLineUtils.printUsageAndDie(parser, "This tool causes leadership for each partition to be transferred back to the 'preferred replica'," +
  • " it can be used to balance leadership among the servers.")
  • // 解析及检测--path-to-json-file和--zookeeper参数
  • val options = parser.parse(args : _*)
  • CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
  • // 创建Zookeeper连接
  • val zkConnect = options.valueOf(zkConnectOpt)
  • var zkClient: ZkClient = null
  • var zkUtils: ZkUtils = null
  • try {
  • zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
  • zkUtils = ZkUtils(zkConnect,
  • 30000,
  • 30000,
  • JaasUtils.isZkSecurityEnabled())
  • // 获取需要进行"优先副本"选举的分区集合
  • val partitionsForPreferredReplicaElection =
  • // 未指定--path-to-json-file参数则返回全部分区
  • if (!options.has(jsonFileOpt))
  • zkUtils.getAllPartitions()
  • else
  • // 解析--path-to-json-file参数指定的输入文件
  • parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
  • // 创建PreferredReplicaLeaderElectionCommand对象
  • val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection)
  • // 将指定的分区写入到Zookeeper的/admin/preferred_replica_election节点中
  • preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
  • println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
  • } catch {
  • case e: Throwable =>
  • println("Failed to start preferred replica election")
  • println(Utils.stackTrace(e))
  • } finally {
  • if (zkClient != null)
  • zkClient.close()
  • }
  • }

其中最后PreferredReplicaLeaderElectionCommand的moveLeaderToPreferredReplica()方法首先会检测指定的Topic是否包含指定的分区,之后会调用writePreferredReplicaElectionData()方法将需要进行“优先副本”选举的分区信息写入ZooKeeper:

kafka.admin.PreferredReplicaLeaderElectionCommand
  • def moveLeaderToPreferredReplica() = {
  • try {
  • // 检查指定的Topic是否包含指定的分区
  • val validPartitions = partitions.filter(p => validatePartition(zkUtils, p.topic, p.partition))
  • // 将信息写入Zookeeper
  • PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions)
  • } catch {
  • case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
  • }
  • }
  • def writePreferredReplicaElectionData(zkUtils: ZkUtils,
  • partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
  • // 得到/admin/preferred_replica_election节点的路径
  • val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
  • // 构造分区字典
  • val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
  • // 将数据转换为JSON格式
  • val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
  • try {
  • // 写入Zookeeper
  • zkUtils.createPersistentPath(zkPath, jsonData)
  • info("Created preferred replica election path with %s".format(jsonData))
  • } catch {
  • case nee: ZkNodeExistsException =>
  • val partitionsUndergoingPreferredReplicaElection =
  • PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
  • throw new AdminOperationException("Preferred replica leader election currently in progress for " +
  • "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
  • case e2: Throwable => throw new AdminOperationException(e2.toString)
  • }
  • }

最终会触发PreferredReplicaElectionListener监听器以进行“优先副本”选举。

6. kafka-reassign-partitions脚本

kafka-reassign-partitions脚本用于触发分区的副本重新分配,具体工作其实是由PartitionsReassignedListener监听器实现的,它会监听ZooKeeper中的/admin/reassign_partitions节点,负责进行分区中副本的重新分配。kafka-reassign-partitions.sh脚本源码如下:

  • #!/bin/bash
  • exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"

该脚本的实现很简单,其实是调用了kafka.admin.ReassignPartitionsCommand类的main(...)方法:

kafka.admin.ReassignPartitionsCommand#main
  • def main(args: Array[String]): Unit = {
  • // 解析参数
  • val opts = new ReassignPartitionsCommandOptions(args)
  • // should have exactly one action
  • // 获取--generate、--execute和--verify参数出现的个数,这三个参数只能出现其中一个
  • val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _)
  • if(actions != 1)
  • CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --generate, --execute or --verify")
  • // 检查参数
  • CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
  • // 创建Zookeeper连接
  • val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
  • val zkUtils = ZkUtils(zkConnect,
  • 30000,
  • 30000,
  • JaasUtils.isZkSecurityEnabled())
  • try {
  • if(opts.options.has(opts.verifyOpt))
  • // --verify,检测副本迁移是否结束
  • verifyAssignment(zkUtils, opts)
  • else if(opts.options.has(opts.generateOpt))
  • // --generate,输出副本迁移方案和当前副本的分配情况
  • generateAssignment(zkUtils, opts)
  • else if (opts.options.has(opts.executeOpt))
  • // --execute,执行指定的副本迁移
  • executeAssignment(zkUtils, opts)
  • } catch {
  • case e: Throwable =>
  • println("Partitions reassignment failed due to " + e.getMessage)
  • println(Utils.stackTrace(e))
  • } finally {
  • // 关闭Zookeeper连接
  • val zkClient = zkUtils.zkClient
  • if (zkClient != null)
  • zkClient.close()
  • }
  • }

由此可见,kafka-reassign-partitions脚本的主要功能有三个(典型的使用流程见下面列表之后的示例):

  1. 检测指定分区的副本迁移是否完成。
  2. 输出副本迁移的方案和当前副本的分配情况。
  3. 将副本迁移方案写入到ZooKeeper中,由PartitionsReassignedListener处理。
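
这三个功能对应的典型使用流程如下(topics.json列出待迁移的Topic,reassign.json保存--generate生成的迁移方案,文件名与Broker列表均仅为示意):

  • # topics.json的内容形如:{"version":1,"topics":[{"topic":"test-topic"}]}
  • # 1. 生成迁移方案,同时输出当前的副本分配情况(便于回滚)
  • bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics.json --broker-list "0,1,2" --generate
  • # 2. 将生成的方案保存为reassign.json后执行迁移
  • bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute
  • # 3. 检测迁移是否已经完成
  • bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --verify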

其中,verifyAssignment(...)主要负责检测副本迁移操作是否已完成。它首先读取并分析--reassignment-json-file参数指定的输入文件内容,之后调用checkIfReassignmentSucceeded()方法检测每个分区的迁移情况:

kafka.admin.ReassignPartitionsCommand
  • def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
  • // 检查--reassignment-json-file参数
  • if(!opts.options.has(opts.reassignmentJsonFileOpt))
  • CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
  • // 从--reassignment-json-file参数指定的JSON文件读取分配信息
  • val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
  • val jsonString = Utils.readFileAsString(jsonFile)
  • // 解析JSON数据,主要是检查是否有重复的键
  • val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentData(jsonString)
  • println("Status of partition reassignment:")
  • // 检测每个分区的迁移情况
  • val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
  • // 针对迁移情况作出打印信息
  • reassignedPartitionsStatus.foreach { partition =>
  • partition._2 match {
  • case ReassignmentCompleted =>
  • println("Reassignment of partition %s completed successfully".format(partition._1))
  • case ReassignmentFailed =>
  • println("Reassignment of partition %s failed".format(partition._1))
  • case ReassignmentInProgress =>
  • println("Reassignment of partition %s is still in progress".format(partition._1))
  • }
  • }
  • }
  • private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
  • :Map[TopicAndPartition, ReassignmentStatus] = {
  • // 从Zookeeper的/admin/reassign_partitions路径下获取正在执行副本重新分配的分区信息
  • val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
  • // 对每个分区都调用checkIfPartitionReassignmentSucceeded()方法检测是否迁移成功
  • partitionsToBeReassigned.map { topicAndPartition =>
  • (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkUtils,topicAndPartition._1,
  • topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
  • }
  • }
  • def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
  • reassignedReplicas: Seq[Int],
  • partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
  • partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
  • // 获取该分区重新分配的目标副本集合
  • val newReplicas = partitionsToBeReassigned(topicAndPartition)
  • partitionsBeingReassigned.get(topicAndPartition) match {
  • // ZooKeeper中的/admin/reassign_partitions路径下依然有此分区信息,则表示正在进行副本重新分配
  • case Some(partition) => ReassignmentInProgress
  • case None =>
  • // 当前AR集合与迁移目标一致,即为重新分配成功
  • // check if the current replica assignment matches the expected one after reassignment
  • val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
  • if(assignedReplicas == newReplicas)
  • ReassignmentCompleted
  • else {
  • // 迁移已经完成,但当前AR集合与迁移目标不一致,表示迁移失败
  • println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
  • " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
  • ReassignmentFailed
  • }
  • }
  • }

generateAssignment(...)方法负责输出副本迁移方案以及当前分区的AR信息。它首先检测--topics-to-move-json-file参数和--broker-list参数是否都存在,不存在则异常结束;之后解析--topics-to-move-json-file参数和--broker-list参数,读取--topics-to-move-json-file参数指定的输入文件,读取--disable-rack-aware参数;最后调用generateAssignment(...)方法的重载获取副本迁移的目标以及当前分配的情况。generateAssignment(...)的代码如下:

kafka.admin.ReassignPartitionsCommand#generateAssignment
  • def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
  • // 获取待迁移的Topic
  • val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
  • // 检测待迁移Topic集合中是否存在重复的Topic,如果存在则抛出异常
  • val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
  • if (duplicateTopicsToReassign.nonEmpty)
  • throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
  • // 获取待迁移Topic的当前副本分配情况
  • val currentAssignment = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
  • // 按照Topic名进行分组
  • val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
  • // 决定是否考虑机架感知
  • val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
  • // 获取--broker-list参数指定的Broker的信息,包括了机架信息
  • val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, rackAwareMode, Some(brokerListToReassign))
  • // 记录副本迁移的目标
  • val partitionsToBeReassigned = mutable.Map[TopicAndPartition, Seq[Int]]()
  • // 遍历每个待迁移的Topic
  • groupedByTopic.foreach { case (topic, assignment) =>
  • val (_, replicas) = assignment.head
  • // 进行副本自动分配
  • val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
  • // 记录此Topic副本迁移后的结果
  • partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
  • (TopicAndPartition(topic, partition) -> replicas)
  • }
  • }
  • // 返回生成的副本迁移目标和当前副本分配的情况
  • (partitionsToBeReassigned, currentAssignment)
  • }

executeAssignment(...)方法负责将副本迁移方案写入到ZooKeeper中,由PartitionsReassignedListener处理。它首先检测--reassignment-json-file参数是否存在,若不存在则异常结束;之后解析该参数并读取其指定的输入文件,得到副本迁移方案;最后调用executeAssignment(...)方法的重载实现将副本迁移方案写入ZooKeeper。executeAssignment(...)方法的具体实现如下:

kafka.admin.ReassignPartitionsCommand#executeAssignment
  • def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String) {
  • // 解析--reassignment-json-file参数指定的输入文件内容并检测是否为空
  • val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
  • if (partitionsToBeReassigned.isEmpty)
  • throw new AdminCommandFailedException("Partition reassignment data file is empty")
  • // 检测待迁移Topic是否存在重复
  • val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
  • if (duplicateReassignedPartitions.nonEmpty)
  • throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
  • // 检测待迁移分区的副本分配是否存在重复
  • val duplicateEntries= partitionsToBeReassigned
  • .map{ case(tp,replicas) => (tp, CoreUtils.duplicates(replicas))}
  • .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
  • if (duplicateEntries.nonEmpty) {
  • val duplicatesMsg = duplicateEntries
  • .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp, duplicateReplicas.mkString(",")) }
  • .mkString(". ")
  • throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
  • }
  • // 创建ReassignPartitionsCommand
  • val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
  • // before starting assignment, output the current replica assignment to facilitate rollback
  • // 从Zookeeper中获取当前分区的副本分配情况并打印
  • val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
  • println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
  • .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
  • // reassignPartitions()方法会将迁移的目标写入到Zookeeper的/admin/reassign_partitions路径
  • // start the reassignment
  • if(reassignPartitionsCommand.reassignPartitions())
  • println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
  • else
  • println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
  • }
  • def reassignPartitions(): Boolean = {
  • try {
  • // 验证分区是否存在
  • val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic, p._1.partition))
  • if(validPartitions.isEmpty) {
  • // 没有通过验证的分区,直接返回false
  • false
  • }
  • else {
  • // 构建json数据
  • val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions)
  • // 在Zookeeper/admin/reassign_partitions路径下写入数据
  • zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
  • // 返回true
  • true
  • }
  • } catch {
  • case ze: ZkNodeExistsException =>
  • val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
  • throw new AdminCommandFailedException("Partition reassignment currently in " +
  • "progress for %s. Aborting operation".format(partitionsBeingReassigned))
  • case e: Throwable => error("Admin command failed", e); false
  • }
  • }

当迁移的目标信息写入到Zookeeper的/admin/reassign_partitions路径后,会触发PartitionsReassignedListener监听器继而进行分区重新分配。

7. kafka-console-producer.sh脚本

kafka-console-producer脚本是一个简易的控制台版本的生产者,可以通过在控制台中输入消息的键和值,然后由此脚本生成请求将消息追加到Kafka服务端。kafka-console-producer脚本的功能通过调用ConsoleProducer实现,其中同时支持新旧两个版本的生产者,这里主要分析新版本生产者的相关实现。kafka-console-producer.sh的源码如下:

  • #!/bin/bash
  • # 设置Kafka的JVM参数
  • if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  • export KAFKA_HEAP_OPTS="-Xmx512M"
  • fi
  • # 调用kafka.tools.ConsoleProducer的main()方法
  • exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

可见,该脚本执行了kafka.tools.ConsoleProducer的main(...)方法,源码如下:

kafka.tools.ConsoleProducer#main
  • def main(args: Array[String]) {
  • try {
  • // 读取命令行参数并进行解析
  • val config = new ProducerConfig(args)
  • // 创建LineMessageReader对象
  • val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
  • // 初始化LineMessageReader对象
  • reader.init(System.in, getReaderProps(config))
  • val producer =
  • if(config.useOldProducer) {
  • new OldProducer(getOldProducerProps(config))
  • } else {
  • // 新版本的Producer
  • new NewShinyProducer(getNewProducerProps(config))
  • }
  • // 添加JVM关闭钩子
  • Runtime.getRuntime.addShutdownHook(new Thread() {
  • override def run() {
  • // 关闭生产者
  • producer.close()
  • }
  • })
  • var message: ProducerRecord[Array[Byte], Array[Byte]] = null
  • do {
  • // 读取数据
  • message = reader.readMessage()
  • if (message != null)
  • // 发送消息
  • producer.send(message.topic, message.key, message.value)
  • } while (message != null)
  • } catch {
  • case e: joptsimple.OptionException =>
  • System.err.println(e.getMessage)
  • System.exit(1)
  • case e: Exception =>
  • e.printStackTrace
  • System.exit(1)
  • }
  • System.exit(0)
  • }

可见,ConsoleProducer使用LineMessageReader读取控制台的输入,其中封装了init()方法中传入的System.in输入流。LineMessageReader的readMessage(...)方法可以根据配置决定是否以及如何切分控制台输入的键和值(默认按照\t切分),并返回ProducerRecord:

kafka.tools.ConsoleProducer.LineMessageReader#readMessage
  • override def readMessage() = {
  • lineNumber += 1
  • // 读取数据,根据parse.key参数决定是否解析键
  • (reader.readLine(), parseKey) match {
  • case (null, _) => null
  • case (line, true) =>
  • // 分割消息为键和值
  • line.indexOf(keySeparator) match {
  • case -1 =>
  • // 没有解析到键,根据ignore.error来决定是否抛出异常
  • if (ignoreError) new ProducerRecord(topic, line.getBytes)
  • else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
  • case n =>
  • // 能解析到键,创建ProducerRecord
  • val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
  • new ProducerRecord(topic, line.substring(0, n).getBytes, value)
  • }
  • case (line, false) =>
  • // 直接创建ProducerRecord
  • new ProducerRecord(topic, line.getBytes)
  • }
  • }
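
To make the parse.key / key.separator handling above concrete, here is a minimal usage sketch. The broker address and topic name are placeholders; it assumes the reader properties are passed through --property and that --broker-list is the console producer's required broker option in this version.

  • # typing "user1:hello" on the console sends key=user1, value=hello
  • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=:
  • # with parse.key left at its default (false), each whole line is sent as the value and the key is null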

NewShinyProducer则封装了KafkaProducer对象,并依赖KafkaProducer完成发送消息的功能:

kafka.producer.NewShinyProducer
  • @deprecated("This class has been deprecated and will be removed in a future release. " +
  • "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0")
  • class NewShinyProducer(producerProps: Properties) extends BaseProducer {
  • import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
  • import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
  • // decide whether to send synchronously based on producer properties
  • // 根据producer.type来决定是异步发送还是同步发送
  • val sync = producerProps.getProperty("producer.type", "async").equals("sync")
  • val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
  • override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
  • val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
  • if(sync) { // 同步发送
  • this.producer.send(record).get()
  • } else { // 异步发送
  • this.producer.send(record,
  • new ErrorLoggingCallback(topic, key, value, false))
  • }
  • }
  • override def close() {
  • this.producer.close()
  • }
  • }

8. kafka-console-consumer.sh脚本

kafka-console-consumer脚本是一个简易的控制台Consumer,它的源码如下:

  • #!/bin/bash
  • if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  • export KAFKA_HEAP_OPTS="-Xmx512M"
  • fi
  • exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

可见该脚本的功能通过调用ConsoleConsumer的main(...)方法实现的:

kafka.tools.ConsoleConsumer
  • def main(args: Array[String]) {
  • // 读取配置为ConsumerConfig对象
  • val conf = new ConsumerConfig(args)
  • try {
  • // 调用run()方法
  • run(conf)
  • } catch {
  • case e: Throwable =>
  • error("Unknown error when running consumer: ", e)
  • System.exit(1);
  • }
  • }
  • def run(conf: ConsumerConfig) {
  • // 创建消费者
  • val consumer =
  • if (conf.useNewConsumer) { // 是否使用新消费者,通过参数--new-consumer判断
  • val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
  • // 创建NewShinyConsumer
  • new NewShinyConsumer(Option(conf.topicArg), Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs)
  • } else {
  • checkZk(conf)
  • new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
  • }
  • // 添加JVM关闭钩子方法
  • addShutdownHook(consumer, conf)
  • try {
  • // 从服务端获取消息并输出
  • process(conf.maxMessages, conf.formatter, consumer, conf.skipMessageOnError)
  • } finally {
  • // 清理Consumer
  • consumer.cleanup()
  • // 打印获取的消息总数
  • reportRecordCount()
  • // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
  • if (!conf.groupIdPassed) // 清理Zookeeper相关信息
  • ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
  • shutdownLatch.countDown()
  • }
  • }

ConsoleConsumer supports both the old and the new consumer. Here we analyze the new-consumer implementation, NewShinyConsumer, which wraps a KafkaConsumer object and relies on it to pull messages from the brokers. The implementation is as follows:

kafka.consumer.NewShinyConsumer
  • class NewShinyConsumer(topic: Option[String], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
  • import org.apache.kafka.clients.consumer.KafkaConsumer
  • import scala.collection.JavaConversions._
  • // 创建KafkaConsumer
  • val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
  • // 订阅Topic
  • if (topic.isDefined) // 订阅指定的Topic,--topic参数
  • consumer.subscribe(List(topic.get))
  • else if (whitelist.isDefined) // 未指定Topic则订阅白名单中指定的Topic,--whitelist参数
  • consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
  • else // 否则抛出异常
  • throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
  • // 尝试从服务器拉取消息
  • var recordIter = consumer.poll(0).iterator
  • override def receive(): BaseConsumerRecord = {
  • if (!recordIter.hasNext) {
  • // 在上次拉取的消息处理完后,继续从服务端拉取消息
  • recordIter = consumer.poll(timeoutMs).iterator
  • // if there are still no messages after polling again
  • if (!recordIter.hasNext)
  • throw new ConsumerTimeoutException
  • }
  • // wrap the record as a BaseConsumerRecord and return it
  • val record = recordIter.next
  • BaseConsumerRecord(record.topic,
  • record.partition,
  • record.offset,
  • record.timestamp,
  • record.timestampType,
  • record.key,
  • record.value)
  • }
  • override def stop() {
  • this.consumer.wakeup()
  • }
  • override def cleanup() {
  • this.consumer.close()
  • }
  • override def commit() {
  • this.consumer.commitSync()
  • }
  • }

ConsoleConsumer的process(...)方法中会调用NewShinyConsumer的receive()方法获取消息,之后调用MessageFormatter格式化器的writeTo()方法将消息按照指定格式输出,MessageFormatter有多个子类,可用于处理不同类型的消息以及实现不同的输出内容,通过--formatter参数指定全限定类名进行配置,子类的功能如下所述:

  • DefaultMessageFormatter:输出消息的键和值。默认使用该格式化器。
  • LoggingMessageFormatter: wraps DefaultMessageFormatter; in addition to printing each message's key and value, it also writes them to the log.
  • NoOpMessageFormatter: a no-op implementation that prints nothing.
  • ChecksumMessageFormatter:输出消息的校验码。
  • OffsetsMessageFormatter:主要在消费Offsets Topic这个内部Topic时使用,它会解析并输出记录Offset相关信息的消息。
  • GroupMetadataMessageFormatter:主要在消费Offsets Topic这个内部Topic时使用,它会解析并输出记录Consumer Group信息的消息。
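
As a usage sketch for this consumer (broker address, topic and regex are placeholders, and the --new-consumer, --bootstrap-server and --from-beginning options are assumed to match this Kafka version), the commands below exercise the NewShinyConsumer path described above; --formatter can additionally be given the fully qualified name of one of the MessageFormatter implementations listed, with DefaultMessageFormatter used when it is omitted.

  • # subscribe to a single topic with the new consumer and print keys/values with the default formatter
  • bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
  • # subscribe to every topic matching a whitelist regex instead of naming a single topic
  • bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --whitelist 'test.*'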

9. kafka-consumer-groups.sh脚本

kafka-consumer-groups脚本的主要功能有三个:

  1. 查询当前所有Consumer Group;
  2. 获取某Consumer Group的详细信息;
  3. 删除某Consumer Group。

对于删除Consumer Group功能,旧版本Consumer中的Consumer Group相关信息保存在ZooKeeper中,该脚本只能删除旧版本的Consumer Group;而新版本消费者的Consumer Group信息记录在Offsets Topic这个内部Topic中,不能通过该脚本进行删除。该脚本源码如下:

  • #!/bin/bash
  • exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

从脚本源码可知,ConsumerGroupCommand的main(...)方法是kafka-consumer-groups.sh脚本的入口函数,具体实现如下:

kafka.admin.ConsumerGroupCommand#main
  • def main(args: Array[String]) {
  • // 获取参数并进行检查
  • val opts = new ConsumerGroupCommandOptions(args)
  • if (args.length == 0)
  • CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
  • // should have exactly one action
  • // --list参数、--describe参数、--delete参数只能出现一个
  • val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
  • if (actions != 1)
  • CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
  • opts.checkArgs()
  • val consumerGroupService = {
  • // 通过--new-consumer参数指定使用新版本还是旧版本的消费者
  • if (opts.options.has(opts.newConsumerOpt))
  • // 新版本消费者使用KafkaConsumerGroupService来实现
  • new KafkaConsumerGroupService(opts)
  • else
  • // 旧版本消费者使用ZkConsumerGroupService来实现
  • new ZkConsumerGroupService(opts)
  • }
  • try {
  • if (opts.options.has(opts.listOpt)) // --list
  • // 输出全部Consumer Group的ID
  • consumerGroupService.list()
  • else if (opts.options.has(opts.describeOpt)) // --describe
  • // 获取指定Consumer Group的描述信息
  • consumerGroupService.describe()
  • else if (opts.options.has(opts.deleteOpt)) { // --delete
  • // 删除指定的Consumer Group
  • consumerGroupService match {
  • case service: ZkConsumerGroupService => service.delete() // 旧版本可删除
  • // 新版本将抛出异常
  • case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
  • }
  • }
  • } catch {
  • case e: Throwable =>
  • println("Error while executing consumer group command " + e.getMessage)
  • println(Utils.stackTrace(e))
  • } finally {
  • consumerGroupService.close()
  • }
  • }
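
The three actions map onto command lines such as the following sketch; the broker address, ZooKeeper address and group names are placeholders. As explained above, --delete only works for groups whose metadata is kept in ZooKeeper by the old consumer.

  • # list all consumer groups known to the new-consumer coordinators
  • bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
  • # describe one group: partitions, committed offsets, log end offsets, lag and owners
  • bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-group
  • # delete an old-consumer group stored in ZooKeeper
  • bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete --group my-old-group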

As the source shows, the --list, --describe and --delete options map to the list(), describe() and delete() methods of ConsumerGroupService respectively. Here we only analyze KafkaConsumerGroupService, which backs the new consumer; its definition and key fields are as follows:

kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
  • class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
  • // AdminClient实例
  • private val adminClient = createAdminClient()
  • // `consumer` is only needed for `describe`, so we instantiate it lazily
  • // the KafkaConsumer instance, created lazily
  • private var consumer: KafkaConsumer[String, String] = null
  • ...
  • // 创建KafkaConsumer
  • private def getConsumer() = {
  • if (consumer == null)
  • consumer = createNewConsumer()
  • consumer
  • }
  • ...
  • }

list()方法和describe()方法中都是通过调用AdminClient的对应方法实现的。AdminClient的创建流程如下:

kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService#createAdminClient
  • private def createAdminClient(): AdminClient = {
  • // 查看是否有--command-config参数
  • val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
  • // add the bootstrap.servers setting from the --bootstrap-server argument
  • props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
  • // 创建AdminClient对象
  • AdminClient.create(props)
  • }

AdminClient的create(...)方法的源码如下:

kafka.admin.AdminClient#create
  • def create(config: AdminConfig): AdminClient = {
  • val time = new SystemTime
  • // 度量器
  • val metrics = new Metrics(time)
  • // 元数据
  • val metadata = new Metadata
  • // ChannelBuilder
  • val channelBuilder = ClientUtils.createChannelBuilder(config.values())
  • // Broker的连接地址
  • val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
  • val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
  • val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
  • // 更新元数据
  • metadata.update(bootstrapCluster, 0)
  • // 创建Selector线程
  • val selector = new Selector(
  • DefaultConnectionMaxIdleMs,
  • metrics,
  • time,
  • "admin",
  • channelBuilder)
  • // create the NetworkClient
  • val networkClient = new NetworkClient(
  • selector,
  • metadata,
  • "admin-" + AdminClientIdSequence.getAndIncrement(),
  • DefaultMaxInFlightRequestsPerConnection,
  • DefaultReconnectBackoffMs,
  • DefaultSendBufferBytes,
  • DefaultReceiveBufferBytes,
  • DefaultRequestTimeoutMs,
  • time)
  • // 创建ConsumerNetworkClient
  • val highLevelClient = new ConsumerNetworkClient(
  • networkClient,
  • metadata,
  • time,
  • DefaultRetryBackoffMs,
  • DefaultRequestTimeoutMs)
  • // 创建AdminClient
  • new AdminClient(
  • time,
  • DefaultRequestTimeoutMs,
  • highLevelClient,
  • bootstrapCluster.nodes().asScala.toList)
  • }

列举所有Consumer Group的操作是由KafkaConsumerGroupService的list()方法完成的,它内部直接调用AdminClient的listAllConsumerGroupsFlattened()方法实现的:

  • def list() {
  • adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
  • }
  • def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
  • listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
  • }
  • def listAllGroupsFlattened(): List[GroupOverview] = {
  • listAllGroups.values.flatten.toList
  • }

AdminClient的listAllConsumerGroupsFlattened()方法最终会调用其listAllGroups方法,该方法首先调用findAllBrokers()方法(底层通过sendAnyNode()实现)向集群中任一已知的Broker发送MetadataRequest请求,得到集群中的Broker信息。然后尝试向集群中每个Broker发送ListGroupsRequest请求,获取其负责管理的ConsumerGroup:

kafka.admin.AdminClient#listAllGroups
  • def listAllGroups(): Map[Node, List[GroupOverview]] = {
  • // 发送MetadataRequest请求获取集群中所有Broker的信息
  • findAllBrokers.map {
  • case broker =>
  • broker -> {
  • try {
  • // 发送ListGroupsRequest请求并阻塞等待响应
  • listGroups(broker)
  • } catch {
  • case e: Exception =>
  • debug(s"Failed to find groups from broker ${broker}", e)
  • List[GroupOverview]()
  • }
  • }
  • }.toMap
  • }

describe()方法定义在KafkaConsumerGroupService的父类ConsumerGroupService中,它直接调用了子类的describeGroup(...)方法,传入了从--group参数指定的Consumer Group ID,以便describeGroup(...)方法获取Consumer Group的详细信息,源码如下:

kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService#describeGroup
  • protected def describeGroup(group: String) {
  • // 使用AdminClient相关方法查询Consumer Group的元数据信息
  • val consumerSummaries = adminClient.describeConsumerGroup(group)
  • if (consumerSummaries.isEmpty)
  • println(s"Consumer group `${group}` does not exist or is rebalancing.")
  • else {
  • // 获取KafkaConsumer
  • val consumer = getConsumer()
  • // 打印表头信息
  • printDescribeHeader()
  • // 遍历收到的Consumer Group元数据信息
  • consumerSummaries.foreach { consumerSummary =>
  • // 分区
  • val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
  • // 通过KafkaConsumer的committed()方法获取指定分区最近一次提交的offset,通过发送OffsetFetchRequest实现
  • val partitionOffsets = topicPartitions.flatMap { topicPartition =>
  • Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
  • topicPartition -> offsetAndMetadata.offset
  • }
  • }.toMap
  • // 获取分区对应的LEO值,输出信息
  • describeTopicPartition(group, topicPartitions, partitionOffsets.get,
  • _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
  • }
  • }
  • }

该方法会使用AdminClient的describeConsumerGroup(...)方法,它会根据获取的Consumer Group的状态进行区分处理:

  • def describeConsumerGroup(groupId: String): List[ConsumerSummary] = {
  • // 获取指定Consumer Group的元数据信息
  • val group = describeGroup(groupId)
  • if (group.state == "Dead") // Consumer Group状态为Dead,返回空集合
  • return List.empty[ConsumerSummary]
  • if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
  • throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
  • if (group.state == "Stable") { // Consumer Group状态为Stable
  • // 遍历Member并构造返回数据
  • group.members.map { member =>
  • val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
  • new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
  • }
  • } else {
  • List.empty
  • }
  • }

AdminClient的describeConsumerGroup(...)方法中使用的describeGroup(...)才是获取Consumer Group元数据的主要方法:

kafka.admin.AdminClient#describeGroup
  • def describeGroup(groupId: String): GroupSummary = {
  • // 发送GroupCoordinatorRequest请求查询GroupCoordinator
  • val coordinator = findCoordinator(groupId)
  • // 发送DescribeGroupRequest请求获取GroupMetadata元数据信息
  • val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
  • val response = new DescribeGroupsResponse(responseBody)
  • val metadata = response.groups().get(groupId)
  • if (metadata == null)
  • throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
  • Errors.forCode(metadata.errorCode()).maybeThrow()
  • // 解析GroupMetadata对象,封装为GroupSummary
  • val members = metadata.members().map { member =>
  • val metadata = Utils.readBytes(member.memberMetadata())
  • val assignment = Utils.readBytes(member.memberAssignment())
  • MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
  • }.toList
  • GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
  • }

--describe操作的关键步骤总结如下:

  1. 首先调用AdminClient的findCoordinator(...)方法查找指定Consumer Group对应的GroupCoordinator所在的节点。
  2. 向该GroupCoordinator发送DescribeGroupsRequest请求查询指定ConsumerGroup对应的GroupMetadata信息,并进行解析。
  3. 对于Stable状态的ConsumerGroup,将对应的GroupMetadata中记录的Member信息转换成ConsumerSummary并返回。对于处于其他状态的ConsumerGroup,则返回空集合。
  4. 通过KafkaConsumer的committed()方法获取指定分区的committed offset,底层通过发送OffsetFetchRequest请求实现。
  5. 通过KafkaConsumer的seekToEnd()方法和position()获取对应分区的LEO。
  6. 最后调用describeTopicPartition()方法完成输出。
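
Putting the steps together, the --describe output can be read as follows. This is a rough sketch of how the columns line up with the steps above, not verbatim output; the column names are given from memory and may differ slightly, and the group name and address are placeholders.

  • bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-group
  • # columns (approximately): GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
  • # CURRENT-OFFSET comes from KafkaConsumer.committed() (an OffsetFetchRequest under the hood),
  • # LOG-END-OFFSET from seekToEnd() + position(), and LAG is the difference between the two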

10. DumpLogSegments工具

在kafka.tools包中还有一些其他工具类,这些工具类在$KAFKA_HOME/bin目录下并没有对应的脚本,例如,DumpLogSegments、JmxTool等。管理人员可以通过前面描述的kafka-run-class.sh脚本使用这些工具类。

DumpLogSegments工具类的主要功能是将指定的日志文件和索引文件中的内容打印到控制台,它还可以实现验证索引文件的功能。DumpLogSegments的main(...)方法的实现如下:

kafka.tools.DumpLogSegments#main
  • def main(args: Array[String]) {
  • val parser = new OptionParser
  • val printOpt = parser.accepts("print-data-log", "if set, printing the messages content when dumping data logs")
  • val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log without printing its content")
  • val indexSanityOpt = parser.accepts("index-sanity-check", "if set, just checks the index sanity without printing its content. " +
  • "This is the same check that is executed on broker startup to determine if an index needs rebuilding or not.")
  • val filesOpt = parser.accepts("files", "REQUIRED: The comma separated list of data and index log files to be dumped")
  • .withRequiredArg
  • .describedAs("file1, file2, ...")
  • .ofType(classOf[String])
  • val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.")
  • .withRequiredArg
  • .describedAs("size")
  • .ofType(classOf[java.lang.Integer])
  • .defaultsTo(5 * 1024 * 1024)
  • val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration")
  • val valueDecoderOpt = parser.accepts("value-decoder-class", "if set, used to deserialize the messages. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
  • .withOptionalArg()
  • .ofType(classOf[java.lang.String])
  • .defaultsTo("kafka.serializer.StringDecoder")
  • val keyDecoderOpt = parser.accepts("key-decoder-class", "if set, used to deserialize the keys. This class should implement kafka.serializer.Decoder trait. Custom jar should be available in kafka/libs directory.")
  • .withOptionalArg()
  • .ofType(classOf[java.lang.String])
  • .defaultsTo("kafka.serializer.StringDecoder")
  • val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from __consumer_offsets topic")
  • // 验证并解析参数
  • if(args.length == 0)
  • CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
  • val options = parser.parse(args : _*)
  • CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
  • val print = if(options.has(printOpt)) true else false
  • val verifyOnly = if(options.has(verifyOpt)) true else false
  • val indexSanityOnly = if(options.has(indexSanityOpt)) true else false
  • val files = options.valueOf(filesOpt).split(",")
  • val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
  • val isDeepIteration = if(options.has(deepIterationOpt)) true else false
  • val messageParser = if (options.has(offsetsOpt)) {
  • new OffsetsMessageParser
  • } else {
  • val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
  • val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
  • new DecoderMessageParser(keyDecoder, valueDecoder)
  • }
  • /**
  • * 当索引项在对应的日志文件中找不到对应的消息时,会将其记录到misMatchesForIndexFilesMap集合中
  • * 其中key为索引文件的绝对路径,value是索引项中的相对offset和消息的offset组成的元组的集合
  • */
  • val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
  • /**
  • * 如果消息是未压缩的,则需要offset是连续的,
  • * 若不连续,则记录到nonConsecutivePairsForLogFilesMap集合中,
  • * 其中key为日志文件的绝对路径,value是出现不连续消息的前后两个offset组成的元组的集合
  • */
  • val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
  • for(arg <- files) {
  • // 处理命令参数指定的文件集合
  • val file = new File(arg)
  • if(file.getName.endsWith(Log.LogFileSuffix)) {
  • // 打印日志文件
  • println("Dumping " + file)
  • dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize , messageParser)
  • } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
  • // 打印索引文件
  • println("Dumping " + file)
  • dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
  • }
  • }
  • // 遍历misMatchesForIndexFilesMap,输出错误信息
  • misMatchesForIndexFilesMap.foreach {
  • case (fileName, listOfMismatches) => {
  • System.err.println("Mismatches in :" + fileName)
  • listOfMismatches.foreach(m => {
  • System.err.println(" Index offset: %d, log offset: %d".format(m._1, m._2))
  • })
  • }
  • }
  • // 遍历nonConsecutivePairsForLogFilesMap,输出错误信息
  • nonConsecutivePairsForLogFilesMap.foreach {
  • case (fileName, listOfNonConsecutivePairs) => {
  • System.err.println("Non-secutive offsets in :" + fileName)
  • listOfNonConsecutivePairs.foreach(m => {
  • System.err.println(" %d is followed by %d".format(m._1, m._2))
  • })
  • }
  • }
  • }
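
Based on the options parsed above, a typical invocation goes through kafka-run-class.sh directly, since there is no dedicated shell script for this tool; the segment path below is a placeholder.

  • # dump a log segment, print each record's key/value and iterate into compressed wrapper messages
  • bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log --deep-iteration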

DumpLogSegments的dumpLog(...)方法会遍历日志文件并打印消息的相关信息,例如,消息的offset、position、压缩方式、CRC校验码等,也会解析消息的键和值并打印:

kafka.tools.DumpLogSegments#dumpLog
  • /* print out the contents of the log */
  • private def dumpLog(file: File,
  • printContents: Boolean,
  • nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]],
  • isDeepIteration: Boolean,
  • maxMessageSize: Int,
  • parser: MessageParser[_, _]) {
  • // 打印日志文件的baseOffset
  • val startOffset = file.getName().split("\\.")(0).toLong
  • println("Starting offset: " + startOffset)
  • // 创建FileMessageSet对象
  • val messageSet = new FileMessageSet(file, false)
  • // 记录通过验证的字节数
  • var validBytes = 0L
  • // 记录offset
  • var lastOffset = -1l
  • // 浅层遍历器
  • val shallowIterator = messageSet.iterator(maxMessageSize)
  • for(shallowMessageAndOffset <- shallowIterator) { // this only does shallow iteration
  • // iterate over the messages in the log file; --deep-iteration and the compression codec decide which iterator to use
  • val itr = getIterator(shallowMessageAndOffset, isDeepIteration)
  • for (messageAndOffset <- itr) {
  • val msg = messageAndOffset.message
  • if(lastOffset == -1)
  • // 记录上次循环处理的消息的offset
  • lastOffset = messageAndOffset.offset
  • // If we are iterating uncompressed messages, offsets must be consecutive
  • else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
  • // 如果消息是未压缩的,则需要offset是连续的,若不连续则需要进行记录
  • var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
  • nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
  • nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
  • }
  • lastOffset = messageAndOffset.offset
  • // 输出消息相关的信息
  • print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
  • " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
  • " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
  • if(msg.hasKey)
  • print(" keysize: " + msg.keySize)
  • if(printContents) {
  • // 解析消息
  • val (key, payload) = parser.parse(msg)
  • // 输出消息的key
  • key.map(key => print(s" key: ${key}"))
  • // 输出消息的值
  • payload.map(payload => print(s" payload: ${payload}"))
  • }
  • println()
  • }
  • // 记录通过验证,正常打印的字节数
  • validBytes += MessageSet.entrySize(shallowMessageAndOffset.message)
  • }
  • val trailingBytes = messageSet.sizeInBytes - validBytes
  • // 出现验证失败,输出提示信息
  • if(trailingBytes > 0)
  • println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
  • }

DumpLogSegments的dumpIndex(...)方法会遍历索引文件,并检测Index中的索引项是否能在对应的日志文件中找到对应消息,它会根据verifyOnly参数决定是否打印索引项的内容:

kafka.tools.DumpLogSegments#dumpIndex
  • /* print out the contents of the index */
  • private def dumpIndex(file: File,
  • indexSanityOnly: Boolean,
  • verifyOnly: Boolean,
  • misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]],
  • maxMessageSize: Int) {
  • // 获取baseOffset
  • val startOffset = file.getName().split("\\.")(0).toLong
  • // 获取对应的日志文件
  • val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
  • // 创建FileMessageSet
  • val messageSet = new FileMessageSet(logFile, false)
  • // 创建OffsetIndex
  • val index = new OffsetIndex(file, baseOffset = startOffset)
  • //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
  • // 对索引文件进行检查,
  • if (indexSanityOnly) {
  • index.sanityCheck
  • println(s"$file passed sanity check.")
  • return
  • }
  • for(i <- 0 until index.entries) {
  • // 读取索引项
  • val entry = index.entry(i)
  • // 读取一个分片FileMessageSet,分片的起始位置是索引项指定的位置
  • val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize)
  • // 从分片FileMessageSet中获取第一条消息
  • val messageAndOffset = getIterator(partialFileMessageSet.head, isDeepIteration = true).next()
  • if(messageAndOffset.offset != entry.offset + index.baseOffset) {
  • // 如果消息的offset与索引项的offset不匹配,则需要记录下来
  • var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
  • misMatchesSeq ::=(entry.offset + index.baseOffset, messageAndOffset.offset)
  • misMatchesForIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq)
  • }
  • // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
  • if(entry.offset == 0 && i > 0)
  • return
  • if (!verifyOnly) // 输出索引项的内容
  • println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
  • }
  • }
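
For index files the same tool can either dump the entries or only run the sanity check, mirroring the verifyOnly and indexSanityOnly branches above; the paths are placeholders.

  • # print every index entry and report entries that do not match the log file
  • bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index
  • # only run the startup-style sanity check without printing the entries
  • bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --index-sanity-check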

11. kafka-producer-perf-test.sh脚本

kafka-producer-perf-test脚本主要负责测试生产者的各种性能指标,它的源码如下:

  • #!/bin/bash
  • # 设置Kafka的JVM参数
  • if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  • export KAFKA_HEAP_OPTS="-Xmx512M"
  • fi
  • # 执行org.apache.kafka.tools.ProducerPerformance的main()方法
  • exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance "$@"

从Kafka 0.9.0版本开始,kafka-producer-perf-test脚本废弃了原有core模块中的kafka.tools.ProducerPerformance,开始使用tools模块中的org.apache.kafka.tools.ProducerPerformance。ProducerPerformance的main(...)方法是kafka-producer-perf-test脚本的入口函数,源码如下:

org.apache.kafka.tools.ProducerPerformance#main
  • public static void main(String[] args) throws Exception {
  • // 命令行参数解析
  • ArgumentParser parser = argParser();
  • try {
  • Namespace res = parser.parseArgs(args);
  • /**
  • * parse args
  • * 解析命令行参数
  • **/
  • String topicName = res.getString("topic");
  • long numRecords = res.getLong("numRecords");
  • int recordSize = res.getInt("recordSize");
  • int throughput = res.getInt("throughput");
  • List<String> producerProps = res.getList("producerConfig");
  • Properties props = new Properties();
  • if (producerProps != null)
  • for (String prop : producerProps) {
  • String[] pieces = prop.split("=");
  • if (pieces.length != 2)
  • throw new IllegalArgumentException("Invalid property: " + prop);
  • props.put(pieces[0], pieces[1]);
  • }
  • props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  • props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  • // 创建KafkaProducer
  • KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
  • /* setup perf test */
  • // 根据--record-size参数创建测试消息的载荷
  • byte[] payload = new byte[recordSize];
  • Random random = new Random(0);
  • for (int i = 0; i < payload.length; ++i)
  • // 生产随机字节填充消息载荷
  • payload[i] = (byte) (random.nextInt(26) + 65);
  • // 创建ProducerRecord,主题名称由--topic参数指定
  • ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
  • // 创建Stats对象,用于指标的统计,其中--num-records参数指定了产生的消息个数
  • Stats stats = new Stats(numRecords, 5000);
  • // 记录开始时间
  • long startMs = System.currentTimeMillis();
  • // 创建限流器
  • ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
  • // 循环发送消息
  • for (int i = 0; i < numRecords; i++) {
  • // 记录发送时间
  • long sendStartMs = System.currentTimeMillis();
  • Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
  • // 发送消息
  • producer.send(record, cb);
  • // 限流器限流
  • if (throttler.shouldThrottle(i, sendStartMs)) {
  • throttler.throttle();
  • }
  • }
  • /* print final results */
  • producer.close();
  • // 打印统计信息
  • stats.printTotal();
  • } catch (ArgumentParserException e) {
  • if (args.length == 0) {
  • parser.printHelp();
  • System.exit(0);
  • } else {
  • parser.handleError(e);
  • System.exit(1);
  • }
  • }
  • }
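
The parsed arguments correspond to a command line like the following sketch; the topic name, counts and broker address are placeholders, and it is assumed that extra producer settings are passed as key=value pairs via --producer-props.

  • # send 100000 records of 100 bytes each, throttled to at most 10000 records per second
  • bin/kafka-producer-perf-test.sh --topic perf-test --num-records 100000 --record-size 100 --throughput 10000 --producer-props bootstrap.servers=localhost:9092 acks=1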

main(...)方法中的Stats负责记录多项数据并在测试完成后输出测试的各项性能指标,它的定义和字段如下:

org.apache.kafka.tools.ProducerPerformance.Stats
  • private static class Stats {
  • // 记录开始测试的时间戳
  • private long start;
  • // 当前时间窗口的起始时间戳
  • private long windowStart;
  • // 记录每个样本中的延迟
  • private int[] latencies;
  • // 样本个数,与指定发送的消息数量有关,默认是500000为一个样本
  • private int sampling;
  • // 记录迭代次数
  • private int iteration;
  • private int index;
  • // 记录整个过程发送的消息个数
  • private long count;
  • // 记录发送消息的总字节数
  • private long bytes;
  • // 记录从消息发出到对应响应返回之间的延迟的最大值
  • private int maxLatency;
  • // 记录延迟的总时间
  • private long totalLatency;
  • // 当前时间窗口中发送消息的个数
  • private long windowCount;
  • // 记录当前时间窗口中最大的延时
  • private int windowMaxLatency;
  • // 记录当前时间窗口中延时的总时长
  • private long windowTotalLatency;
  • // 记录当前窗口发送的总字节数
  • private long windowBytes;
  • // 保存两次输出之间的时间间隔
  • private long reportingInterval;
  • ...
  • }

在ProducerPerformance的main(...)方法中,发送的ProducerRecord对象都会创建一个对应的PerfCallback对象作为回调,在PerfCallback的onCompletion(...)中会调用Stats的record(...)方法记录相关的统计信息:

org.apache.kafka.tools.ProducerPerformance.PerfCallback#onCompletion
  • public void onCompletion(RecordMetadata metadata, Exception exception) {
  • long now = System.currentTimeMillis();
  • // 计算消息的延迟
  • int latency = (int) (now - start);
  • // 使用Stat的record()方法进行记录
  • this.stats.record(iteration, latency, bytes, now);
  • if (exception != null)
  • exception.printStackTrace();
  • }

Stats的record()方法负责更新上述字段中的值,并按照reportingInterval字段指定的时间间隔打印统计信息:

org.apache.kafka.tools.ProducerPerformance.Stats#record
  • public void record(int iter, int latency, int bytes, long time) {
  • // 计算发送消息个数
  • this.count++;
  • // 计算发送总字节数
  • this.bytes += bytes;
  • // 计算总延迟
  • this.totalLatency += latency;
  • // 计算最大延迟
  • this.maxLatency = Math.max(this.maxLatency, latency);
  • // 计算当前窗口发送消息个数
  • this.windowCount++;
  • // 计算当前窗口发送总字节数
  • this.windowBytes += bytes;
  • // 计算当前窗口的总延迟
  • this.windowTotalLatency += latency;
  • // 记录当前窗口的最大延迟
  • this.windowMaxLatency = Math.max(windowMaxLatency, latency);
  • // 选择样本,更新latencies中对应的值
  • if (iter % this.sampling == 0) {
  • this.latencies[index] = latency;
  • this.index++;
  • }
  • /* maybe report the recent perf */
  • // 检测是否需要结束当前窗口,并开启新窗口
  • if (time - windowStart >= reportingInterval) {
  • printWindow();
  • newWindow();
  • }
  • }

当全部的消息发送完之后,会调用Stats的printTotal()方法输出整个测试过程中全局的统计信息,例如,消息总个数、每秒发送的消息数、每秒发送的字节数、平均每个消息的延迟、最大延迟,还会计算延时时间的多个分位数。

12. kafka-consumer-perf-test.sh脚本

kafka-consumer-perf-test脚本负责测试消费者的各项性能指标,源码如下:

  • #!/bin/bash
  • # 设置Kafka的JVM参数
  • if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  • export KAFKA_HEAP_OPTS="-Xmx512M"
  • fi
  • # 执行ConsumerPerformance的main()方法
  • exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance "$@"

kafka-consumer-perf-test.sh脚本底层通过调用ConsumerPerformance的main(...)方法实现的,这里重点分析对新版本Consumer的性能测试,具体实现如下:

kafka.tools.ConsumerPerformance#main
  • def main(args: Array[String]): Unit = {
  • // 解析参数包装为ConsumerPerfConfig对象
  • val config = new ConsumerPerfConfig(args)
  • logger.info("Starting consumer...")
  • // 从服务端拉取的消息数
  • val totalMessagesRead = new AtomicLong(0)
  • // 获取消息的总字节数
  • val totalBytesRead = new AtomicLong(0)
  • // Consumer是否超时
  • val consumerTimeout = new AtomicBoolean(false)
  • // 根据--hide-header参数决定是否输出头信息
  • if (!config.hideHeader) {
  • // 根据--show-detailed-stats参数决定是否输出详细的指标
  • if (!config.showDetailedStats)
  • println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
  • else
  • println("time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
  • }
  • // 记录整个测试过程的开始时间戳和结束时间戳
  • var startMs, endMs = 0L
  • // 根据--new-consumer参数决定是否使用新版本Consumer
  • if(config.useNewConsumer) {
  • // 创建KafkaConsumer
  • val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
  • consumer.subscribe(List(config.topic))
  • startMs = System.currentTimeMillis
  • // 该方法是测试代码的核心
  • consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
  • endMs = System.currentTimeMillis
  • consumer.close()
  • } else {
  • import kafka.consumer.ConsumerConfig
  • val consumerConfig = new ConsumerConfig(config.props)
  • val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
  • val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
  • var threadList = List[ConsumerPerfThread]()
  • for ((topic, streamList) <- topicMessageStreams)
  • for (i <- 0 until streamList.length)
  • threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)
  • logger.info("Sleeping for 1 second.")
  • Thread.sleep(1000)
  • logger.info("starting threads")
  • startMs = System.currentTimeMillis
  • for (thread <- threadList)
  • thread.start
  • for (thread <- threadList)
  • thread.join
  • endMs =
  • if (consumerTimeout.get()) System.currentTimeMillis - consumerConfig.consumerTimeoutMs
  • else System.currentTimeMillis
  • consumerConnector.shutdown()
  • }
  • val elapsedSecs = (endMs - startMs) / 1000.0
  • if (!config.showDetailedStats) {
  • // 输出测试过程的开始时间、结束时间、消费的总字节数、每秒拉取的字节数、每秒消费的消息条数以及消息的总消息数
  • val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
  • println(("%s, %s, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
  • totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs))
  • }
  • }
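
For the new-consumer branch analyzed here, a typical command looks like the sketch below; the address, topic and message count are placeholders, and it is assumed that --broker-list supplies bootstrap.servers for the new consumer while the old-consumer path would take --zookeeper instead.

  • # consume 100000 messages with the new consumer and report MB.sec / nMsg.sec at the end
  • bin/kafka-consumer-perf-test.sh --new-consumer --broker-list localhost:9092 --topic perf-test --messages 100000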

The core logic of main(...) lives in ConsumerPerformance's consume(...) method. Before the benchmark starts it waits for the rebalance to complete, then pulls messages from the brokers via KafkaConsumer.poll(...), recording message sizes and fetch times; statistics are printed at the configured interval, and finally totalMessagesRead and totalBytesRead are updated for the summary output at the end of main(...):

kafka.tools.ConsumerPerformance#consume
  • def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
  • // 记录读取到的消息的总字节数
  • var bytesRead = 0L
  • // 记录拉取消息的个数
  • var messagesRead = 0L
  • var lastBytesRead = 0L
  • var lastMessagesRead = 0L
  • // Wait for group join, metadata fetch, etc
  • // 等待Rebalance操作完成的最长时间
  • val joinTimeout = 10000
  • // 标识当前消费者是否已经分配了分区
  • val isAssigned = new AtomicBoolean(false)
  • // 订阅Topic,同时添加ConsumerRebalanceListener用来修改isAssigned的值
  • consumer.subscribe(topics, new ConsumerRebalanceListener {
  • def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
  • isAssigned.set(true)
  • }
  • def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
  • isAssigned.set(false)
  • }})
  • val joinStart = System.currentTimeMillis()
  • // 等待分区分配过程完成
  • while (!isAssigned.get()) {
  • if (System.currentTimeMillis() - joinStart >= joinTimeout) {
  • throw new Exception("Timed out waiting for initial group join.")
  • }
  • consumer.poll(100)
  • }
  • // 调整消费者,从分区的第一条消息开始消费
  • consumer.seekToBeginning(List[TopicPartition]())
  • // Now start the benchmark
  • // 记录测试开始时间
  • val startMs = System.currentTimeMillis
  • var lastReportTime: Long = startMs
  • // 记录最后一次拉取消息时间
  • var lastConsumedTime = System.currentTimeMillis
  • while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime <= timeout) {
  • val records = consumer.poll(100)
  • if(records.count() > 0)
  • lastConsumedTime = System.currentTimeMillis
  • for(record <- records) {
  • // 增加消费的总消息数量
  • messagesRead += 1
  • if(record.key != null)
  • // 增加消费的总字节数
  • bytesRead += record.key.size
  • if(record.value != null)
  • // 增加消费的总字节数
  • bytesRead += record.value.size
  • // 间隔reportingInterval时间,输出一次统计数据
  • if (messagesRead % config.reportingInterval == 0) {
  • if (config.showDetailedStats)
  • printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis, config.dateFormat)
  • // 记录输出时间
  • lastReportTime = System.currentTimeMillis
  • // 更新,供下次输出使用
  • lastMessagesRead = messagesRead
  • // update lastBytesRead for the next report
  • lastBytesRead = bytesRead
  • }
  • }
  • }
  • // 更新totalMessagesRead
  • totalMessagesRead.set(messagesRead)
  • totalBytesRead.set(bytesRead)
  • }

13. kafka-mirror-maker.sh脚本

kafka-mirror-maker脚本的主要功能是实现两个Kafka集群中的数据同步,它的基本原理是通过消费者从源集群中获取消息,再通过生产者将消息追加到目的集群中,这样就实现了数据在两个集群中的迁移。kafka-mirror-maker脚本通过调用MirrorMaker实现,MirrorMaker中支持新版本的消费者和生产者。该脚本源码如下:

  • #!/bin/bash
  • exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker "$@"

MirrorMaker中创建了多个MirrorMakerNewConsumer对象,每个MirrorMakerNewConsumer对象对应创建一个MirrorMakerThread线程,实现从源集群中的消息拉取。多个MirrorMakerThread线程共享一个MirrorMakerProducer对象,实现向目的集群追加消息的功能。MirrorMaker的定义和字段如下:

kafka.tools.MirrorMaker
  • object MirrorMaker extends Logging with KafkaMetricsGroup {
  • // 生产者对象
  • private var producer: MirrorMakerProducer = null
  • // MirrorMakerThread线程集合
  • private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
  • private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
  • // Track the messages not successfully sent by mirror maker.
  • // 记录MirrorMaker发送失败的消息个数
  • private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
  • /**
  • * 当MirrorMakerNewConsumer从源集群中将消息拉取到本地后,
  • * 会先经过MirrorMakerMessageHandler处理,再发送给目的集群。
  • * 此字段的具体类型是由message.handler参数指定的,默认值是defaultMirrorMakerMessageHandler。
  • */
  • private var messageHandler: MirrorMakerMessageHandler = null
  • // 指定生产者进行flush()操作和提交offset的周期
  • private var offsetCommitIntervalMs = 0
  • private var abortOnSendFailure: Boolean = true
  • @volatile private var exitingOnSendFailure: Boolean = false
  • ...
  • }

MirrorMaker的main(...)方法会解析命令行参数,然后创建MirrorMakerProducer和MirrorMakerNewConsumer,同时为每个MirrorMakerNewConsumer创建对应的MirrorMakerThread线程,之后创建并初始化MessageHandler,最后启动MirrorMakerThread线程,主线程阻塞等待全部的MirrorMakerThread线程结束:

  • def main(args: Array[String]) {
  • info("Starting mirror maker")
  • try {
  • // 解析命令行参数
  • val parser = new OptionParser
  • val consumerConfigOpt = parser.accepts("consumer.config",
  • "Embedded consumer config for consuming from the source cluster.")
  • .withRequiredArg()
  • .describedAs("config file")
  • .ofType(classOf[String])
  • val useNewConsumerOpt = parser.accepts("new.consumer",
  • "Use new consumer in mirror maker.")
  • val producerConfigOpt = parser.accepts("producer.config",
  • "Embedded producer config.")
  • .withRequiredArg()
  • .describedAs("config file")
  • .ofType(classOf[String])
  • val numStreamsOpt = parser.accepts("num.streams",
  • "Number of consumption streams.")
  • .withRequiredArg()
  • .describedAs("Number of threads")
  • .ofType(classOf[java.lang.Integer])
  • .defaultsTo(1)
  • val whitelistOpt = parser.accepts("whitelist",
  • "Whitelist of topics to mirror.")
  • .withRequiredArg()
  • .describedAs("Java regex (String)")
  • .ofType(classOf[String])
  • val blacklistOpt = parser.accepts("blacklist",
  • "Blacklist of topics to mirror. Only old consumer supports blacklist.")
  • .withRequiredArg()
  • .describedAs("Java regex (String)")
  • .ofType(classOf[String])
  • val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
  • "Offset commit interval in ms")
  • .withRequiredArg()
  • .describedAs("offset commit interval in millisecond")
  • .ofType(classOf[java.lang.Integer])
  • .defaultsTo(60000)
  • val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
  • "The consumer rebalance listener to use for mirror maker consumer.")
  • .withRequiredArg()
  • .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
  • .ofType(classOf[String])
  • val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
  • "Arguments used by custom rebalance listener for mirror maker consumer")
  • .withRequiredArg()
  • .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
  • .ofType(classOf[String])
  • val messageHandlerOpt = parser.accepts("message.handler",
  • "Message handler which will process every record in-between consumer and producer.")
  • .withRequiredArg()
  • .describedAs("A custom message handler of type MirrorMakerMessageHandler")
  • .ofType(classOf[String])
  • val messageHandlerArgsOpt = parser.accepts("message.handler.args",
  • "Arguments used by custom message handler for mirror maker.")
  • .withRequiredArg()
  • .describedAs("Arguments passed to message handler constructor.")
  • .ofType(classOf[String])
  • val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
  • "Configure the mirror maker to exit on a failed send.")
  • .withRequiredArg()
  • .describedAs("Stop the entire mirror maker when a send failure occurs")
  • .ofType(classOf[String])
  • .defaultsTo("true")
  • val helpOpt = parser.accepts("help", "Print this message.")
  • if (args.length == 0)
  • CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
  • val options = parser.parse(args: _*)
  • if (options.has(helpOpt)) {
  • parser.printHelpOn(System.out)
  • System.exit(0)
  • }
  • CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
  • // 是否使用--new.consumer参数指定使用新消费者
  • val useNewConsumer = options.has(useNewConsumerOpt)
  • if (useNewConsumer) {
  • if (options.has(blacklistOpt)) { // --blacklist
  • error("blacklist can not be used when using new consumer in mirror maker. Use whitelist instead.")
  • System.exit(1)
  • }
  • if (!options.has(whitelistOpt)) { // --whitelist
  • error("whitelist must be specified when using new consumer in mirror maker.")
  • System.exit(1)
  • }
  • } else {
  • if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
  • error("Exactly one of whitelist or blacklist is required.")
  • System.exit(1)
  • }
  • }
  • // 是否在发送失败时停止发送,abort.on.send.failure
  • abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
  • // offset提交周期,offset.commit.interval.ms
  • offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
  • // 流数量,num.streams
  • val numStreams = options.valueOf(numStreamsOpt).intValue()
  • // 设置JVM关闭钩子
  • Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
  • override def run() {
  • cleanShutdown()
  • }
  • })
  • // create producer
  • // 获取生产者相关的配置
  • val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
  • // Defaults to no data loss settings.
  • maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
  • maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
  • maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
  • maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
  • // Always set producer key and value serializer to ByteArraySerializer.
  • producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
  • producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
  • // 创建MirrorMakerProducer生产者
  • producer = new MirrorMakerProducer(producerProps)
  • // Create consumers
  • // 创建消费者
  • val mirrorMakerConsumers = if (!useNewConsumer) {
  • val customRebalanceListener = {
  • // 根据consumer.rebalance.listener参数决定是否创建ConsumerRebalanceListener
  • val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
  • if (customRebalanceListenerClass != null) {
  • val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
  • if (rebalanceListenerArgs != null) {
  • Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
  • } else {
  • Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
  • }
  • } else {
  • None
  • }
  • }
  • // 检测customRebalanceListener的类型是否正确,若不正确则抛出异常
  • if (customRebalanceListener.exists(!_.isInstanceOf[ConsumerRebalanceListener]))
  • throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener")
  • // 创建旧消费者
  • createOldConsumers(
  • numStreams, // 消费者个数
  • options.valueOf(consumerConfigOpt), //消费者的配置
  • customRebalanceListener,
  • Option(options.valueOf(whitelistOpt)), // 白名单
  • Option(options.valueOf(blacklistOpt))) // 黑名单
  • } else {
  • // 根据consumer.rebalance.listener参数决定是否创建ConsumerRebalanceListener
  • val customRebalanceListener = {
  • val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
  • if (customRebalanceListenerClass != null) {
  • val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
  • if (rebalanceListenerArgs != null) {
  • Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
  • } else {
  • Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass))
  • }
  • } else {
  • None
  • }
  • }
  • // 检测customRebalanceListener的类型是否正确,若不正确则抛出异常
  • if (customRebalanceListener.exists(!_.isInstanceOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]))
  • throw new IllegalArgumentException("The rebalance listener should be an instance of" +
  • "org.apache.kafka.clients.consumer.ConsumerRebalanceListner")
  • // 创建新消费者
  • createNewConsumers(
  • numStreams, // 消费者个数
  • options.valueOf(consumerConfigOpt), // 消费者相关配置
  • customRebalanceListener,
  • Option(options.valueOf(whitelistOpt))) // 白名单
  • }
  • // Create mirror maker threads.
  • // 为每个消费者创建一个MirrorMakerThread线程
  • mirrorMakerThreads = (0 until numStreams) map (i =>
  • new MirrorMakerThread(mirrorMakerConsumers(i), i))
  • // Create and initialize message handler
  • // 根据message.handler参数创建并初始化MessageHandler
  • val customMessageHandlerClass = options.valueOf(messageHandlerOpt) // message.handler
  • val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt) // message.handler.args
  • // 创建MessageHandler
  • messageHandler = {
  • if (customMessageHandlerClass != null) {
  • if (messageHandlerArgs != null)
  • CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
  • else
  • CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
  • } else {
  • defaultMirrorMakerMessageHandler
  • }
  • }
  • } catch {
  • case ct : ControlThrowable => throw ct
  • case t : Throwable =>
  • error("Exception when starting mirror maker.", t)
  • }
  • // 启动MirrorMakerThread线程
  • mirrorMakerThreads.foreach(_.start())
  • // 主线程阻塞,等待MirrorMakerThread线程全部结束
  • mirrorMakerThreads.foreach(_.awaitShutdown())
  • }
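
All of the options handled above come together in an invocation like the following sketch. The property file names and the whitelist regex are placeholders; consumer.config is assumed to point at the source cluster (at least bootstrap.servers plus a group.id) and producer.config at the target cluster.

  • # mirror every topic matching "mirror-.*" from the source cluster to the target cluster using 2 streams
  • bin/kafka-mirror-maker.sh --new.consumer --consumer.config source-consumer.properties --producer.config target-producer.properties --whitelist 'mirror-.*' --num.streams 2 --offset.commit.interval.ms 60000 --abort.on.send.failure true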

MirrorMakerProducer是对KafkaProducer的简单封装。这里要注意,在MirrorMakerProducerCallback中会统计发送失败的消息数量,还会根据abort.on.send.failure配置决定在出现发送失败的情况时是否关闭生产者。具体实现如下:

kafka.tools.MirrorMaker.MirrorMakerProducer
  • private class MirrorMakerProducer(val producerProps: Properties) {
  • val sync = producerProps.getProperty("producer.type", "async").equals("sync")
  • val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
  • def send(record: ProducerRecord[Array[Byte], Array[Byte]]) {
  • if (sync) { // 同步发送
  • this.producer.send(record).get()
  • } else { // 异步发送
  • this.producer.send(record,
  • // 发送完成的回调
  • new MirrorMakerProducerCallback(record.topic(), record.key(), record.value()))
  • }
  • }
  • def flush() {
  • this.producer.flush()
  • }
  • def close() {
  • this.producer.close()
  • }
  • def close(timeout: Long) {
  • this.producer.close(timeout, TimeUnit.MILLISECONDS)
  • }
  • }

MirrorMakerProducerCallback的源码如下:

kafka.tools.MirrorMaker.MirrorMakerProducerCallback
  • /**
  • * 统计发送失败的消息数量,
  • * 根据abort.on.send.failure配置决定在出现发送失败的情况时是否关闭生产者
  • */
  • private class MirrorMakerProducerCallback (topic: String, key: Array[Byte], value: Array[Byte])
  • extends ErrorLoggingCallback(topic, key, value, false) {
  • override def onCompletion(metadata: RecordMetadata, exception: Exception) {
  • if (exception != null) { // 出现异常的相关处理
  • // Use default call back to log error. This means the max retries of producer has reached and message
  • // still could not be sent.
  • // 通过父类的实现输出错误日志
  • super.onCompletion(metadata, exception)
  • // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on.
  • /** 如果设置了abort.on.send.failure参数,则停止MirrorMaker,
  • * 否则忽略异常,继续发送后面的消息
  • */
  • if (abortOnSendFailure) {
  • info("Closing producer due to send failure.")
  • // 设置为true后会通知全部MirrorMakerThread停止
  • exitingOnSendFailure = true
  • producer.close(0)
  • }
  • // 记录发送失败的消息数量
  • numDroppedMessages.incrementAndGet()
  • }
  • }
  • }

MirrorMaker的createNewConsumers(...)方法负责创建多个MirrorMakerNewConsumer,具体数目由num.streams参数指定:

kafka.tools.MirrorMaker#createNewConsumers
  • def createNewConsumers(numStreams: Int,
  • consumerConfigPath: String,
  • customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
  • whitelist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
  • // Create consumer connector
  • // 获取KafkaConsumer的相关配置项
  • val consumerConfigProps = Utils.loadProps(consumerConfigPath)
  • // Disable consumer auto offsets commit to prevent data loss.
  • maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false")
  • // Hardcode the deserializer to ByteArrayDeserializer
  • consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
  • consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
  • // The default client id is group id, we manually set client id to groupId-index to avoid metric collision
  • val groupIdString = consumerConfigProps.getProperty("group.id")
  • // 创建指定数量的KafkaConsumer对象
  • val consumers = (0 until numStreams) map { i =>
  • consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
  • new KafkaConsumer[Array[Byte], Array[Byte]](consumerConfigProps)
  • }
  • whitelist.getOrElse(throw new IllegalArgumentException("White list cannot be empty for new consumer"))
  • // 创建MirrorMakerNewConsumer
  • consumers.map(consumer => new MirrorMakerNewConsumer(consumer, customRebalanceListener, whitelist))
  • }

MirrorMakerNewConsumer通过KafkaConsumer实现从源集群中拉取消息的功能。在MirrorMakerNewConsumer实现中关闭了KafkaConsumer自动提交offset的功能,而使用offsets字段维护其消费的分区的offset,并在进行Rebalance操作前通过InternalRebalanceListenerForNewConsumer提交offset:

kafka.tools.MirrorMaker.MirrorMakerNewConsumer
  • private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
  • customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
  • whitelistOpt: Option[String])
  • extends MirrorMakerBaseConsumer {
  • val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
  • var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
  • // TODO: we need to manually maintain the consumed offsets for new consumer
  • // since its internal consumed position is updated in batch rather than one
  • // record at a time, this can be resolved when we break the unification of both consumers
  • // 使用HashMap维护自己消费的offset
  • private val offsets = new HashMap[TopicPartition, Long]()
  • override def init() {
  • debug("Initiating new consumer")
  • // 创建InternalRebalanceListenerForNewConsumer
  • val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
  • if (whitelistOpt.isDefined) {
  • try {
  • // 订阅指定白名单
  • consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener)
  • } catch {
  • case pse: PatternSyntaxException =>
  • error("Invalid expression syntax: %s".format(whitelistOpt.get))
  • throw pse
  • }
  • }
  • }
  • override def hasData = true
  • override def receive() : BaseConsumerRecord = {
  • if (recordIter == null || !recordIter.hasNext) {
  • // 从源集群中拉取消息
  • recordIter = consumer.poll(1000).iterator
  • if (!recordIter.hasNext)
  • throw new ConsumerTimeoutException
  • }
  • val record = recordIter.next()
  • val tp = new TopicPartition(record.topic, record.partition)
  • // 记录消费的offset
  • offsets.put(tp, record.offset + 1)
  • // 返回拉取的消息
  • BaseConsumerRecord(record.topic,
  • record.partition,
  • record.offset,
  • record.timestamp,
  • record.timestampType,
  • record.key,
  • record.value)
  • }
  • override def stop() {
  • consumer.wakeup()
  • }
  • override def cleanup() {
  • consumer.close()
  • }
  • override def commit() {
  • consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, ""))})
  • offsets.clear()
  • }
  • }

InternalRebalanceListenerForNewConsumer实现了ConsumerRebalanceListener接口,在进行Rebalance操作之前会将offsets字段中记录的offset进行提交:

kafka.tools.MirrorMaker.InternalRebalanceListenerForNewConsumer
  • private class InternalRebalanceListenerForNewConsumer(mirrorMakerConsumer: MirrorMakerBaseConsumer,
  • customRebalanceListenerForNewConsumer: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener])
  • extends org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
  • override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
  • // 将KafkaProducer缓存的消息全部发送到目标集群
  • producer.flush()
  • // 提交offsets集合中记录的offset
  • commitOffsets(mirrorMakerConsumer)
  • // 调用consumer.rebalance.listener参数指定的ConsumerRebalanceListener实现自定义的功能
  • customRebalanceListenerForNewConsumer.foreach(_.onPartitionsRevoked(partitions))
  • }
  • override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
  • customRebalanceListenerForNewConsumer.foreach(_.onPartitionsAssigned(partitions))
  • }
  • }

MirrorMakerThread线程会首先调用对应MirrorMakerNewConsumer对象的receive(...)方法获取消息,然后将消息交给MirrorMakerMessageHandler的handle(...)方法处理,之后将处理完的消息追加到目的集群中。在这个过程中会按照offset.commit.interval.ms参数指定的时间间隔提交offset:

kafka.tools.MirrorMaker.MirrorMakerThread
  • class MirrorMakerThread(mirrorMakerConsumer: MirrorMakerBaseConsumer,
  • val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
  • private val threadName = "mirrormaker-thread-" + threadId
  • private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
  • private var lastOffsetCommitMs = System.currentTimeMillis()
  • @volatile private var shuttingDown: Boolean = false
  • this.logIdent = "[%s] ".format(threadName)
  • setName(threadName)
  • override def run() {
  • info("Starting mirror maker thread " + threadName)
  • try {
  • // 初始化MirrorMakerNewConsumer
  • mirrorMakerConsumer.init()
  • // We need the two while loop to make sure when old consumer is used, even there is no message we
  • // still commit offset. When new consumer is used, this is handled by poll(timeout).
  • while (!exitingOnSendFailure && !shuttingDown) {
  • try {
  • // 检测exitingOnSendFailure、shuttingDown等标记
  • while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
  • // 从源集群中获取消息
  • val data = mirrorMakerConsumer.receive()
  • trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
  • // 通过MirrorMakerMessageHandler创建ProducerRecord
  • val records = messageHandler.handle(data)
  • // 发送消息
  • records.foreach(producer.send)
  • // 尝试提交offset
  • maybeFlushAndCommitOffsets()
  • }
  • } catch {
  • case cte: ConsumerTimeoutException =>
  • trace("Caught ConsumerTimeoutException, continue iteration.")
  • case we: WakeupException =>
  • trace("Caught ConsumerWakeupException, continue iteration.")
  • }
  • maybeFlushAndCommitOffsets()
  • }
  • } catch {
  • case t: Throwable =>
  • fatal("Mirror maker thread failure due to ", t)
  • } finally {
  • CoreUtils.swallow {
  • info("Flushing producer.")
  • producer.flush()
  • // note that this commit is skipped if flush() fails which ensures that we don't lose messages
  • info("Committing consumer offsets.")
  • commitOffsets(mirrorMakerConsumer)
  • }
  • info("Shutting down consumer connectors.")
  • CoreUtils.swallow(mirrorMakerConsumer.stop())
  • CoreUtils.swallow(mirrorMakerConsumer.cleanup())
  • shutdownLatch.countDown()
  • info("Mirror maker thread stopped")
  • // if it exits accidentally, stop the entire mirror maker
  • if (!isShuttingdown.get()) {
  • fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
  • System.exit(-1)
  • }
  • }
  • }
  • def maybeFlushAndCommitOffsets() {
  • // 满足offset.commit.interval.ms参数指定的时间间隔,才提交一次offset
  • if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
  • debug("Committing MirrorMaker state automatically.")
  • // 将KafkaProducer缓冲的消息发送出去
  • producer.flush()
  • // 提交MirrorMakerNewConsumer的offsets字段中记录的offset
  • commitOffsets(mirrorMakerConsumer)
  • // 记录最近一次提交时间戳
  • lastOffsetCommitMs = System.currentTimeMillis()
  • }
  • }
  • def shutdown() {
  • try {
  • info(threadName + " shutting down")
  • shuttingDown = true
  • mirrorMakerConsumer.stop()
  • }
  • catch {
  • case ie: InterruptedException =>
  • warn("Interrupt during shutdown of the mirror maker thread")
  • }
  • }
  • def awaitShutdown() {
  • try {
  • shutdownLatch.await()
  • info("Mirror maker thread shutdown complete")
  • } catch {
  • case ie: InterruptedException =>
  • warn("Shutdown of the mirror maker thread interrupted")
  • }
  • }
  • }