
Kafka Series 27 - Server-Side Source Code Analysis 18: Metrics and Monitoring in Kafka

Overview: explains how JMX and the metrics frameworks are used inside Kafka.

Note: this article mainly follows Section 4.9 of Chapter 4 of 《Apache Kafka源码剖析》 (Apache Kafka Source Code Analysis).

1. Kafka Metrics

Kafka adds a thin wrapper on top of the Yammer Metrics library. The KafkaMetricsGroup trait defines a few base methods:

kafka.metrics.KafkaMetricsGroup

  trait KafkaMetricsGroup extends Logging {

    private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
      // Class of the current object
      val klass = this.getClass
      // Package name
      val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
      // Simple class name
      val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
      // Build the MetricName
      explicitMetricName(pkg, simpleName, name, tags)
    }

    private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
      // Used to build the MBean name
      val nameBuilder: StringBuilder = new StringBuilder
      // Part 1: the package name
      nameBuilder.append(group)
      // Part 2: the type (class) name
      nameBuilder.append(":type=")
      nameBuilder.append(typeName)
      // Part 3: the metric name
      if (name.length > 0) {
        nameBuilder.append(",name=")
        nameBuilder.append(name)
      }
      val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null)
      // Join the tags map into a comma-separated string of key=value entries
      val tagsName = KafkaMetricsGroup.toMBeanName(tags)
      tagsName match {
        case Some(tn) =>
          // Part 4: the tags
          nameBuilder.append(",").append(tn)
        case None =>
      }
      new MetricName(group, typeName, name, scope, nameBuilder.toString())
    }

    // Factory methods for registering the metric types, backed by Yammer's Metrics class
    def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
      Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)

    def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
      Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)

    def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
      Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)

    def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
      Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)

    def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
      Metrics.defaultRegistry().removeMetric(metricName(name, tags))
  }

The metricName(...) method generates a metric's MetricName following fixed rules; the methods whose names start with "new" create metric objects of the corresponding type, and removeMetric(...) removes a given metric.
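
For reference, a minimal sketch of the underlying Yammer API these helpers delegate to, assuming the Yammer Metrics 2.x jar on the classpath; the group/type/name strings are invented for the example:

  import com.yammer.metrics.Metrics;
  import com.yammer.metrics.core.Gauge;
  import com.yammer.metrics.core.MetricName;

  public class GaugeExample {
      public static void main(String[] args) {
          // Hypothetical group/type/name; KafkaMetricsGroup derives these
          // from the package and class of the caller.
          MetricName name = new MetricName("kafka.example", "ExampleType", "QueueSize");
          // Register a Gauge whose value() is polled whenever the metric is read
          Metrics.defaultRegistry().newGauge(name, new Gauge<Integer>() {
              @Override
              public Integer value() {
                  return 42; // would normally return a live measurement
              }
          });
      }
  }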

1.1. Gauge metrics in the Log class

The log-storage Log class uses a Gauge to track the number of LogSegments in the current Log:

kafka.log.Log

  // Tags attached to this Log's metrics
  val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
  // Register the Gauge metric
  newGauge("NumLogSegments",
    new Gauge[Int] {
      // Reports the current number of LogSegments in this Log
      def value = numberOfSegments
    },
    tags)
  ...
  def numberOfSegments: Int = segments.size

Given how metricName(...) builds names, the MBean name produced here is kafka.log:type=Log,name=NumLogSegments,topic=[Topic],partition=[partition]; in the MBean view it shows as:

[Figure 1: Gauge metric in Kafka metrics]

1.2. Meter metrics in the ReplicaManager class

ReplicaManager uses Meters to track how frequently the ISR sets of all partitions expand or shrink:

kafka.server.ReplicaManager

  ...
  // Meter metrics tracking how often the ISR sets of all partitions expand/shrink
  val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
  val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
  ...

When an ISR set expands or shrinks, Partition's maybeExpandIsr(...) and maybeShrinkIsr(...) methods record against these two meters respectively:

  // ISR expansion
  def maybeExpandIsr(replicaId: Int) {
    ...
    // Mark one ISR expansion on the Meter
    replicaManager.isrExpandRate.mark()
    ...
  }

  // ISR shrink
  def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
    ...
    // Mark one ISR shrink on the Meter
    replicaManager.isrShrinkRate.mark()
    ...
  }

The corresponding MBean views:

  • Monitoring view of isrExpandRate:

[Figure 2: Meter metric in Kafka metrics (1)]

  • Monitoring view of isrShrinkRate:

[Figure 3: Meter metric in Kafka metrics (2)]

1.3. Histogram metrics in the Request class

RequestChannel.Request uses Histograms to track how long each kind of request and response waits inside the RequestChannel. If many requests wait there too long, tuning is needed: too few Handler threads, or a flood of requests from services upstream or downstream of Kafka, can both cause this. Watching how long responses wait in the RequestChannel leads to similar conclusions. The relevant code:

kafka.network.RequestMetrics

  class RequestMetrics(name: String) extends KafkaMetricsGroup {
    val tags = Map("request" -> name)
    // Requests per second
    val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
    // Time a request spent waiting in the RequestChannel's request queue
    val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags)
    // Time a request took to be processed at the local broker
    val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
    // Time a request spent waiting on remote brokers, e.g. a FetchRequest
    // (currently only relevant to fetch and produce requests)
    val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
    // Time a request was throttled (only relevant to fetch and produce requests)
    val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
    // Time a response spent waiting in the RequestChannel's response queue
    val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
    // Time taken to send the response to the requester
    val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
    // Total time
    val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
  }

When a Processor thread finishes reading a request, it creates a RequestChannel.Request object, initializing startTimeMs to the current time and requestDequeueTimeMs to -1:

kafka.network.RequestChannel.Request

  // Case class representing a request
  case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
    // These need to be volatile because the readers are in the network thread and the writers are in the request
    // handler threads or the purgatory threads; they record the timestamp of each processing step
    @volatile var requestDequeueTimeMs = -1L
    @volatile var apiLocalCompleteTimeMs = -1L
    @volatile var responseCompleteTimeMs = -1L
    @volatile var responseDequeueTimeMs = -1L
    @volatile var apiRemoteCompleteTimeMs = -1L
    ...
  }

The request is then placed in the RequestChannel to wait for a Handler thread. When a Handler thread takes the Request out of the RequestChannel, it sets requestDequeueTimeMs to the current timestamp and hands the request to KafkaApis:

kafka.server.KafkaRequestHandler

  class KafkaRequestHandler(id: Int,
                            brokerId: Int,
                            val aggregateIdleMeter: Meter,
                            val totalHandlerThreads: Int,
                            val requestChannel: RequestChannel,
                            apis: KafkaApis) extends Runnable with Logging {
    ...
    def run() {
      // Take a request from the RequestChannel's requestQueue
      req = requestChannel.receiveRequest(300)
      ...
      // Update requestDequeueTimeMs
      req.requestDequeueTimeMs = SystemTime.milliseconds
      ...
      // Hand the RequestChannel.Request to KafkaApis
      apis.handle(req)
      ...
    }
    ...
  }

The RequestQueueTimeMs histogram for GroupCoordinator lookup requests looks like this:

[Figure 4: Histogram metric in Kafka metrics (1)]

When KafkaApis finishes processing, it produces a RequestChannel.Response object and places it in the RequestChannel for the Processor, setting the corresponding Request's responseCompleteTimeMs to the current timestamp. Once the response has been sent, the Processor updates the metrics:

kafka.network.Processor#processCompletedSends

  private def processCompletedSends() {
    // Iterate over the completedSends queue
    selector.completedSends.asScala.foreach { send =>
      ...
      // Update the metrics via the Request's updateRequestMetrics()
      resp.request.updateRequestMetrics()
      ...
    }
  }

The RequestChannel.Request.updateRequestMetrics() method used there:

kafka.network.RequestChannel.Request#updateRequestMetrics

  def updateRequestMetrics() {
    val endTimeMs = SystemTime.milliseconds
    // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
    // processing time is really small. This value is set in KafkaApis from a request handling thread.
    // This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
    // see a negative value here. In that case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
    if (apiLocalCompleteTimeMs < 0)
      apiLocalCompleteTimeMs = responseCompleteTimeMs
    // If the apiRemoteCompleteTimeMs is not set (i.e., for requests that do not go through a purgatory), then it is
    // the same as responseCompleteTimeMs.
    if (apiRemoteCompleteTimeMs < 0)
      apiRemoteCompleteTimeMs = responseCompleteTimeMs
    // Time the request spent waiting in the RequestChannel
    val requestQueueTime = math.max(requestDequeueTimeMs - startTimeMs, 0)
    val apiLocalTime = math.max(apiLocalCompleteTimeMs - requestDequeueTimeMs, 0)
    val apiRemoteTime = math.max(apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs, 0)
    val apiThrottleTime = math.max(responseCompleteTimeMs - apiRemoteCompleteTimeMs, 0)
    // Time the response spent waiting in the RequestChannel
    val responseQueueTime = math.max(responseDequeueTimeMs - responseCompleteTimeMs, 0)
    val responseSendTime = math.max(endTimeMs - responseDequeueTimeMs, 0)
    val totalTime = endTimeMs - startTimeMs
    val fetchMetricNames =
      if (requestId == ApiKeys.FETCH.id) {
        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
        Seq(
          if (isFromFollower) RequestMetrics.followFetchMetricName
          else RequestMetrics.consumerFetchMetricName
        )
      }
      else Seq.empty
    val metricNames = fetchMetricNames :+ ApiKeys.forId(requestId).name
    metricNames.foreach { metricName =>
      val m = RequestMetrics.metricsMap(metricName)
      m.requestRate.mark()
      // Update requestQueueTimeHist
      m.requestQueueTimeHist.update(requestQueueTime)
      m.localTimeHist.update(apiLocalTime)
      m.remoteTimeHist.update(apiRemoteTime)
      m.throttleTimeHist.update(apiThrottleTime)
      // Update responseQueueTimeHist
      m.responseQueueTimeHist.update(responseQueueTime)
      m.responseSendTimeHist.update(responseSendTime)
      m.totalTimeHist.update(totalTime)
    }
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
        .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
    else if (requestLogger.isDebugEnabled)
      requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
        .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
  }
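
Note that, aside from the max(..., 0) clamping, the six component times partition the total: totalTime = requestQueueTime + apiLocalTime + apiRemoteTime + apiThrottleTime + responseQueueTime + responseSendTime, since each component is simply the difference between adjacent timestamps along the request's lifecycle (startTimeMs → requestDequeueTimeMs → apiLocalCompleteTimeMs → apiRemoteCompleteTimeMs → responseCompleteTimeMs → responseDequeueTimeMs → endTimeMs).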

The ResponseQueueTimeMs histogram for GroupCoordinator lookup requests looks like this:

[Figure 5: Histogram metric in Kafka metrics (2)]

1.4. Timer metrics in BrokerChangeListener

Kafka uses a Timer to measure, whenever brokers join or leave, how long leader election takes and how those times are distributed; more precisely, it times the Controller Leader's execution of the whole BrokerChangeListener.handleChildChange(...) method. KafkaServer, the server-side entry point, creates and registers the Timer by calling registerStats(...) during startup:

kafka.server.KafkaServer

  class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
    ...
    def startup() {
      ...
      /** register broker metrics
       * Creates and registers the broker's Timer metrics
       */
      registerStats()
      ...
    }
    ...
    private def registerStats() {
      BrokerTopicStats.getBrokerAllTopicsStats()
      ControllerStats.uncleanLeaderElectionRate
      ControllerStats.leaderElectionTimer
    }
    ...
  }

The source of ControllerStats:

kafka.controller.ControllerStats

  object ControllerStats extends KafkaMetricsGroup {
    // Register the Meter metric
    private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
    // Register the Timer metric
    private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
    // KafkaServer needs to initialize controller metrics during startup. We perform initialization
    // through method calls to avoid Scala compiler warnings.
    def uncleanLeaderElectionRate: Meter = _uncleanLeaderElectionRate
    def leaderElectionTimer: KafkaTimer = _leaderElectionTimer
  }

KafkaTimer is a thin wrapper around Timer; underneath it still uses Timer's time() and stop() methods to do the timing.
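
The pattern KafkaTimer wraps is the standard Yammer one; a hedged Java sketch (the metric group/type strings are illustrative):

  import java.util.concurrent.TimeUnit;
  import com.yammer.metrics.Metrics;
  import com.yammer.metrics.core.MetricName;
  import com.yammer.metrics.core.Timer;
  import com.yammer.metrics.core.TimerContext;

  public class TimerExample {
      public static void main(String[] args) throws Exception {
          Timer timer = Metrics.defaultRegistry().newTimer(
                  new MetricName("kafka.example", "Controller", "LeaderElectionRateAndTimeMs"),
                  TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
          // KafkaTimer.time { ... } amounts to this try/finally around the block:
          TimerContext ctx = timer.time();
          try {
              Thread.sleep(10); // the work being timed
          } finally {
              ctx.stop(); // records the elapsed duration and updates the rate
          }
      }
  }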

BrokerChangeListener uses leaderElectionTimer like this:

  class BrokerChangeListener() extends IZkChildListener with Logging {
    this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) { // only runs once the state machine has started
          ControllerStats.leaderElectionTimer.time { // timed block, surfaced through JMX
            ...
          }
          ...
        }
        ...
      }
      ...
    }
    ...
  }

In the monitoring UI, the LeaderElectionRateAndTimeMs metric shows as:

[Figure 6: Meter metric in Kafka metrics (1)]

And UncleanLeaderElectionsPerSec shows as:

[Figure 7: Meter metric in Kafka metrics (2)]

2. Kafka Monitoring

Kafka also ships a monitoring module of its own; the code lives in the clients module under org.apache.kafka.common.metrics and its subpackages.

2.1. Base interfaces

The org.apache.kafka.common.metrics.Measurable interface is the most basic metric type; it declares a single method, measure(...):

  /**
   * A measurable quantity that can be registered as a metric
   * The most basic metric interface
   */
  public interface Measurable {
      /**
       * Measure this quantity and return the result as a double
       * Returns the monitored value
       *
       * @param config The configuration for this metric
       * @param now The POSIX time in milliseconds the measurement is being taken
       * @return The measured value
       */
      public double measure(MetricConfig config, long now);
  }

The org.apache.kafka.common.metrics.Stat interface represents metric types that require statistical computation, such as averages and extrema; it defines record(...), which records a value and updates the statistic:

  /**
   * A Stat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor
   * Represents metric types computed statistically, e.g. average, maximum, minimum
   */
  public interface Stat {
      /**
       * Record the given value
       * Records a value and updates the statistic
       *
       * @param config The configuration to use for this metric
       * @param value The value to record
       * @param timeMs The POSIX time in milliseconds this value occurred
       */
      public void record(MetricConfig config, double value, long timeMs);
  }

org.apache.kafka.common.metrics.MeasurableStat extends both Measurable and Stat without declaring anything new; it is purely a combination interface.

org.apache.kafka.common.metrics.stats.SampledStat is an abstract class representing a sampled metric value; every MeasurableStat implementation other than Total and Rate builds on it. A SampledStat holds one or more Samples and measures a value across them. Each Sample records its own time window and event count, and when SampledStat computes the final result it uses those two values to decide whether a Sample's data should still be included. SampledStat's definition and key fields:

org.apache.kafka.common.metrics.stats.SampledStat

  /**
   * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
   * configurable window. The window can be defined by number of events or elapsed time (or both, if both are given the
   * window is complete when <i>either</i> the event count or elapsed time criterion is met).
   * <p>
   * All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
   * recycled to begin recording the next sample.
   *
   * Subclasses of this class define different statistics measured using this basic pattern.
   *
   * Represents a sampled value; every MeasurableStat implementation except Total and Rate builds on it
   */
  public abstract class SampledStat implements MeasurableStat {
      // Initial value of each sample
      private double initialValue;
      // Index of the Sample currently in use
      private int current = 0;
      // The Samples held by this SampledStat
      protected List<Sample> samples;
      // Constructor
      public SampledStat(double initialValue) {
          this.initialValue = initialValue;
          this.samples = new ArrayList<Sample>(2);
      }
      ...
  }

The samples field is a list (by default an ArrayList created with capacity 2) holding the Sample objects. Sample is a static inner class of SampledStat:

org.apache.kafka.common.metrics.stats.SampledStat.Sample

  protected static class Sample {
      // Initial value of the sample
      public double initialValue;
      // Number of events recorded in this sample
      public long eventCount;
      // Timestamp at which this sample's time window started
      public long lastWindowMs;
      // The sample's value
      public double value;
      public Sample(double initialValue, long now) {
          this.initialValue = initialValue;
          this.eventCount = 0;
          this.lastWindowMs = now;
          this.value = initialValue;
      }
      public void reset(long now) {
          this.eventCount = 0;
          this.lastWindowMs = now;
          this.value = initialValue;
      }
      // Checks eventCount and lastWindowMs to decide whether this sample is complete
      public boolean isComplete(long timeMs, MetricConfig config) {
          return timeMs - lastWindowMs >= config.timeWindowMs() || // check the time window
              eventCount >= config.eventWindow(); // check the event count
      }
  }

SampledStat also relies on the MetricConfig configuration class, whose definition and key fields are:

org.apache.kafka.common.metrics.MetricConfig

  /**
   * Configuration values for metrics
   */
  public class MetricConfig {
      // Upper/lower bound on the metric value; exceeding it raises an exception
      private Quota quota;
      // Number of samples; defaults to 2
      private int samples;
      // Maximum number of events per sample; once exceeded, recording moves to the next sample. Defaults to Long.MAX_VALUE
      private long eventWindow;
      // Time window of each sample; once exceeded, recording moves to the next sample. Defaults to 30 seconds
      private long timeWindowMs;
      // Associated tags
      private Map<String, String> tags;
      ...
  }

SampledStat implements both methods of the MeasurableStat interface. Its record(...) picks the appropriate Sample to record into based on the time window and event count:

org.apache.kafka.common.metrics.stats.SampledStat

  @Override
  public void record(MetricConfig config, double value, long timeMs) {
      // Get the current Sample
      Sample sample = current(timeMs);
      // Check whether the current sample is complete
      if (sample.isComplete(timeMs, config))
          // Advance to the next Sample
          sample = advance(config, timeMs);
      // Update the sample; this is an abstract method
      update(sample, config, value, timeMs);
      // Bump the event count
      sample.eventCount += 1;
  }

  // Returns the Sample in use at the given time
  public Sample current(long timeMs) {
      if (samples.size() == 0)
          // Create one if the samples list is empty
          this.samples.add(newSample(timeMs));
      // Look up by the current index
      return this.samples.get(this.current);
  }

  // Creates a new Sample
  protected Sample newSample(long timeMs) {
      // Built from the initial value and the given time
      return new Sample(this.initialValue, timeMs);
  }

  // Depending on the configured sample count, either creates a new Sample or reuses an old one
  private Sample advance(MetricConfig config, long timeMs) {
      // Move current forward, wrapping around modulo the configured sample count
      this.current = (this.current + 1) % config.samples();
      if (this.current >= samples.size()) { // index beyond the samples list
          // Create a new Sample (the list may need to grow)
          Sample sample = newSample(timeMs);
          this.samples.add(sample);
          // Return the newly created Sample
          return sample;
      } else {
          // Index still within the samples list: reset and reuse the old Sample
          Sample sample = current(timeMs);
          sample.reset(timeMs);
          return sample;
      }
  }

The update(...) method called inside record(...) is abstract and left to subclasses.

measure(...) first resets expired Samples and then delegates the computation to the abstract combine(...) method:

org.apache.kafka.common.metrics.stats.SampledStat

  // Resets expired Samples, then delegates to the abstract combine() method
  @Override
  public double measure(MetricConfig config, long now) {
      purgeObsoleteSamples(config, now);
      return combine(this.samples, config, now);
  }

  /* Timeout any windows that have expired in the absence of any events */
  protected void purgeObsoleteSamples(MetricConfig config, long now) {
      // Compute the expiry age
      long expireAge = config.samples() * config.timeWindowMs();
      for (int i = 0; i < samples.size(); i++) {
          Sample sample = this.samples.get(i);
          if (now - sample.lastWindowMs >= expireAge)
              // The Sample has expired; reset it
              sample.reset(now);
      }
  }
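
To see the sampling machinery end to end, here is a small hedged sketch that drives a Count (a SampledStat subclass covered below) by hand, using explicit timestamps so the window roll-over is visible:

  import java.util.concurrent.TimeUnit;
  import org.apache.kafka.common.metrics.MetricConfig;
  import org.apache.kafka.common.metrics.stats.Count;

  public class SampledStatExample {
      public static void main(String[] args) {
          // Two samples of 30 seconds each (the defaults)
          MetricConfig config = new MetricConfig()
                  .samples(2)
                  .timeWindow(30, TimeUnit.SECONDS);
          Count count = new Count();
          long t0 = 0L;
          count.record(config, 1.0, t0);          // lands in sample 0
          count.record(config, 1.0, t0 + 10_000); // still sample 0
          count.record(config, 1.0, t0 + 35_000); // sample 0's window is complete -> sample 1
          // Both samples are still unexpired at t0 + 40s, so measure() sums them: 3.0
          System.out.println(count.measure(config, t0 + 40_000));
      }
  }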

2.2. The Total class

Total implements MeasurableStat and keeps a running total. The code is simple: record(...) accumulates the values and measure(...) returns the accumulated value directly:

org.apache.kafka.common.metrics.stats.Total

  /**
   * An un-windowed cumulative total maintained over all time.
   * The value tracked by Total increases monotonically
   */
  public class Total implements MeasurableStat {
      private double total;
      public Total() {
          this.total = 0.0;
      }
      public Total(double value) {
          this.total = value;
      }
      // Accumulates the recorded values
      @Override
      public void record(MetricConfig config, double value, long now) {
          this.total += value;
      }
      // Returns the accumulated value
      @Override
      public double measure(MetricConfig config, long now) {
          return this.total;
      }
  }

2.3. The Count class

Count extends the SampledStat abstract class: update(...) increments the current Sample's value, and combine(...) sums the values of all unexpired Samples. The difference from Total is that Count's Samples expire as the time windows roll forward, so Count reports a total over a recent period, while Total grows monotonically:

org.apache.kafka.common.metrics.stats.Count

  /**
   * A {@link SampledStat} that maintains a simple count of what it has seen.
   * Unlike Total, Count's samples expire as the windows roll forward, so Count reports
   * the total over a recent period while Total increases monotonically.
   */
  public class Count extends SampledStat {
      public Count() {
          super(0);
      }
      // Increments the Sample's value
      @Override
      protected void update(Sample sample, MetricConfig config, double value, long now) {
          sample.value += 1.0;
      }
      // Sums the values of all unexpired Samples
      @Override
      public double combine(List<Sample> samples, MetricConfig config, long now) {
          double total = 0.0;
          for (int i = 0; i < samples.size(); i++)
              total += samples.get(i).value;
          return total;
      }
  }

2.4. The Rate class

Rate implements MeasurableStat and records a rate, such as connections created per second. It wraps a SampledStat in its stat field, by default an instance of its inner class SampledTotal, whose implementation resembles Count; Rate's record(...) simply delegates to the stat field's record(...). The key parts:

org.apache.kafka.common.metrics.stats.Rate

  /**
   * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
   * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
   * however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or other such
   * values.
   *
   * Records a rate, e.g. the number of connections created per second
   */
  public class Rate implements MeasurableStat {
      private final TimeUnit unit;
      private final SampledStat stat;
      ...
      // Delegates recording to the wrapped stat
      @Override
      public void record(MetricConfig config, double value, long timeMs) {
          this.stat.record(config, value, timeMs);
      }
      ...
      // Reads the wrapped stat's value and converts it into the final rate
      @Override
      public double measure(MetricConfig config, long now) {
          // The recorded value
          double value = stat.measure(config, now);
          /*
           * Divide the recorded value by the elapsed time to get the rate;
           * convert() translates the elapsed time into the Rate's configured time unit
           */
          return value / convert(windowSize(config, now));
      }
      private double convert(long timeMs) {
          switch (unit) {
              case NANOSECONDS:
                  return timeMs * 1000.0 * 1000.0;
              case MICROSECONDS:
                  return timeMs * 1000.0;
              case MILLISECONDS:
                  return timeMs;
              case SECONDS:
                  return timeMs / 1000.0;
              case MINUTES:
                  return timeMs / (60.0 * 1000.0);
              case HOURS:
                  return timeMs / (60.0 * 60.0 * 1000.0);
              case DAYS:
                  return timeMs / (24.0 * 60.0 * 60.0 * 1000.0);
              default:
                  throw new IllegalStateException("Unknown unit: " + unit);
          }
      }
      public long windowSize(MetricConfig config, long now) {
          // purge old samples before we compute the window size
          // (expired Samples are reset and do not contribute to the elapsed time)
          stat.purgeObsoleteSamples(config, now);
          /*
           * Here we check the total amount of time elapsed since the oldest non-obsolete window.
           * This give the total windowSize of the batch which is the time used for Rate computation.
           * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
           * window, the measured rate will be very high.
           * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
           *
           * Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time,
           * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called,
           * if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results.
           *
           * The oldest Sample, obtained via stat.oldest(), anchors the elapsed-time calculation
           */
          long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
          // Check how many full windows of data we have currently retained,
          // i.e. how many complete Samples there are
          int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
          // The minimum number of full windows required
          int minFullWindows = config.samples() - 1;
          // If the available windows are less than the minimum required, add the difference to the totalElapsedTime
          // (compensating the elapsed time)
          if (numFullWindows < minFullWindows)
              totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
          return totalElapsedTimeMs;
      }
      ...
  }
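
For example, with the defaults (samples = 2, timeWindowMs = 30000), if the oldest retained Sample started only 1 second ago, then numFullWindows = 0 while minFullWindows = 1, so windowSize(...) is compensated to 1000 + 1 × 30000 = 31000 ms. A count of 31 events over that window then measures as 1 event/second, instead of the wildly inflated 31 events/second that the raw 1-second window would report.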

2.5. The Max class

Max extends the SampledStat abstract class and tracks a maximum: update(...) compares each new value against the stored one and keeps the larger, while combine(...) returns the largest value recorded across all Samples:

org.apache.kafka.common.metrics.stats.Max

  /**
   * A {@link SampledStat} that gives the max over its samples.
   *
   * Tracks a maximum value
   */
  public final class Max extends SampledStat {
      public Max() {
          super(Double.NEGATIVE_INFINITY);
      }
      // Compare each new value against the stored one and keep the larger
      @Override
      protected void update(Sample sample, MetricConfig config, double value, long now) {
          sample.value = Math.max(sample.value, value);
      }
      @Override
      public double combine(List<Sample> samples, MetricConfig config, long now) {
          double max = Double.NEGATIVE_INFINITY;
          // Take the maximum across all Samples
          for (int i = 0; i < samples.size(); i++)
              max = Math.max(max, samples.get(i).value);
          return max;
      }
  }

2.6. The Min class

Min is the mirror image of Max: it tracks a minimum, implemented the same way:

org.apache.kafka.common.metrics.stats.Min

  /**
   * A {@link SampledStat} that gives the min over its samples.
   */
  public class Min extends SampledStat {
      public Min() {
          // Note: Double.MIN_VALUE is the smallest positive double, not the most negative
          // one, so this initial value is arguably a bug; later Kafka versions initialize
          // with Double.MAX_VALUE instead
          super(Double.MIN_VALUE);
      }
      // Compare each new value against the stored one and keep the smaller
      @Override
      protected void update(Sample sample, MetricConfig config, double value, long now) {
          sample.value = Math.min(sample.value, value);
      }
      @Override
      public double combine(List<Sample> samples, MetricConfig config, long now) {
          double min = Double.MAX_VALUE;
          // Take the minimum across all Samples
          for (int i = 0; i < samples.size(); i++)
              min = Math.min(min, samples.get(i).value);
          return min;
      }
  }

2.7. The Avg class

Avg extends the SampledStat abstract class and tracks an average: update(...) accumulates each recorded value into the current Sample, and combine(...) sums the values and event counts of all Samples to compute the average:

org.apache.kafka.common.metrics.stats.Avg

  /**
   * A {@link SampledStat} that maintains a simple average over its samples.
   */
  public class Avg extends SampledStat {
      public Avg() {
          super(0.0);
      }
      // Accumulate each new value
      @Override
      protected void update(Sample sample, MetricConfig config, double value, long now) {
          sample.value += value;
      }
      @Override
      public double combine(List<Sample> samples, MetricConfig config, long now) {
          double total = 0.0;
          long count = 0;
          // Sum the values of all Samples, and sum their event counts
          for (int i = 0; i < samples.size(); i++) {
              Sample s = samples.get(i);
              total += s.value;
              count += s.eventCount;
          }
          // Compute the average
          return count == 0 ? 0 : total / count;
      }
  }

2.8. The Percentiles class
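
Percentiles is the CompoundStat implementation in this package: built on SampledStat, it buckets recorded values into a histogram and exposes one KafkaMetric per requested percentile. A hedged sketch of its use, relying on the Sensor API described in the next subsection (the metric and group names are made up for the example):

  import org.apache.kafka.common.MetricName;
  import org.apache.kafka.common.metrics.Metrics;
  import org.apache.kafka.common.metrics.Sensor;
  import org.apache.kafka.common.metrics.stats.Percentile;
  import org.apache.kafka.common.metrics.stats.Percentiles;
  import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

  public class PercentilesExample {
      public static void main(String[] args) {
          Metrics metrics = new Metrics();
          Sensor sensor = metrics.sensor("request-latency");
          MetricName p50 = metrics.metricName("latency-p50", "example-group");
          MetricName p99 = metrics.metricName("latency-p99", "example-group");
          // 1000 bytes of bucket storage, values in [0, 500), constant-width buckets;
          // one KafkaMetric is registered per Percentile
          sensor.add(new Percentiles(1000, 0.0, 500.0, BucketSizing.CONSTANT,
                  new Percentile(p50, 50), new Percentile(p99, 99)));
          for (int i = 1; i <= 100; i++)
              sensor.record(i);
          System.out.println(metrics.metrics().get(p50).value());
      }
  }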

2.9. Kafka's wrappers

Kafka wraps each of the metric types above in a KafkaMetric object, which also carries the metric's MetricName and MetricConfig.

First, MetricName's definition and key fields:

org.apache.kafka.common.MetricName

  public final class MetricName {
      // The KafkaMetric's name
      private final String name;
      // Name of the logical group the metric belongs to
      private final String group;
      // Human-readable description of the KafkaMetric
      private final String description;
      // Additional key-value tags
      private Map<String, String> tags;
      ...
  }

In practice a single operation usually needs several statistics at once; for example, we may want to monitor both the maximum request size and the average. To support this, Kafka groups related metric objects into a Sensor. Sensor's core fields:

org.apache.kafka.common.metrics.Sensor

  public final class Sensor {
      private final Metrics registry;
      // Name of this Sensor; Metrics distinguishes Sensors by name
      private final String name;
      // Sensors can form a hierarchy; this holds the parents of this Sensor
      private final Sensor[] parents;
      // The Stat objects making up this Sensor
      private final List<Stat> stats;
      /**
       * The KafkaMetric objects making up this Sensor.
       * When a stat is added to the Sensor, a corresponding KafkaMetric is created
       * (a CompoundStat creates several) and stored here
       */
      private final List<KafkaMetric> metrics;
      // Default configuration
      private final MetricConfig config;
      private final Time time;
      // Timestamp of the last record() call
      private volatile long lastRecordTime;
      /**
       * A Sensor unused for a long time is considered expired and is cleaned up
       * by the ExpireSensorTask thread; this field holds the expiry threshold
       */
      private final long inactiveSensorExpirationTimeMs;
      ...
  }

The stats list holds the Stat objects; Sensor's add(...) methods attach Stats to the Sensor:

org.apache.kafka.common.metrics.Sensor#add

  public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
      // Create the KafkaMetric
      KafkaMetric metric = new KafkaMetric(new Object(),
                                           Utils.notNull(metricName),
                                           Utils.notNull(stat),
                                           config == null ? this.config : config,
                                           time);
      // Store the KafkaMetric in Metrics, creating and registering the matching MBean
      this.registry.registerMetric(metric);
      // Add it to the metrics list
      this.metrics.add(metric);
      // Add the stat to the stats list
      this.stats.add(stat);
  }

  public synchronized void add(CompoundStat stat, MetricConfig config) {
      this.stats.add(Utils.notNull(stat));
      // Iterate over the sub-stats of the CompoundStat
      for (NamedMeasurable m : stat.stats()) {
          // Create one KafkaMetric per sub-stat
          KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(), config == null ? this.config : config, time);
          // Store the KafkaMetric in Metrics, creating and registering the matching MBean
          this.registry.registerMetric(metric);
          // Add it to the metrics list
          this.metrics.add(metric);
      }
  }

As the source shows, Sensor's add(...) wraps each Stat in a KafkaMetric, creates and registers the matching MBean, and stores the objects in the metrics and stats collections.
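
Putting Metrics, Sensor, and the stats together, a hedged usage sketch (the metric and group names are made up):

  import org.apache.kafka.common.MetricName;
  import org.apache.kafka.common.metrics.Metrics;
  import org.apache.kafka.common.metrics.Sensor;
  import org.apache.kafka.common.metrics.stats.Avg;
  import org.apache.kafka.common.metrics.stats.Max;

  public class SensorExample {
      public static void main(String[] args) {
          Metrics metrics = new Metrics(); // also registers a "count" metric for itself
          Sensor sensor = metrics.sensor("request-sizes");
          // One Sensor, several statistics over the same stream of values
          sensor.add(metrics.metricName("request-size-avg", "example-group"), new Avg());
          sensor.add(metrics.metricName("request-size-max", "example-group"), new Max());
          sensor.record(100);
          sensor.record(300);
          MetricName avgName = metrics.metricName("request-size-avg", "example-group");
          System.out.println(metrics.metrics().get(avgName).value()); // 200.0
      }
  }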

Sensor's record(...) does the actual recording: it updates lastRecordTime, then calls record(...) on every Stat in stats and on every parent Sensor in parents:

  /**
   * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse
   * the time stamp.
   * @param value The value we are recording
   * @param timeMs The current POSIX time in milliseconds
   * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
   *         bound
   */
  public void record(double value, long timeMs) {
      this.lastRecordTime = timeMs;
      synchronized (this) {
          // increment all the stats
          // Call record() on every stat in the stats list
          for (int i = 0; i < this.stats.size(); i++)
              this.stats.get(i).record(config, value, timeMs);
          // Check whether any metric violates the bounds set in its MetricConfig
          checkQuotas(timeMs);
      }
      for (int i = 0; i < parents.length; i++)
          // Call record() on every parent Sensor
          parents[i].record(value, timeMs);
  }

  /**
   * Check if we have violated our quota for any metric that has a configured quota
   * @param timeMs
   */
  private void checkQuotas(long timeMs) {
      for (int i = 0; i < this.metrics.size(); i++) {
          KafkaMetric metric = this.metrics.get(i);
          // Get the MetricConfig
          MetricConfig config = metric.config();
          if (config != null) {
              // Get the Quota from the MetricConfig
              Quota quota = config.quota();
              if (quota != null) {
                  // Compute the current metric value
                  double value = metric.value(timeMs);
                  // Check the value against the bounds
                  if (!quota.acceptable(value)) {
                      throw new QuotaViolationException(String.format(
                          "'%s' violated quota. Actual: %f, Threshold: %f",
                          metric.metricName(),
                          value,
                          quota.bound()));
                  }
              }
          }
      }
  }
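
The quota check can be exercised in isolation; a hedged sketch (the bound of 10 is arbitrary):

  import org.apache.kafka.common.metrics.MetricConfig;
  import org.apache.kafka.common.metrics.Metrics;
  import org.apache.kafka.common.metrics.Quota;
  import org.apache.kafka.common.metrics.QuotaViolationException;
  import org.apache.kafka.common.metrics.Sensor;
  import org.apache.kafka.common.metrics.stats.Avg;

  public class QuotaExample {
      public static void main(String[] args) {
          Metrics metrics = new Metrics();
          Sensor sensor = metrics.sensor("bounded");
          // Attach a quota: the measured average must stay at or below 10
          MetricConfig config = new MetricConfig().quota(Quota.upperBound(10));
          sensor.add(metrics.metricName("bounded-avg", "example-group"), new Avg(), config);
          sensor.record(5);   // fine, avg = 5
          try {
              sensor.record(100); // avg jumps above the bound
          } catch (QuotaViolationException e) {
              System.out.println("quota violated: " + e.getMessage());
          }
      }
  }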

Kafka's Metrics class centrally manages the Sensor, KafkaMetric, and MetricsReporter objects:

org.apache.kafka.common.metrics.Metrics

  public class Metrics implements Closeable {
      // Default configuration
      private final MetricConfig config;
      // The KafkaMetric objects added to this Metrics instance
      private final ConcurrentMap<MetricName, KafkaMetric> metrics;
      // The Sensor objects added to this Metrics instance
      private final ConcurrentMap<String, Sensor> sensors;
      // The child Sensors of each Sensor
      private final ConcurrentMap<Sensor, List<Sensor>> childrenSensors;
      // The MetricsReporter objects in use; JmxReporter by default
      private final List<MetricsReporter> reporters;
      private final Time time;
      // Thread pool that runs the ExpireSensorTask
      private final ScheduledThreadPoolExecutor metricsScheduler;
      ...
  }

Metrics' sensor(...) looks a Sensor up by name in the sensors map, creating a new Sensor if none exists and recording the parent-child relationships in childrenSensors:

org.apache.kafka.common.metrics.Metrics#sensor

  public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor... parents) {
      // Look the Sensor up by name in the sensors map
      Sensor s = getSensor(name);
      if (s == null) {
          // Create the Sensor
          s = new Sensor(this, name, parents, config == null ? this.config : config, time, inactiveSensorExpirationTimeSeconds);
          // Save the new Sensor in the sensors map
          this.sensors.put(name, s);
          if (parents != null) {
              // Record the Sensor hierarchy in childrenSensors
              for (Sensor parent : parents) {
                  List<Sensor> children = childrenSensors.get(parent);
                  if (children == null) {
                      children = new ArrayList<>();
                      childrenSensors.put(parent, children);
                  }
                  children.add(s);
              }
          }
          log.debug("Added sensor with name {}", name);
      }
      return s;
  }

Metrics' addReporter(...) attaches a MetricsReporter:

org.apache.kafka.common.metrics.Metrics#addReporter

  public synchronized void addReporter(MetricsReporter reporter) {
      Utils.notNull(reporter).init(new ArrayList<>(metrics.values()));
      this.reporters.add(reporter);
  }

When a KafkaMetric is added through addMetric(...), registerMetric(...) registers it with every MetricsReporter:

org.apache.kafka.common.metrics.Metrics

  public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
      KafkaMetric m = new KafkaMetric(new Object(),
                                      Utils.notNull(metricName),
                                      Utils.notNull(measurable),
                                      config == null ? this.config : config,
                                      time);
      registerMetric(m);
  }

  synchronized void registerMetric(KafkaMetric metric) {
      MetricName metricName = metric.metricName();
      if (this.metrics.containsKey(metricName))
          throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
      // Add the KafkaMetric to the metrics map
      this.metrics.put(metricName, metric);
      for (MetricsReporter reporter : reporters)
          // Register the KafkaMetric with every MetricsReporter
          reporter.metricChange(metric);
  }

Besides initializing its fields, Metrics' constructor schedules an ExpireSensorTask on metricsScheduler to clean up expired Sensors, and registers an anonymous Measurable that reports the size of the metrics map:

org.apache.kafka.common.metrics.Metrics#Metrics

  public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration) {
      this.config = defaultConfig;
      this.sensors = new ConcurrentHashMap<>();
      this.metrics = new ConcurrentHashMap<>();
      this.childrenSensors = new ConcurrentHashMap<>();
      this.reporters = Utils.notNull(reporters);
      this.time = time;
      for (MetricsReporter reporter : reporters)
          reporter.init(new ArrayList<KafkaMetric>());
      // Create the ThreadPoolExecutor only if expiration of Sensors is enabled.
      if (enableExpiration) {
          this.metricsScheduler = new ScheduledThreadPoolExecutor(1);
          // Creating a daemon thread to not block shutdown
          this.metricsScheduler.setThreadFactory(new ThreadFactory() {
              public Thread newThread(Runnable runnable) {
                  return Utils.newThread("SensorExpiryThread", runnable, true);
              }
          });
          // Schedule the expiry task
          this.metricsScheduler.scheduleAtFixedRate(new ExpireSensorTask(), 30, 30, TimeUnit.SECONDS);
      } else {
          this.metricsScheduler = null;
      }
      // Register an anonymous Measurable that reports the size of the metrics map
      addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"),
          new Measurable() {
              @Override
              public double measure(MetricConfig config, long now) {
                  return metrics.size();
              }
          });
  }

ExpireSensorTask walks the sensors map and removes expired Sensors:

org.apache.kafka.common.metrics.Metrics.ExpireSensorTask

  class ExpireSensorTask implements Runnable {
      public void run() {
          // Iterate over the sensors map
          for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) {
              // removeSensor also locks the sensor object. This is fine because synchronized is reentrant
              // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
              // Calling record on C would cause a record on P as well.
              // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed,
              // that will cause C to also get removed.
              // Since the expiration time is typically high it is not expected to be a significant concern
              // and thus not necessary to optimize
              synchronized (sensorEntry.getValue()) {
                  if (sensorEntry.getValue().hasExpired()) { // check whether the Sensor has expired
                      log.debug("Removing expired sensor {}", sensorEntry.getKey());
                      // Remove the expired Sensor
                      removeSensor(sensorEntry.getKey());
                  }
              }
          }
      }
  }

The reporters list of Metrics can hold several MetricsReporter objects. Whenever a KafkaMetric is added, each reporter's metricChange(...) is invoked, and the reporter exposes the metric's data in its own way; the JmxReporter implementation, for example, creates and registers a matching MBean. The MetricsReporter interface declares the following methods:

org.apache.kafka.common.metrics.MetricsReporter

  public interface MetricsReporter extends Configurable {
      /**
       * This is called when the reporter is first registered to initially register all existing metrics
       * (invoked when the reporter is added to Metrics, to perform initialization)
       * @param metrics All currently existing metrics
       */
      public void init(List<KafkaMetric> metrics);

      /**
       * This is called whenever a metric is updated or added
       * (invoked when a KafkaMetric is added to Metrics)
       * @param metric
       */
      public void metricChange(KafkaMetric metric);

      /**
       * This is called whenever a metric is removed
       * (invoked when a KafkaMetric is removed from Metrics)
       * @param metric
       */
      public void metricRemoval(KafkaMetric metric);

      /**
       * Called when the metrics repository is closed.
       */
      public void close();
  }
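
Writing a custom reporter only requires implementing these callbacks; a hedged sketch that logs registrations to stdout (the class name is made up):

  import java.util.List;
  import java.util.Map;
  import org.apache.kafka.common.metrics.KafkaMetric;
  import org.apache.kafka.common.metrics.MetricsReporter;

  public class StdoutReporter implements MetricsReporter {
      @Override
      public void configure(Map<String, ?> configs) { } // from Configurable

      @Override
      public void init(List<KafkaMetric> metrics) {
          // Called once with all metrics that already exist at registration time
          for (KafkaMetric m : metrics)
              System.out.println("existing metric: " + m.metricName());
      }

      @Override
      public void metricChange(KafkaMetric metric) {
          System.out.println("metric added/updated: " + metric.metricName());
      }

      @Override
      public void metricRemoval(KafkaMetric metric) {
          System.out.println("metric removed: " + metric.metricName());
      }

      @Override
      public void close() { }
  }

Such a reporter can be attached programmatically via metrics.addReporter(new StdoutReporter()), or on the broker through the metric.reporters configuration.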

JmxReporter is Kafka's default MetricsReporter implementation; it exposes the KafkaMetric data over JMX. Its mbeans map associates each MBean name with a KafkaMbean object. KafkaMbean implements the DynamicMBean interface and keeps its own metrics map of the KafkaMetric objects added to it; these KafkaMetrics are exposed as the MBean's attributes:

org.apache.kafka.common.metrics.JmxReporter.KafkaMbean

  private static class KafkaMbean implements DynamicMBean {
      // The MBean's name
      private final ObjectName objectName;
      // The KafkaMetric objects added to this MBean, exposed as its attributes
      private final Map<String, KafkaMetric> metrics;
      public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
          // Initialize metrics and objectName
          this.metrics = new HashMap<String, KafkaMetric>();
          this.objectName = new ObjectName(mbeanName);
      }
      public ObjectName name() {
          return objectName;
      }
      public void setAttribute(String name, KafkaMetric metric) {
          // Store the KafkaMetric
          this.metrics.put(name, metric);
      }
      @Override
      public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
          if (this.metrics.containsKey(name))
              // Return the value of the named KafkaMetric
              return this.metrics.get(name).value();
          else
              throw new AttributeNotFoundException("Could not find attribute " + name);
      }
      @Override
      public AttributeList getAttributes(String[] names) {
          try {
              AttributeList list = new AttributeList();
              // Call getAttribute() for each requested name
              for (String name : names)
                  // Wrap each metric value in an Attribute and add it to the AttributeList
                  list.add(new Attribute(name, getAttribute(name)));
              return list;
          } catch (Exception e) {
              log.error("Error getting JMX attribute: ", e);
              return new AttributeList();
          }
      }
      ...
  }

JmxReporter's metricChange(...) adds the KafkaMetric to the appropriate KafkaMbean as an attribute and re-registers the MBean:

org.apache.kafka.common.metrics.JmxReporter

  // Adds the KafkaMetric to a KafkaMbean as an attribute and re-registers the MBean
  @Override
  public void metricChange(KafkaMetric metric) {
      synchronized (LOCK) {
          // Add the KafkaMetric
          KafkaMbean mbean = addAttribute(metric);
          // Re-register the KafkaMbean
          reregister(mbean);
      }
  }

  private KafkaMbean addAttribute(KafkaMetric metric) {
      try {
          MetricName metricName = metric.metricName();
          // Derive the KafkaMbean's name
          String mBeanName = getMBeanName(metricName);
          // Create the KafkaMbean if there is none with that name yet
          if (!this.mbeans.containsKey(mBeanName))
              mbeans.put(mBeanName, new KafkaMbean(mBeanName));
          KafkaMbean mbean = this.mbeans.get(mBeanName);
          // Add the KafkaMetric as an attribute
          mbean.setAttribute(metricName.name(), metric);
          return mbean;
      } catch (JMException e) {
          throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
      }
  }

  private void reregister(KafkaMbean mbean) {
      // First unregister the KafkaMbean
      unregister(mbean);
      try {
          // Re-register it through MBeanServer.registerMBean()
          ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
      } catch (JMException e) {
          throw new KafkaException("Error registering mbean " + mbean.name(), e);
      }
  }

The KafkaMbean names produced by JmxReporter's getMBeanName(...) have the form prefix:type=[group],key1=value1,key2=value2...:

org.apache.kafka.common.metrics.JmxReporter#getMBeanName

  private String getMBeanName(MetricName metricName) {
      StringBuilder mBeanName = new StringBuilder();
      mBeanName.append(prefix); // part 1: the prefix; defaults to kafka.server on the broker
      mBeanName.append(":type="); // part 2: the MetricName's group field
      mBeanName.append(metricName.group());
      // part 3: built from the MetricName's tags map
      for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
          if (entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
              continue;
          mBeanName.append(",");
          mBeanName.append(entry.getKey());
          mBeanName.append("=");
          mBeanName.append(entry.getValue());
      }
      return mBeanName.toString();
  }
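
Once registered, these MBeans can be read back through the standard JMX API. A hedged in-process sketch (the object name assumes the controller-channel metrics discussed in the next section, for a channel to broker 1):

  import java.lang.management.ManagementFactory;
  import javax.management.MBeanServer;
  import javax.management.ObjectName;

  public class JmxReadExample {
      public static void main(String[] args) throws Exception {
          MBeanServer server = ManagementFactory.getPlatformMBeanServer();
          // The name follows the prefix:type=[group],tagKey=tagValue pattern above
          ObjectName name = new ObjectName(
                  "kafka.server:type=controller-channel-metrics,broker-id=1");
          // Each KafkaMetric registered on the bean is exposed as an attribute
          Object value = server.getAttribute(name, "connection-count");
          System.out.println("connection-count = " + value);
      }
  }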

3. Monitoring Selector Metrics

From the earlier analysis of KafkaController we know that when a Controller becomes Leader it creates a ControllerChannelManager, which manages the connections between the Controller Leader and the other brokers. Anything involving network connections touches NetworkClient and Selector, so this section uses the Selector inside ControllerChannelManager as an example of how Kafka's monitoring module is applied.

ControllerChannelManager's addNewBroker(...) creates a NetworkClient for every newly added broker:

kafka.controller.ControllerChannelManager#addNewBroker

  // Maintains the brokerStateInfo map
  private def addNewBroker(broker: Broker) {
    ...
    // Build the network communication components
    val networkClient = {
      // Create the ChannelBuilder
      val channelBuilder = ChannelBuilders.create(
        config.interBrokerSecurityProtocol,
        Mode.CLIENT,
        LoginType.SERVER,
        config.values,
        config.saslMechanismInterBrokerProtocol,
        config.saslInterBrokerHandshakeRequestEnable
      )
      // Create the Selector
      val selector = new Selector(
        NetworkReceive.UNLIMITED,
        config.connectionsMaxIdleMs,
        metrics, // the Metrics object, created when KafkaServer starts
        time,
        "controller-channel",
        Map("broker-id" -> broker.id.toString).asJava,
        false,
        channelBuilder
      )
      // Create the NetworkClient
      new NetworkClient(
        selector,
        new ManualMetadataUpdater(Seq(brokerNode).asJava),
        config.brokerId.toString,
        1,
        0,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        config.requestTimeoutMs,
        time
      )
    }
    ...
  }

The Metrics-typed metrics parameter passed to the Selector constructor is created when KafkaServer starts:

kafka.server.KafkaServer

  class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
    ...
    // Used as part of the MBean names
    private val jmxPrefix: String = "kafka.server"
    // MetricsReporter classes from the metric.reporters config; empty by default
    private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
    // A JmxReporter is always created and added
    reporters.add(new JmxReporter(jmxPrefix))
    ...
    // The Metrics instance
    var metrics: Metrics = null
    // Create the MetricConfig
    private val metricConfig: MetricConfig = new MetricConfig()
      // Number of samples; metrics.num.samples, default 2
      .samples(config.metricNumSamples)
      // Sample time window; metrics.sample.window.ms, default 30000 ms, i.e. 30 seconds
      .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
    ...
    def startup() {
      ...
      metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
      ...
    }
    ...
  }

Selector defines a private inner class, SelectorMetrics, that wraps several Sensors:

org.apache.kafka.common.network.Selector.SelectorMetrics

  private class SelectorMetrics {
      private final Metrics metrics;
      // Tracks connection closes; a Rate records closes per second
      public final Sensor connectionClosed;
      // Tracks connection creation; a Rate records new connections per second
      public final Sensor connectionCreated;
      // Tracks network operations; a Rate records reads+writes per second across all connections
      public final Sensor bytesTransferred;
      /**
       * Tracks outgoing requests; a child Sensor of bytesTransferred.
       * A Rate records the bytes and requests the Controller sends per second,
       * an Avg records the average request size, and a Max the largest request.
       */
      public final Sensor bytesSent;
      /**
       * Tracks incoming responses; a child Sensor of bytesTransferred.
       * A Rate records the bytes and responses the Controller receives per second.
       */
      public final Sensor bytesReceived;
      /**
       * Tracks the select() call.
       * A Rate records select() calls per second,
       * an Avg records the average time select() blocks,
       * and a Rate records the fraction of time spent blocked in select().
       */
      public final Sensor selectTime;
      /**
       * Tracks I/O time.
       * An Avg records the average time per I/O operation,
       * and a Rate the fraction of time spent doing I/O.
       */
      public final Sensor ioTime;
      /**
       * Names of metrics that are not registered through sensors:
       * Measurable objects registered directly with Metrics rather than through a Sensor
       */
      private final List<MetricName> topLevelMetricNames = new ArrayList<>();
      // All of the Sensors above
      private final List<Sensor> sensors = new ArrayList<>();
      ...
  }

SelectorMetrics' constructor creates the Sensors above and adds the MeasurableStats just described:

org.apache.kafka.common.network.Selector.SelectorMetrics#SelectorMetrics

  public SelectorMetrics(Metrics metrics) {
      this.metrics = metrics;
      // metricGrpName evaluates to controller-channel-metrics here; all Sensors below use it as their group
      String metricGrpName = metricGrpPrefix + "-metrics";
      StringBuilder tagsSuffix = new StringBuilder();
      /**
       * Join the tags map into a string; if the connected broker's id is 1 this is broker-1,
       * and every Sensor below uses it as part of its name
       */
      for (Map.Entry<String, String> tag: metricTags.entrySet()) {
          tagsSuffix.append(tag.getKey());
          tagsSuffix.append("-");
          tagsSuffix.append(tag.getValue());
      }

      // Create the connectionClosed Sensor
      this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
      // Create its MetricName
      MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
      // Records connections closed per second
      this.connectionClosed.add(metricName, new Rate());

      // Create the connectionCreated Sensor
      this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
      // Records connections created per second
      metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
      this.connectionCreated.add(metricName, new Rate());

      // Create the bytesTransferred Sensor
      this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
      // Records reads+writes per second across all connections
      metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
      bytesTransferred.add(metricName, new Rate(new Count()));

      // Create the bytesSent Sensor, with bytesTransferred as its parent
      this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
      // Records bytes sent per second across all connections
      metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
      this.bytesSent.add(metricName, new Rate());
      // Records requests sent per second
      metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
      this.bytesSent.add(metricName, new Rate(new Count()));
      // Records the average request size
      metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window.", metricTags);
      this.bytesSent.add(metricName, new Avg());
      // Records the maximum request size
      metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
      this.bytesSent.add(metricName, new Max());

      // Create the bytesReceived Sensor, with bytesTransferred as its parent
      this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
      // Records bytes received per second
      metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
      this.bytesReceived.add(metricName, new Rate());
      // Records responses received per second
      metricName = metrics.metricName("response-rate", metricGrpName, "Responses received per second.", metricTags);
      this.bytesReceived.add(metricName, new Rate(new Count()));

      // Create the selectTime Sensor
      this.selectTime = sensor("select-time:" + tagsSuffix.toString());
      // Records select() calls per second
      metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
      this.selectTime.add(metricName, new Rate(new Count()));
      // Records the average time select() blocks
      metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
      this.selectTime.add(metricName, new Avg());
      // Records the fraction of time spent blocked in select()
      metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
      this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));

      // Create the ioTime Sensor
      this.ioTime = sensor("io-time:" + tagsSuffix.toString());
      // Records the average time per I/O operation
      metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
      this.ioTime.add(metricName, new Avg());
      // Records the fraction of time spent doing I/O
      metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
      this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));

      // Records the connection count
      metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
      topLevelMetricNames.add(metricName);
      // Register an anonymous Measurable directly with Metrics to report the connection count
      this.metrics.addMetric(metricName, new Measurable() {
          public double measure(MetricConfig config, long now) {
              return channels.size();
          }
      });
  }
  1. Selector's close() method closes the given connection and calls the connectionClosed Sensor's record(...) method:

org.apache.kafka.common.network.Selector#close
  /**
   * Begin closing this connection
   */
  private void close(KafkaChannel channel) {
      try {
          // Close the connection
          channel.close();
      } catch (IOException e) {
          log.error("Exception closing connection to node {}:", channel.id(), e);
      }
      this.stagedReceives.remove(channel);
      this.channels.remove(channel.id());
      this.lruConnections.remove(channel.id());
      // Call connectionClosed's record() to count the close
      this.sensors.connectionClosed.record();
  }
  2. Selector's poll() method calls select() to wait for connections with ready events; it takes timestamps around the call and records the blocked time via selectTime's record(...). After select() returns, pollSelectionKeys() performs I/O on the ready connections; the elapsed I/O time is likewise timed and recorded via ioTime's record():

org.apache.kafka.common.network.Selector#poll
  @Override
  public void poll(long timeout) throws IOException {
      ...
      /* check ready keys */
      // Timestamp before select()
      long startSelect = time.nanoseconds();
      // Call select() and wait for events
      int readyKeys = select(timeout);
      // Timestamp after select()
      long endSelect = time.nanoseconds();
      currentTimeNanos = endSelect;
      // Record how long select() blocked via selectTime's record()
      this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
      // Perform the I/O via pollSelectionKeys()
      if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
          // Handle the I/O events
          pollSelectionKeys(this.nioSelector.selectedKeys(), false);
          pollSelectionKeys(immediatelyConnectedKeys, true);
      }
      // Move stagedReceives into the completedReceives collection
      addToCompletedReceives();
      // Timestamp after the I/O
      long endIo = time.nanoseconds();
      // Record the I/O time via ioTime's record()
      this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
      ...
  }
  3. When a connection is detected as successfully established, connectionCreated's record(...) is called. When a connection becomes writable, a request is written to it; once a complete request has been written, it is added to the completedSends collection for later processing and bytesSent's record(...) is called:

org.apache.kafka.common.network.Selector#pollSelectionKeys
  // Perform the I/O
  private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
      ...
      /* complete any connections that have finished their handshake (either normally or immediately) */
      // Handles connect() returning true, or the OP_CONNECT event
      if (isImmediatelyConnected || key.isConnectable()) {
          // finishConnect() checks whether the socket channel is fully connected; once it is,
          // interest switches from OP_CONNECT to OP_READ
          if (channel.finishConnect()) { // connected successfully?
              // Add the connection's id to the connected set
              this.connected.add(channel.id());
              // Call connectionCreated's record() to count the new connection
              this.sensors.connectionCreated.record();
          } else
              // Connection not finished; skip further handling of this channel
              continue;
      }
      ...
      /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
      if (channel.ready() && key.isWritable()) {
          // The channel is writable; handle the OP_WRITE event
          Send send = channel.write();
          /**
           * channel.write() sends the KafkaChannel's pending send field.
           * If the send completes it is returned and added to completedSends for later processing;
           * if it is not yet complete, null is returned
           */
          if (send != null) { // a complete request was sent
              // Add it to completedSends for later processing
              this.completedSends.add(send);
              // Record via bytesSent's record()
              this.sensors.recordBytesSent(channel.id(), send.size());
          }
      }
      ...
  }
  4. When a complete response has been read from a connection, it is staged in the stagedReceives collection; after pollSelectionKeys(...) finishes, it is moved to completedReceives for later processing, and bytesReceived's record(...) is called:

org.apache.kafka.common.network.Selector#addToCompletedReceives
  /**
   * checks if there are any staged receives and adds to completedReceives
   */
  private void addToCompletedReceives() {
      ...
      this.completedReceives.add(networkReceive);
      // Record via bytesReceived's record()
      this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
      ...
  }

Note that the bytesSent and bytesReceived Sensors are children of bytesTransferred, and calling record(...) on a child Sensor also invokes record(...) on its parent.

In the MBean monitoring page this shows up as:

[Figure 8: Kafka Selector-related monitoring (1)]

The corresponding portion of the MBean metadata is shown below:

[Figure 9: Kafka Selector-related monitoring (2)]

Note: in production, besides JConsole, there are visual tools such as Mx4jLoader, Kafka Web Console, Kafka Manager, and KafkaOffsetMonitor to choose from.