写点什么

Presto 设计与实现(六):JMX

作者:冰心的小屋
  • 2023-08-22
    北京
  • 本文字数:9046 字

    阅读完需:约 30 分钟

Presto 设计与实现(六):JMX

1. 原理

JMX:Java Management Extensions 是 Java 的一个开发和通信的标准,用于管理监控应用程序状态:

  • 开发标准:服务端和客户端使用 JMX 提供的辅助类按规定的流程开发;

  • 通信标准:服务端和客户端通信基于 RMI 传输协议,RMI 构建在 TCP 之上。


服务端开发流程:

  1. 定义封装系统状态的接口和实现类,可根据系统不同层面定义多个类;

  2. 系统初始化时向 MBeanServer 注册这些类的实例;

  3. JMX 会动态创建 RMIServerSocketFactory 接口实例;

  4. RMIServerSocketFactory 接口实例创建 Socket Server,监控客户端的 RMI 请求。


客户端实现流程:

  1. 根据服务端的域名和端口拼装 RMI Server 地址;

  2. 通过 JMXConnector 连接拼装后的地址;

  3. 连接成功后通过 JMXConnector 获取 MBeanServerConnection;

  4. 通过 MBeanServerConnection 可以查询服务端注册的对象,获取对象的属性。

2. 服务端实现:不依赖开源项目


步骤 1:定义接口

例如统计接口调用次数。

