大数据
流式处理
Kafka
源码解析

Kafka系列 26 - 服务端源码分析 17:JMX和Metrics简介

简介:主要讲解JMX和Yammer Metrics的使用

1. JMX的使用

Kafka中使用Yammer Metrics工具包进行内部状态的监控,默认通过JMX方式对外提供监控数据,这部分内容我们先了解一下JMX的简单实用。

JMX(Java Management Extensions,即Java管理扩展)是一个为应用程序、设备、系统等植入管理功能的框架。JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活的开发无缝集成的系统、网络和服务管理应用。JMX在Java编程语言中定义了应用程序以及网络管理和监控的体系结构、设计模式、应用程序接口以及服务。通常使用JMX来监控系统的运行状态或管理系统的某些方面,比如清空缓存、重新加载配置文件等。

JMX体系结构分为以下四个层次:

  • 设备层(Instrumentation Level):主要定义了信息模型。在JMX中,各种管理对象以管理构件的形式存在,需要管理时,向MBean服务器进行注册。该层还定义了通知机制以及一些辅助元数据类。
  • 代理层(Agent Level):主要定义了各种服务以及通信模型。该层的核心是一个MBean服务器,所有的管理构件都需要向它注册,才能被管理。注册在MBean服务器上管理构件并不直接和远程应用程序进行通信,它们通过协议适配器和连接器进行通信。而协议适配器和连接器也以管理构件的形式向MBean服务器注册才能提供相应的服务。
  • 分布服务层(Distributed Service Level):主要定义了能对代理层进行操作的管理接口和构件,这样管理者就可以操作代理。然而,当前的JMX规范并没有给出这一层的具体规范。
  • 附加管理协议API:定义的API主要用来支持当前已经存在的网络管理协议,如SNMP、TMN、CIM/WBEM等。

1.1. MBean的使用

每个MBean必须实现一个接口且接口的名称一般以MBean结尾,例如下面定义了一个PersonMBean接口,声明了nameage两个属性及say()操作:

  • public interface PersonMBean {
  • public String getName();
  • public int getAge();
  • public String say(String hello);
  • }

然后提供一个Person实现类实现该接口:

  • public class Person implements PersonMBean {
  • private String name;
  • private int age;
  • public Person(String name, int age) {
  • this.name = name;
  • this.age = age;
  • }
  • @Override
  • public String getName() {
  • return name;
  • }
  • @Override
  • public int getAge() {
  • return age;
  • }
  • @Override
  • public String say(String hello) {
  • System.out.println("[basic] " + hello);
  • return "[basic] " + this.name + ": " + hello;
  • }
  • }

在PersonAgent类中,将Person注册到MBeanServer中:

  • public class PersonAgent {
  • public static void main(String[] args) throws Exception {
  • // MBeanServer对象
  • MBeanServer server = ManagementFactory.getPlatformMBeanServer();
  • // 构造MBean的名字
  • ObjectName objectName = new ObjectName("jmxBeanTest:name=basic");
  • // 注册MBean,注意第二个位置的名字
  • server.registerMBean(new Person("jack", 27), objectName);
  • // 线程休眠,方便观察
  • Thread.sleep(60 * 60 * 1000);
  • }
  • }

运行PersonAgent的main()方法之后,我们可以使用VisualVM或JConsole工具连接到该程序的JMX,查看MBean标签页:

1.JMX中显示的Attributes.png

其中在右侧窗口的左侧MBeans栏,可以发现“jmxBeanTest”正是我们在定义objectName传入的字符串中冒号前的部分,而其下的子节点“basic”则是定义objectName传入的字符串中name指定的部分。

在右侧窗口的右栏,还有Attributes、Operations、Notifications和Metadata四个标签,Attributes标签显示了Person中两个字段的名称和值;Operations标签中会显示定义的操作,我们在“p1”框内输入字符串“Hello”,点击“say”按钮后会弹出框,提示的内容即为调用Person的say(String hello)方法的返回值:

2.JMX中显示的Operations.png

在Metadata标签页可以看到一些元数据信息:

3.JMX中显示的Metadata.png

1.2. DynamicMBean的使用

