注:本文主要参考《Apache Kafka源码剖析》的第4章4.9节。
1. Kafka的Metrics
Kafka在Metrics的基础上进行了一次封装。在KafkaMetricsGroup中定义了一些基础方法:
其中,metricName(...)
方法按照一定的规则生成度量对象的MetricName;以“new”开头的方法用于创建对应类型的度量对象,removeMetric(...)
方法则用于移除指定的度量对象。
1.1. Log类中的Gauge度量
在日志存储相关的Log类中,使用Gauge度量类对当前Log中的LogSegment数量进行了统计:
根据metricName(...)
方法的特性,可以得到这里产生的MBean名称为kafka.log:type=Log,name=NumLogSegments,topic=[Topic],partition=[partition]
,在MBean监控的显示如下:
1.2. ReplicaManager类中的Meter度量
在ReplicaManager中使用Meter度量统计全部分区的ISR集合发生扩张或缩小的频率:
当发生ISR集合扩张或缩减时,在Partition类的maybeExpandIsr(...)
和maybeShrinkIsr(...)
两个方法中会分别对这两个度量值进行记录:
对应于MBean监控的显示如下:
isrExpandRate
的监控:
isrShrinkRate
的监控:
1.3. Request类中的Histogram度量
在RequestChannel.Request类中使用了Histogram统计各类请求和响应在RequestChannel中等待时间的分布。如果有大量请求在RequestChannel中等待的时间过长,则需要进行调优,例如Handler线程配置过少,Kafka上下游的服务出现请求洪泛等都会导致问题。观察响应在RequestChannel中的等待时间也会得到一些类似的结论,相关的代码片段如下:
当在Processor线程中成功接收到一个请求时会创建RequestChannel.Request对象,并将startTimeMs
初始化为当前时间,将requestDequeueTimeMs
字段初始化为-1:
之后将请求放入RequestChannel中等待Handler线程处理;当Handler线程从RequestChannel中取出Request对象时会更新其requestDequeueTimeMs
字段为当前时间戳,再将请求交给KafkaApis处理:
对于查询GroupCoordinator的请求的RequestQueueTimeMs度量如下:
当KafkaApis处理完成后会生成RequestChannel.Response对象,并放入RequestChannel中等待Processor处理。此时会更新对应的Request的responseCompleteTimeMs
为当前时间戳:
其中使用到的RequestChannel.Request的updateRequestMetrics()
方法源码如下:
查询GroupCoordinator的请求的ResponseQueueTimeMs度量如下
1.4. BrokerChangeListener中的Timer度量
在Kafka中使用Timer来统计Broker发生增减时,Leader副本选举的时长及其分布,更确切地说,是统计Controller Leader执行整个BrokerChangeListener监听器的handleChildChange(...)
方法所用的时间及其分布。KafkaServer是服务端的入口,它启动时会调用registerStats(...)
方法创建并注册Timer对象:
其中ControllerStats的源码如下:
其中KafkaTimer是对Timer的简单封装,底层还是调用Timer的time()
和stop()
方法实现统计功能。
BrokerChangeListener中对leaderElectionTimer
的使用如下:
在监控界面,LeaderElectionRateAndTimeMs度量显示如下:
UncleanLeaderElectionsPerSec度量显示如下:
2. Kafka的监控
Kafka自身还提供了监控模块的实现,代码位于clients模块的org.apache.kafka.common.metrics包以及其子包中。
2.1. 基础规范类
其中,org.apache.kafka.common.metrics.Measurable接口是度量类型的基础接口,它内部只声明了measure(...)
一个方法,源码如下:
- /**
- * A measurable quantity that can be registered as a metric
- * 度量类型最基础的接口
- */
- public interface Measurable {
- /**
- * Measure this quantity and return the result as a double
- * 用于获取被监控的值
- *
- * @param config The configuration for this metric
- * @param now The POSIX time in milliseconds the measurement is being taken
- * @return The measured value
- */
- public double measure(MetricConfig config, long now);
- }
org.apache.kafka.common.metrics.Stat接口表示需要经过统计计算的度量类型,如平均值、极值等,它内部定义了record(...)
方法用于记录某值并更新度量值:
- /**
- * A Stat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor
- * 表示需要经过统计计算的度量类型,如平均值、最大值、最小值等
- */
- public interface Stat {
- /**
- * Record the given value
- * 通过该方法记录某值并更新度量值
- *
- * @param config The configuration to use for this metric
- * @param value The value to record
- * @param timeMs The POSIX time in milliseconds this value occurred
- */
- public void record(MetricConfig config, double value, long timeMs);
- }
org.apache.kafka.common.metrics.MeasurableStat接口继承了Measurable和Stat,没有声明任何其他的方法,是一个组合接口。
org.apache.kafka.common.metrics.stats.SampledStat是一个抽象类,它表示一个抽样的度量值,除了Total和Rate外的其他MeasurableStat接口实现都依赖它的功能。在SampledStat中可以有多个Sample(抽样,样本)并通过多个Sample完成对一个值的度量,在每个Sample中都记录了其对应的时间窗口和事件数量,SampledStat在计算最终的结果值时,可以根据这两个值决定是否使用此Sample中的数据。SampledStat的定义及重要字段如下:
其中samples
字段是一个集合,默认实现为ArrayList,大小为2,保存了多个Sample对象;Sample是SampledStat的静态内部类,定义及重要字段如下:
另外SampledStat还使用到了MetricConfig类,它是一个配置类,定义和重要字段如下:
SampledStat实现了MeasurableStat接口中的两个方法;其中record(...)
方法会根据时间窗口和事件数使用合适的Sample对象进行记录:
上面record(...)
中调用的update(...)
方法是抽象方法,交由子类实现。
measure(...)
方法首先会将过期的Sample重置,然后调用combine(...)
抽象方法完成计算:
2.2. Total实现类
Total实现了MeasurableStat接口,用于记录总数,源码比较简单,它的record(...)
方法主要负责记录值的累加,measure(...)
方法则直接将记录值返回:
2.3. Count实现类
Count类则继承了SampledStat抽象类,在update(...)
方法中完成了Sample的value
的累加,在combine(...)
方法中将所有未过期的Sample的value
求和并返回。Count与Total的不同之处在于,随着时间窗口的推移,Count中会有Sample过期,所以Count记录的是一段时间内的总数,而Total的值则单调递增:
2.4. Rate实现类
Rate实现了MeasurableStat接口,它用于记录比率,例如每秒钟创建连接的个数。Rate中封装了一个SampledStat类型的对象stat
,默认是Rate的内部类SampledTotal的对象, SampledTotal的实现与Count类似。Rate的record(...)
方法会调用其中的stat
字段的record(...)
方法进行记录;主要源码如下:
2.5. Max实现类
Max实现类继承了SampledStat抽象类,用于记录最大值;它的update(...)
方法会较新的值和旧的值的大小,并记录最大的那个值,combine(...)
方法则用于返回所有Sample中记录的最大值:
2.6. Min实现类
Mix与Max类恰好相反,用于记录最小值,实现方式与Max类似:
2.7. Avg实现类
Avg实现类继承了SampledStat抽象类,用于记录平均值;它的update(...)
方法会对每次传入的Sample的值进行累加,combine(...)
方法则会统计Sample的数量,然后根据update(...)
记录的累加值计算平均值:
2.8. Percentiles实现类
2.9. Kafka的封装
Kafka中通过KafkaMetrics对上述度量类型做了又一次封装,在KafkaMetrics中还封装了MetricName对象和MetricsConfig对象。
我们先来关注MetricName类的定义和重要字段:
在实际应用中,对同一个操作需要有多个不同方面的度量,例如,我们需要监控请求的最大长度,也需要监控请求的平均值等。为了满足这种需求,Kafka将多个相关的度量对象封装进Sensor中。Sensor中的核心字段如下所述:
其中stats
集合用于装载多个Stat度量对象;Sensor的add(...)
方法用于向Sensor中添加Stat:
从源码可知,Sensor的add(...)
方法会将Stat对象包装为KafkaMetric对象,然后创建对应的MBean进行注册,最后将KafkaMetric对象保存在metrics
或stats
集合中。
Sensor的record(...)
方法中完成了记录功能。它会更新lastRecordTime
字段,调用stats
集合中所有Stat对象以及parents
集合中所有父Sensor的record(...)
方法:
Kafka的Metrics类负责统一管理Sensor对象、KafkaMetric对象以及MetricsReporter对象,定义如下:
Metrics的sensor(...)
方法主要负责从sensors
集合中获取Sensor对象,如果指定的Sensor不存在则创建新Sensor对象,并使用childrenSensors
集合记录Sensor的层级关系:
Metrics的addReporter(...)
方法负责添加MetricsReporter:
而在通过addMetric(...)
方法添加KafkaMetric对象时会调用registerMetric(...)
方法向每个MetricReporter进行注册:
在Metrics的构造方法中除了初始化其字段,还会通过metricsScheduler
启动一个ExpireSensorTask定时任务处理“过期Sensor”,创建一个用于记录metrics集合大小的匿名度量类的对象:
ExpireSensorTask任务中会遍历sensors集合,并将过期的Sensor对象删除:
在Metrics的reporters
集合中可以保存多个MetricsReporter对象,当添加KafkaMetrics时会调用MetricReporter的metricChange(...)
方法进行注册,MetricReporter会按照其实现的方式将KafkaMetrics中的度量信息暴露出来,例如JmxReporter实现会创建并注册相应的MBean。MetricsReporter接口中定义了如下方法:
JmxReporter是Kafka提供的MetricsReporter接口默认实现类,它通过JMX的方式将KafkaMetric中的信息暴露出来,它使用Mapmbeans
集合记录了MBean的名称和对应的KafkaMBean对象。KafkaMBean是一个DynamicMBean接口的实现,其中通过Mapmetrics
集合记录了添加的KafkaMetric对象,这些KafkaMetric对象会被当作KafkaMbean对象的属性处理:
JmxReporter的metricChange(...)
方法会将KafkaMetric以属性的形式添加到KafkaMBean中,并重新注册KafkaMBean:
JmxReporter的getMBeanName(...)
方法生成的KafkaMBean名称的格式是prefix:type=[grouop],key1=value1,key2=value2...
:
3. 监控Selector指标
在前面分析KafkaController时知道,在Controller成为Leader时会创建ControllerChannelManager,它负责管理Controller Leader与其他Broker之间的连接。涉及网络连接的问题都会与NetworkClient和Selector相关,本节就以ControllerChannelManager中使用的Selector为例,分析Kafka监控模块的使用。
在ControllerChannelManager的addNewBroker(...)
方法中会给每个新增的Broker创建对应的NetworkClient对象:
在该方法构建Selector对象时,传入了Metrics类型的参数metrics
,该字段在KafkaServer启动时创建:
在Selector中,定义了SelectorMetrics私有内部类,封装了多个Sensor对象:
而在SelectorMetrics的构造方法中会创建上述的Sensor对象,并按照上面描述的功能添加MeasurableStat:
- 在Selector的
close()
方法中会关闭指定的连接,同时会调用connectionClosed
度量对象的record(...)
方法进行记录:
- 在Selector的
poll()
方法中会通过调用Selector的select()
方法监听是否有连接触发其关注的事件,其中会计算select()
方法的起止时间并调用selectTime
字段的record(...)
方法进行记录。select()
方法返回后会调用Selector的pollSelectionKeys()
方法进行I/O操作,处理select()
方法得到的连接,其中会记录I/O操作的起止时间,并调用ioTime
字段的record()方法记录I/O操作的时间:
- 当检测到连接创建成功时会调用
connectionCreated
字段的record(...)
方法进行记录;检测到连接可写时,会向连接写入请求,当写入一个完整的请求后,会将请求放入completedSends
集合等待后续处理,同时调用bytesSent
字段的record(...)
方法进行记录。
- 当从连接中读取到完整的请求后会放入
stagedReceives
集合中暂存,pollSelectionKeys(...)
方法结束后会转移到completedReceives
集合等待后续处理,同时会调用bytesReceived
字段的record(...)
方法进行记录。
注意,bytesSent
和bytesReceived
这两个Sensor是bytesTransferred
的子Sensor,当调用子Sensor的record(...)
方法时会同时调用父Sensor的record(...)
方法。
在MBean监控页面,我们可以看到下面的显示:
对应部门Metadata信息如下:
注:在实际生产中,除了使用JConsole,还有Mx4jLoader、Kafka Web Conslole、Kafka Manager、KafkaOffsetMonitor等可视化的工具可供选择。
推荐阅读
Java多线程 46 - ScheduledThreadPoolExecutor详解(2)
ScheduledThreadPoolExecutor用于执行周期性或延时性的定时任务,它是在ThreadPoolExe...