package com.ice.jmx;
public interface ApiStatsMBean { // 服务名称 String getName();
// 调用次数 int getCount();}
复制代码


步骤 2:实现接口

public class OriginalApiStatsMBean implements ApiStatsMBean {    private String name;    private AtomicInteger count;
public OriginalStatsMBean(String name) { this.name = name; this.count = new AtomicInteger(); }
@Override public String getName() { return name; }
@Override public int getCount() { return count.get(); }
public void increase() { count.incrementAndGet(); }}
复制代码


步骤 3:注册对象

package com.ice.jmx.server.original;
import com.ice.jmx.ApiStatsMBean;
import javax.management.MBeanServer;import javax.management.ObjectName;import javax.management.StandardMBean;import java.lang.management.ManagementFactory;import java.util.Random;import java.util.concurrent.TimeUnit;
public class OriginalServer {
public static void main(String argv[]) throws Exception { // 1. 获取用于注册的辅助类 MBeanServer MBeanServer server = ManagementFactory.getPlatformMBeanServer();
// 2. 用于统计 api_user_info_v1 接口的调用次数 OriginalApiStatsMBean stats = new OriginalApiStatsMBean("api_user_info_v1");
// 3. 定义对象的访问限定名 ObjectName objectName = new ObjectName("com.ice:name=ApiStatsMBean");
// 4. 使用标准 MBean 进行注册 server.registerMBean(new StandardMBean(stats, ApiStatsMBean.class), objectName); System.out.println("Registered: " + objectName);
// 下面用于模拟接口被调用的次数 System.out.println("Invoking: " + stats.getName()); while (true) { stats.increase(); System.out.printf("Invoked %s %s \n", stats.getName(), stats.getCount()); TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)); } }}
复制代码


步骤 4:开启监听端口

直接运行上面的程序,启动本地 jconsole 直接绑定当前程序可以查看 JMX 信息。若要支持外部访问,你需要在 JVM 参数附加以下信息:

-Dcom.sun.management.jmxremote-Dcom.sun.management.jmxremote.port=9082-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false
复制代码


这里是 JVM 参数:


执行程序:


使用 jconsole 连接:


连接后可查看接口调用信息:

3. 服务端实现:org.weakref.jmxutils


步骤 1:pom.xml 文件中加入依赖

<dependency>    <groupId>org.weakref</groupId>    <artifactId>jmxutils</artifactId>    <version>1.21</version></dependency>
复制代码


步骤 2:无需定义接口,1 个普通类即可

在方法使用 @Managed 即可。

package com.ice.jmx.server.weakref;
import org.weakref.jmx.Managed;
public class WeakrefApiStatsBean{ private String name; private int count;
@Managed public String getName() { return name; }
@Managed public void setName(String name) { this.name = name; }
@Managed public int getCount() { return count; }
@Managed public void setCount(int count) { this.count = count; }}
复制代码


步骤 3:定义 Guice Module

package com.ice.jmx.server.weakref;
import com.google.inject.Binder;import com.google.inject.Module;import com.google.inject.Singleton;import org.weakref.jmx.guice.ExportBinder;
import javax.management.MBeanServer;import java.lang.management.ManagementFactory;
public class WeakrefModule implements Module { @Override public void configure(Binder binder) { binder.bind(MBeanServer.class).toInstance(ManagementFactory.getPlatformMBeanServer()); binder.bind(WeakrefApiStatsBean.class).in(Singleton.class); // 使用此种方式进行绑定 ExportBinder.newExporter(binder).export(WeakrefApiStatsBean.class).withGeneratedName(); }}
复制代码


步骤 4:注册及应用

你同样需要在 JVM 参数中加入绑定端口,参考原生实现的步骤 4。

package com.ice.jmx.server.weakref;
import com.google.inject.Guice;import com.google.inject.Injector;import org.weakref.jmx.guice.MBeanModule;
import java.util.Random;import java.util.concurrent.TimeUnit;
public class WeakrefServer { public static void main(String[] args) throws Exception { // 需要使用 weakref 提供的 MBeanModule Injector injector = Guice.createInjector(new MBeanModule(), new WeakrefModule());
WeakrefApiStatsBean apiStats = injector.getInstance(WeakrefApiStatsBean.class); apiStats.setName("api_user_info");
int count = 0; while (true) { apiStats.setCount(++count); TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } }}
复制代码

4. 服务端实现:Airlift-jmx

步骤 1:pom.xml 文件中加入依赖

<dependency>    <groupId>io.airlift</groupId>    <artifactId>jmx</artifactId>    <version>207</version></dependency>
复制代码


步骤 2:配置 jmx_server.properties

jmx.rmiregistry.port=9081jmx.rmiserver.port=9082
复制代码


步骤 3:实际应用

package com.ice.jmx.server.airlift;
import com.google.common.collect.ImmutableList;import com.google.inject.Guice;import com.google.inject.Injector;import com.ice.jmx.server.weakref.WeakrefModule;import com.ice.jmx.server.weakref.WeakrefApiStatsBean;import io.airlift.configuration.ConfigurationFactory;import io.airlift.configuration.ConfigurationLoader;import io.airlift.configuration.ConfigurationModule;import io.airlift.jmx.JmxAgent;import io.airlift.jmx.JmxModule;import org.weakref.jmx.guice.MBeanModule;
import java.util.Random;import java.util.concurrent.TimeUnit;
public class AirliftServer { public static void main(String[] args) throws Exception { // 1. 需要设定 jmx_server.properties 文件路径 System.setProperty("config", "jmx_server.properties"); System.setProperty("com.sun.management.jmxremote.rmi.port", "9082"); // 2. 使用 ConfigurationFactory 加载默认配置 ConfigurationFactory configurationFactory = new ConfigurationFactory(ConfigurationLoader.loadProperties());
// 3. 构建 module 这里需要加入 JmxModule MBeanModule mBeanModule = new MBeanModule(); JmxModule jmxModule = new JmxModule(); WeakrefModule weakrefModule = new WeakrefModule(); configurationFactory.registerConfigurationClasses(ImmutableList.of(mBeanModule, jmxModule, weakrefModule));
// 4. 构建 injector Injector injector = Guice.createInjector(new ConfigurationModule(configurationFactory), mBeanModule, jmxModule, weakrefModule); // 5. 需要主动获取下 JmxAgent 实例 injector.getInstance(JmxAgent.class);
WeakrefApiStatsBean apiStats = injector.getInstance(WeakrefApiStatsBean.class); apiStats.setName("api_user_info");
int count = 0; while (true) { apiStats.setCount(++count); TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } }}
复制代码


总结:在使用 Airlift-jmx 的过程中,如果不清楚底层逻辑,使用起来会有些困难,推荐使用 org.weakref.jmxutils。

5. 服务端实现:Airlift-jmx-http

如果你觉得 RMI 协议比较繁琐, jmx-http 可以让你通过 http 协议访问对象。


步骤 1:pom.xml 文件中加入依赖

<dependency>    <groupId>io.airlift</groupId>    <artifactId>jmx-http</artifactId>    <version>207</version></dependency>
复制代码


步骤 2:定义 NodeModule

package com.ice.jmx.http;
import com.google.inject.Binder;import com.google.inject.Module;import com.google.inject.Scopes;import io.airlift.node.NodeConfig;import io.airlift.node.NodeInfo;
import static org.weakref.jmx.guice.ExportBinder.newExporter;
public class JmxNodeModule implements Module {
@Override public void configure(Binder binder) { binder.bind(NodeInfo.class).in(Scopes.SINGLETON); NodeConfig nodeConfig = new NodeConfig() .setEnvironment("prod") .setNodeInternalAddress("localhost") .setNodeBindIp("192.168.1.103"); binder.bind(NodeConfig.class).toInstance(nodeConfig);
newExporter(binder).export(NodeInfo.class).withGeneratedName(); }}
复制代码


步骤 3: 定义 HttpServer

package com.ice.jmx.http;
import com.google.common.collect.ImmutableSet;import io.airlift.event.client.NullEventClient;import io.airlift.http.server.*;import io.airlift.http.server.HttpServerBinder.HttpResourceBinding;import io.airlift.node.NodeInfo;import io.airlift.tracetoken.TraceTokenManager;
import javax.inject.Inject;import javax.servlet.Filter;import javax.servlet.Servlet;import java.io.IOException;import java.net.URI;import java.util.Map;import java.util.Optional;import java.util.Set;
public class JmxHttpServer extends HttpServer { private final HttpServerInfo httpServerInfo;
public JmxHttpServer( HttpServerInfo httpServerInfo, NodeInfo nodeInfo, HttpServerConfig config, @TheServlet Servlet servlet, @TheServlet Map<String, String> initParameters) throws IOException { this(httpServerInfo, nodeInfo, config, servlet, initParameters, ImmutableSet.of(), ImmutableSet.of(), ClientCertificate.NONE); }
@Inject public JmxHttpServer( HttpServerInfo httpServerInfo, NodeInfo nodeInfo, HttpServerConfig config, @TheServlet Servlet servlet, @TheServlet Map<String, String> initParameters, @TheServlet Set<Filter> filters, @TheServlet Set<HttpResourceBinding> resources, ClientCertificate clientCertificate) throws IOException { super(httpServerInfo, nodeInfo, config.setLogEnabled(false), servlet, initParameters, ImmutableSet.copyOf(filters), ImmutableSet.copyOf(resources), null, null, ImmutableSet.of(), clientCertificate, null, null, new TraceTokenManager(), new RequestStats(), new NullEventClient(), Optional.empty()); this.httpServerInfo = httpServerInfo; }
public URI getBaseUrl() { return httpServerInfo.getHttpUri(); }
public int getPort() { return httpServerInfo.getHttpUri().getPort(); }
public HttpServerInfo getHttpServerInfo() { return httpServerInfo; }}
复制代码


步骤 4: 定义 HttpServerModule

package com.ice.jmx.http;
import com.google.inject.Binder;import com.google.inject.Key;import com.google.inject.Module;import com.google.inject.Scopes;import io.airlift.discovery.client.AnnouncementHttpServerInfo;import io.airlift.http.server.*;import io.airlift.http.server.HttpServer.ClientCertificate;
import javax.servlet.Filter;
import static com.google.inject.multibindings.Multibinder.newSetBinder;import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;import static io.airlift.configuration.ConfigBinder.configBinder;import static io.airlift.http.server.HttpServerBinder.HttpResourceBinding;
public class JmxHttpServerModule implements Module { private final int httpPort;
public JmxHttpServerModule(int httpPort) { this.httpPort = httpPort; }
@Override public void configure(Binder binder) { binder.disableCircularProxies();
configBinder(binder).bindConfig(HttpServerConfig.class); configBinder(binder).bindConfigDefaults(HttpServerConfig.class, config -> { config.setHttpPort(httpPort); });
binder.bind(HttpServerInfo.class).in(Scopes.SINGLETON); binder.bind(JmxHttpServer.class).in(Scopes.SINGLETON); newOptionalBinder(binder, ClientCertificate.class).setDefault().toInstance(ClientCertificate.NONE); binder.bind(HttpServer.class).to(Key.get(JmxHttpServer.class)); newSetBinder(binder, Filter.class, TheServlet.class); newSetBinder(binder, HttpResourceBinding.class, TheServlet.class); binder.bind(AnnouncementHttpServerInfo.class).to(LocalAnnouncementHttpServerInfo.class); }
复制代码


步骤 5: 实际集成

package com.ice.jmx.http;
import io.airlift.bootstrap.Bootstrap;import io.airlift.jaxrs.JaxrsModule;import io.airlift.jmx.JmxHttpModule;import io.airlift.json.JsonModule;
import javax.management.MBeanServer;import java.lang.management.ManagementFactory;
public class JmxHttpServerStart { public static void main(String[] args) throws Exception { Bootstrap app = new Bootstrap( new JmxNodeModule(), new JmxHttpServerModule(8080), new JsonModule(), new JaxrsModule(), new JmxHttpModule(), binder -> binder.bind(MBeanServer.class).toInstance(ManagementFactory.getPlatformMBeanServer()));
app.quiet().strictConfig().initialize(); System.in.read(); }}
复制代码


执行程序:


通过 http://192.168.1.103:8080/v1/jmx 访问


接口说明:

  • GET /v1/jmx:以 html 方式显示;

  • GET /v1/jmx/mbean:获取所有对象;

  • GET /v1/jmx/mbean/{objectName}:获取指定对象;

  • GET /v1/jmx/mbean/{objectName}/{attributeName}:获取指定对象属性。

6. 客户端实现:Airlift-jmx-client

步骤 1:配置 jmx.properties

jmx.rmiregistry.port=9081
复制代码


步骤 2:实际集成

package com.ice.jmx.client;
import com.google.common.collect.ImmutableList;import com.google.inject.Guice;import com.google.inject.Injector;import io.airlift.configuration.ConfigurationFactory;import io.airlift.configuration.ConfigurationLoader;import io.airlift.configuration.ConfigurationModule;import io.airlift.jmx.JmxAgent;import io.airlift.jmx.JmxModule;
import javax.management.*;import javax.management.remote.JMXConnector;import javax.management.remote.JMXConnectorFactory;import java.util.Arrays;import java.util.Random;import java.util.Set;import java.util.concurrent.TimeUnit;
public class AirliftClient { public static void main(String[] args) throws Exception { // 1. 加载 jmx.properties System.setProperty("config", "jmx.properties"); System.setProperty("com.sun.management.jmxremote.port", "9081"); ConfigurationFactory configurationFactory = new ConfigurationFactory(ConfigurationLoader.loadProperties());
// 2. 这里需要创建 JmxModule 和 ConfigurationModule JmxModule module = new JmxModule(); configurationFactory.registerConfigurationClasses(ImmutableList.of(module));
// 3. 构建 Injector Injector injector = Guice.createInjector(new ConfigurationModule(configurationFactory), module); // 4. 可以通过 jmxAgent 获取 RMI Server 地址 JmxAgent jmxAgent = injector.getInstance(JmxAgent.class);
// 5. 通过 JMXConnectorFactory 连接 JMXConnector connector = JMXConnectorFactory.connect(jmxAgent.getUrl()); connector.connect();
// 6. 通过 JMXConnectorFactory 返回的 MBeanServerConnection 获取对象 MBeanServerConnection connection = connector.getMBeanServerConnection(); Set<ObjectInstance> instances = connection.queryMBeans(null, null); for (ObjectInstance instance : instances) { ObjectName name = instance.getObjectName(); if (!name.getDomain().startsWith("com.ice")) { continue; }
System.out.println("Printing: " + name); print('v', 32); System.out.println();
for (int i = 0; i < 10; i++) { print(connection, instance.getObjectName()); TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); }
System.out.println(); print('^', 32); System.out.println("Printed: " + name); } }
private static void print(MBeanServerConnection connection, ObjectName objectName) throws Exception { MBeanInfo bean = connection.getMBeanInfo(objectName); MBeanAttributeInfo[] attributes = bean.getAttributes(); String[] attributeNames = new String[attributes.length]; for (int i = 0; i < attributes.length; i++) { attributeNames[i] = attributes[i].getName(); }
AttributeList attributeList = connection.getAttributes(objectName, attributeNames); attributeList.asList().forEach(r -> System.out.printf("name=%s, value=%s \n", r.getName(), r.getValue())); }
private static void print(char c, int size) { char[] chars = new char[size]; Arrays.fill(chars, c); System.out.println(new String(chars)); }}
复制代码


实际执行时通过 Wireshark 监控 RMI 协议:

1 ~ 3 行 TCP 三次握手,4 行 TCP 窗口大小,5 行客户端发起的 RMI 请求,RMI 协议 Header:

  • 魔数: JRMI,4 个字节,0x4a 0x52 0x4d 0x49;

  • 版本号:2 个字节,0x00 0x02 表示版本号 2;

  • 协议:1 个字节,0x4b=StreamProtocol,0x4c=SingleOpProtocol,0x4d=MultiplexProtocol。

更详细的可参考:https://docs.oracle.com/javase/8/docs/platform/rmi/spec/rmi-protocol3.html

最后 4 行是由客户端发起的 TCP 四次挥手。

7. Presto 中的应用

在 Presto 的每个节点中都使用了 Airlift 的 jmx 和 jmx-http,你可以通过 RMI 或者 http 接口实时获取节点的监控信息:

  • 查询相关的统计:查询各种状态的统计,例如成功、失败、取消和队列中任务数,以及查询执行时间和使用的各种资源;

  • 集群内存资源统计;

  • SQL 全生命周期监控:词法解析、语法解析、生成语法树、动态代码生成、逻辑计划生成和任务创建分配等;

  • 缓存监控:缓存使用容量、缓存命中、缓存未命中和缓存配额增加等;

  • Presto 部分 C++ 代码会有本地方法调用的统计。

发布于: 刚刚阅读数: 4
用户头像

分享技术上的点滴收获! 2013-08-06 加入

一杯咖啡,一首老歌,一段代码,欢迎做客冰屋,享受编码和技术带来的快乐!

评论

发布
暂无评论
Presto 设计与实现(六):JMX_数据湖_冰心的小屋_InfoQ写作社区