在真实开发中,业务Bean类可能并没有实现指定的接口,我们可以使用DynamicMBean处理这种情况。下面是一个PersonDynamic类实现Dynamic接口:

  • public class PersonDynamic implements DynamicMBean {
  • private Person person;
  • private List<MBeanAttributeInfo> attributes = new ArrayList<>();
  • // 描述构造器信息
  • private List<MBeanConstructorInfo> constructors =
  • new ArrayList();
  • // 描述方法信息
  • private List<MBeanOperationInfo> operations = new ArrayList();
  • // 描述通知信息
  • private List<MBeanNotificationInfo> notifications = new ArrayList();
  • // MBeanInfo用于管理以上描述信息
  • private MBeanInfo mBeanInfo;
  • public PersonDynamic(Person person) {
  • this.person = person;
  • try {
  • init();
  • } catch (Exception e) {
  • e.printStackTrace();
  • }
  • }
  • // 初始化方法
  • private void init() throws Exception {
  • // 构建Person的属性、方法、构造器等信息
  • constructors.add(new MBeanConstructorInfo("Person(String, Integer)" +
  • "构造器", this.person.getClass().getConstructors()[0]));
  • attributes.add(new MBeanAttributeInfo("name", "java.lang.String", "姓名",
  • true, false, false));
  • attributes.add(new MBeanAttributeInfo("age", "int", "年龄",
  • true, false, false));
  • operations.add(new MBeanOperationInfo("sayHello()方法", this. person
  • .getClass().getMethod("sayHello", new Class[]{String. class})));
  • // 创建一个MBeanInfo对象
  • this.mBeanInfo = new MBeanInfo(this.getClass().getName(),
  • "Person",
  • attributes.toArray(new MBeanAttributeInfo[attributes. size()]),
  • constructors.toArray(new MBeanConstructorInfo[constructors.size()]),
  • operations.toArray(new MBeanOperationInfo[operations. size()]),
  • notifications.toArray(new MBeanNotificationInfo[notifications.size()])
  • );
  • }
  • @Override
  • public Object getAttribute(String attribute) { // 获取person对象属性值
  • if (attribute.equals("name")) {
  • return this.person.getName();
  • } else if (attribute.equals("age")) {
  • return this.person.getAge();
  • }
  • return null;
  • }
  • @Override
  • public AttributeList getAttributes(String[] attributes) {
  • // 通过属性名获取一个属性对象列表
  • if (attributes == null || attributes.length == 0) {
  • return null;
  • }
  • try {
  • AttributeList attrList = new AttributeList();
  • for (String attrName : attributes) {
  • Object obj = this.getAttribute(attrName);
  • Attribute attribute = new Attribute(attrName, obj);
  • attrList.add(attribute);
  • }
  • return attrList;
  • } catch (Exception e) {
  • e.printStackTrace();
  • }
  • return null;
  • }
  • @Override
  • public MBeanInfo getMBeanInfo() { // 获取MBeanInfo
  • return mBeanInfo;
  • }
  • @Override
  • public Object invoke(String actionName, Object[] params, String[] signature) {
  • // 调用Person里面指定的方法
  • if (actionName.equals("sayHello")) {
  • return this.person.sayHello(params[0].toString());
  • }
  • return null;
  • }
  • @Override
  • public void setAttribute(Attribute attribute) {
  • }
  • @Override
  • public AttributeList setAttributes(AttributeList attributes) {
  • return null;
  • }
  • }

此时我们的Person类可以无需继承PersonMBean接口:

  • public class Person {
  • private String name;
  • private int age;
  • public Person(String name, int age) {
  • this.name = name;
  • this.age = age;
  • }
  • public String getName() {
  • return name;
  • }
  • public int getAge() {
  • return age;
  • }
  • public void setName(String name) {
  • this.name = name;
  • }
  • public void setAge(int age) {
  • this.age = age;
  • }
  • public String sayHello(String hello) {
  • System.out.println("[dynamic] " + hello);
  • return "[dynamic] " + this.name + ": " + hello;
  • }
  • }

DynamicMBean的注册方式有一点差别:

  • public class PersonAgent {
  • public static void main(String[] args) throws Exception {
  • // MBeanServer对象
  • MBeanServer server = ManagementFactory.getPlatformMBeanServer();
  • // 构造MBean的名字
  • ObjectName objectName = new ObjectName("jmxBeanTest:name=dynamic");
  • // 注册MBean,此时会使用PersonDynamic对Person对象进行包装
  • server.registerMBean(new PersonDynamic(new Person("mike", 22)), objectName);
  • // 线程休眠,方便观察
  • Thread.sleep(60 * 60 * 1000);
  • }
  • }

运行该PersonAgent的main()方法之后,VisualVM或JConsole工具的使用与前面的一样,唯一的区别在于Metadata标签页的某些说明会显示我们在PersonDynamic中定义的说明:

4.JMX中DynamicMBean的Metadata.png

2. Yammer Metrics的实现

Metrics类实现了对JMX的支持;当加载Metrics类时会执行其静态代码块,创建MetricsRegistry对象、JmxReporter对象、MBeanServer对象,其中JmxReporter对象以监听器的形式注册到MetricsRegistry上;Metrics的部分源码如下:

com.yammer.metrics.Metrics
  • public class Metrics {
  • // 创建MetricsRegistry对象
  • private static final MetricsRegistry DEFAULT_REGISTRY = new MetricsRegistry();
  • private static final Thread SHUTDOWN_HOOK = new Thread() {
  • public void run() {
  • JmxReporter.shutdownDefault();
  • }
  • };
  • static {
  • // 将JmxReporter对象以监听器的形式注册到MetricsRegistry上
  • JmxReporter.startDefault(DEFAULT_REGISTRY);
  • Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
  • }
  • ...
  • }

JmxReporter继承了MetricsRegistryListener,也即是说,JmxReporter可以作为MetricsRegistry的监听器使用,源码如下:

com.yammer.metrics.reporting.JmxReporter
  • public class JmxReporter extends AbstractReporter implements MetricsRegistryListener,
  • MetricProcessor<JmxReporter.Context> {
  • ...
  • // 管理MBean对象的集合
  • private final Map<MetricName, ObjectName> registeredBeans;
  • private final MBeanServer server;
  • // 单例模式
  • private static JmxReporter INSTANCE;
  • /**
  • * Returns the default instance of {@link JmxReporter} if it has been started.
  • *
  • * @return The default instance or null if the default is not used
  • */
  • public static JmxReporter getDefault() {
  • return INSTANCE;
  • }
  • /**
  • * Starts the default instance of {@link JmxReporter}.
  • *
  • * @param registry the {@link MetricsRegistry} to report from
  • */
  • public static void startDefault(MetricsRegistry registry) {
  • INSTANCE = new JmxReporter(registry);
  • INSTANCE.start();
  • }
  • /**
  • * Starts the reporter.
  • */
  • public final void start() {
  • // 以监听器的形式注册到MetricsRegistry上,当MetricsRegistry中
  • getMetricsRegistry().addListener(this);
  • }
  • /**
  • * Creates a new {@link JmxReporter} for the given registry.
  • *
  • * @param registry a {@link MetricsRegistry}
  • */
  • public JmxReporter(MetricsRegistry registry) {
  • super(registry);
  • this.registeredBeans = new ConcurrentHashMap<MetricName, ObjectName>(100);
  • // 初始化MBeanServer
  • this.server = ManagementFactory.getPlatformMBeanServer();
  • }

以下是MetricsRegistry的addListener(...)方法,可见其将注册的监听器都装进了listeners集合,listeners是线程安全的CopyOnWriteArrayList集合:

MetricsRegistry
  • public void addListener(MetricsRegistryListener listener) {
  • // listeners为CopyOnWriteArrayList类型的集合
  • listeners.add(listener);
  • for (Map.Entry<MetricName, Metric> entry : metrics.entrySet()) {
  • // 通知监听器已有的Metric
  • listener.onMetricAdded(entry.getKey(), entry.getValue());
  • }
  • }

JmxReporter中首先定义了一个AbstractBean静态内部抽象类作为所有度量实现类的父类,其中定义了objectName用于记录度量名称:

com.yammer.metrics.reporting.JmxReporter.AbstractBean
  • private abstract static class AbstractBean implements MetricMBean {
  • private final ObjectName objectName;
  • protected AbstractBean(ObjectName objectName) {
  • this.objectName = objectName;
  • }
  • @Override
  • public ObjectName objectName() {
  • return objectName;
  • }
  • }

然后定义了前面介绍的五种度量类对应的MBean接口,并提供了包装的实现类,分别如下:

针对Gauge度量:

  • GaugeMBean接口:
com.yammer.metrics.reporting.JmxReporter.GaugeMBean
  • // CHECKSTYLE:OFF
  • @SuppressWarnings("UnusedDeclaration")
  • public interface GaugeMBean extends MetricMBean {
  • Object getValue();
  • }
  • // CHECKSTYLE:ON
  • Gauge实现类:
com.yammer.metrics.reporting.JmxReporter.Gauge
  • private static class Gauge extends AbstractBean implements GaugeMBean {
  • private final com.yammer.metrics.core.Gauge<?> metric;
  • private Gauge(com.yammer.metrics.core.Gauge<?> metric, ObjectName objectName) {
  • super(objectName);
  • this.metric = metric;
  • }
  • @Override
  • public Object getValue() {
  • return metric.value();
  • }
  • }

针对Counter度量:

  • CounterMBean接口:
com.yammer.metrics.reporting.JmxReporter.CounterMBean
  • // CHECKSTYLE:OFF
  • @SuppressWarnings("UnusedDeclaration")
  • public interface CounterMBean extends MetricMBean {
  • long getCount();
  • }
  • // CHECKSTYLE:ON
  • Counter实现类:
com.yammer.metrics.reporting.JmxReporter.Counter
  • private static class Counter extends AbstractBean implements CounterMBean {
  • private final com.yammer.metrics.core.Counter metric;
  • private Counter(com.yammer.metrics.core.Counter metric, ObjectName objectName) {
  • super(objectName);
  • this.metric = metric;
  • }
  • @Override
  • public long getCount() {
  • return metric.count();
  • }
  • }

针对Meter度量:

  • MeterMBean接口:
com.yammer.metrics.reporting.JmxReporter.MeterMBean
  • //CHECKSTYLE:OFF
  • @SuppressWarnings("UnusedDeclaration")
  • public interface MeterMBean extends MetricMBean {
  • long getCount();
  • String getEventType();
  • TimeUnit getRateUnit();
  • double getMeanRate();
  • double getOneMinuteRate();
  • double getFiveMinuteRate();
  • double getFifteenMinuteRate();
  • }
  • //CHECKSTYLE:ON
  • Meter实现类:
com.yammer.metrics.reporting.JmxReporter.Meter
  • private static class Meter extends AbstractBean implements MeterMBean {
  • private final Metered metric;
  • private Meter(Metered metric, ObjectName objectName) {
  • super(objectName);
  • this.metric = metric;
  • }
  • @Override
  • public long getCount() {
  • return metric.count();
  • }
  • @Override
  • public String getEventType() {
  • return metric.eventType();
  • }
  • @Override
  • public TimeUnit getRateUnit() {
  • return metric.rateUnit();
  • }
  • @Override
  • public double getMeanRate() {
  • return metric.meanRate();
  • }
  • @Override
  • public double getOneMinuteRate() {
  • return metric.oneMinuteRate();
  • }
  • @Override
  • public double getFiveMinuteRate() {
  • return metric.fiveMinuteRate();
  • }
  • @Override
  • public double getFifteenMinuteRate() {
  • return metric.fifteenMinuteRate();
  • }
  • }

针对Histogram度量:

  • HistogramMBean接口:
com.yammer.metrics.reporting.JmxReporter.HistogramMBean
  • // CHECKSTYLE:OFF
  • @SuppressWarnings("UnusedDeclaration")
  • public interface HistogramMBean extends MetricMBean {
  • long getCount();
  • double getMin();
  • double getMax();
  • double getMean();
  • double getStdDev();
  • double get50thPercentile();
  • double get75thPercentile();
  • double get95thPercentile();
  • double get98thPercentile();
  • double get99thPercentile();
  • double get999thPercentile();
  • double[] values();
  • }
  • // CHECKSTYLE:ON
  • Histogram实现类:
com.yammer.metrics.reporting.JmxReporter.Histogram
  • private static class Histogram implements HistogramMBean {
  • private final ObjectName objectName;
  • private final com.yammer.metrics.core.Histogram metric;
  • private Histogram(com.yammer.metrics.core.Histogram metric, ObjectName objectName) {
  • this.metric = metric;
  • this.objectName = objectName;
  • }
  • @Override
  • public ObjectName objectName() {
  • return objectName;
  • }
  • @Override
  • public double get50thPercentile() {
  • return metric.getSnapshot().getMedian();
  • }
  • @Override
  • public long getCount() {
  • return metric.count();
  • }
  • @Override
  • public double getMin() {
  • return metric.min();
  • }
  • @Override
  • public double getMax() {
  • return metric.max();
  • }
  • @Override
  • public double getMean() {
  • return metric.mean();
  • }
  • @Override
  • public double getStdDev() {
  • return metric.stdDev();
  • }
  • @Override
  • public double get75thPercentile() {
  • return metric.getSnapshot().get75thPercentile();
  • }
  • @Override
  • public double get95thPercentile() {
  • return metric.getSnapshot().get95thPercentile();
  • }
  • @Override
  • public double get98thPercentile() {
  • return metric.getSnapshot().get98thPercentile();
  • }
  • @Override
  • public double get99thPercentile() {
  • return metric.getSnapshot().get99thPercentile();
  • }
  • @Override
  • public double get999thPercentile() {
  • return metric.getSnapshot().get999thPercentile();
  • }
  • @Override
  • public double[] values() {
  • return metric.getSnapshot().getValues();
  • }
  • }

针对Timer度量:

  • TimerMBean接口:

注意:TimerMBean继承了MeterMBean和HistogramMBean,并提供了一个自己的方法声明。

com.yammer.metrics.reporting.JmxReporter.TimerMBean
  • // CHECKSTYLE:OFF
  • @SuppressWarnings("UnusedDeclaration")
  • public interface TimerMBean extends MeterMBean, HistogramMBean {
  • TimeUnit getLatencyUnit();
  • }
  • // CHECKSTYLE:ON
  • Timer实现类:
com.yammer.metrics.reporting.JmxReporter.Timer
  • static class Timer extends Meter implements TimerMBean {
  • private final com.yammer.metrics.core.Timer metric;
  • private Timer(com.yammer.metrics.core.Timer metric, ObjectName objectName) {
  • super(metric, objectName);
  • this.metric = metric;
  • }
  • @Override
  • public double get50thPercentile() {
  • return metric.getSnapshot().getMedian();
  • }
  • @Override
  • public TimeUnit getLatencyUnit() {
  • return metric.durationUnit();
  • }
  • @Override
  • public double getMin() {
  • return metric.min();
  • }
  • @Override
  • public double getMax() {
  • return metric.max();
  • }
  • @Override
  • public double getMean() {
  • return metric.mean();
  • }
  • @Override
  • public double getStdDev() {
  • return metric.stdDev();
  • }
  • @Override
  • public double get75thPercentile() {
  • return metric.getSnapshot().get75thPercentile();
  • }
  • @Override
  • public double get95thPercentile() {
  • return metric.getSnapshot().get95thPercentile();
  • }
  • @Override
  • public double get98thPercentile() {
  • return metric.getSnapshot().get98thPercentile();
  • }
  • @Override
  • public double get99thPercentile() {
  • return metric.getSnapshot().get99thPercentile();
  • }
  • @Override
  • public double get999thPercentile() {
  • return metric.getSnapshot().get999thPercentile();
  • }
  • @Override
  • public double[] values() {
  • return metric.getSnapshot().getValues();
  • }
  • }

注意,上面的这几个实现类都是对com.yammer.metrics.core包下的同名类进行了一层封装。

以Gauge度量器为例,当使用Metrics的newGauge(...)创建Gauge度量器时,会调用MetricsRegistry的newGauge(...)方法:

com.yammer.metrics.Metrics#newGauge
  • public static <T> Gauge<T> newGauge(Class<?> klass,
  • String name,
  • Gauge<T> metric) {
  • return DEFAULT_REGISTRY.newGauge(klass, name, metric);
  • }

MetricsRegistry的newGauge(...)方法的源码如下:

com.yammer.metrics.core.MetricsRegistry#newGauge
  • public <T> Gauge<T> newGauge(Class<?> klass,
  • String name,
  • Gauge<T> metric) {
  • return newGauge(klass, name, null, metric);
  • }

MetricsRegistry的newGauge(...)方法最终会调用到它的getOrAdd(...)方法,源码如下:

com.yammer.metrics.core.MetricsRegistry#getOrAdd
  • protected final <T extends Metric> T getOrAdd(MetricName name, T metric) {
  • // 判断Metrics是否存在,以名称来区分
  • final Metric existingMetric = metrics.get(name);
  • if (existingMetric == null) { // Metric不存在,可以添加
  • // 添加操作
  • final Metric justAddedMetric = metrics.putIfAbsent(name, metric);
  • if (justAddedMetric == null) { // 添加成功
  • // 通知监听器有Metric添加了
  • notifyMetricAdded(name, metric);
  • return metric;
  • }
  • // 添加失败,已经有同名的Metric了
  • if (metric instanceof Stoppable) {
  • // 将新添加的Metric停止
  • ((Stoppable) metric).stop();
  • }
  • // 返回同名的已存在的Metric
  • return (T) justAddedMetric;
  • }
  • // 返回已存在的
  • return (T) existingMetric;
  • }

notifyMetricAdded(...)方法的源码如下:

com.yammer.metrics.core.MetricsRegistry#notifyMetricAdded
  • private void notifyMetricAdded(MetricName name, Metric metric) {
  • // 遍历所有监听器
  • for (MetricsRegistryListener listener : listeners) {
  • // 通知监听器有Metric添加了
  • listener.onMetricAdded(name, metric);
  • }
  • }

而JmxReporter的onMetricAdded(...)方法会根据添加的Metric的类型,触发相应的方法,源码如下:

com.yammer.metrics.reporting.JmxReporter#onMetricAdded
  • @Override
  • public void onMetricAdded(MetricName name, Metric metric) {
  • if (metric != null) {
  • try {
  • metric.processWith(this, name, new Context(name, new ObjectName(name.getMBeanName())));
  • } catch (Exception e) {
  • LOGGER.warn("Error processing {}", name, e);
  • }
  • }
  • }

注意,这里以Gauge度量器为例,自然传入的metric就为Gauge类型,且调用metricprocessWith(...)时传入的第一个参数为JmxReporter自己;我们观察com.yammer.metrics.core.Gauge类的processWith(...)方法:

com.yammer.metrics.core.Gauge
  • @Override
  • public <U> void processWith(MetricProcessor<U> processor, MetricName name, U context) throws Exception {
  • processor.processGauge(name, this, context);
  • }

可见该方法调用了第一个参数processorprocessGauge(...)方法,也即是传入的JmxReporter对象的processGauge(...)方法,该方法源码如下:

com.yammer.metrics.reporting.JmxReporter#processGauge
  • @Override
  • public void processGauge(MetricName name, com.yammer.metrics.core.Gauge<?> gauge, Context context) throws Exception {
  • registerBean(context.getMetricName(), new Gauge(gauge, context.getObjectName()),
  • context.getObjectName());
  • }

最终JmxReporter对象的processGauge(...)方法调用了registerBean(...)方法,其中会将MetricMBean注册到MBeanServer上:

  • private void registerBean(MetricName name, MetricMBean bean, ObjectName objectName)
  • throws MBeanRegistrationException, OperationsException {
  • // 取消之前注册的旧的Metric,以objectName区分
  • if (server.isRegistered(objectName) ){
  • server.unregisterMBean(objectName);
  • }
  • // 注册新的Metric到MBeanServer对象上
  • server.registerMBean(bean, objectName);
  • // 将Metric的名称进行记录
  • registeredBeans.put(name, objectName);
  • }