Presto Design and Implementation (6): JMX
- 2023-08-22, Beijing
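1. Principles
JMX (Java Management Extensions) is a Java standard for development and communication, used to manage and monitor the state of an application:
Development standard: the server and the client are built with the helper classes that JMX provides, following a prescribed flow;
Communication standard: the server and the client communicate over the RMI transport protocol, and RMI is built on top of TCP.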
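Server-side development flow:
Define the interfaces and implementation classes that encapsulate system state; several classes can be defined for different layers of the system;
At system initialization, register instances of these classes with the MBeanServer;
JMX dynamically creates an instance of the RMIServerSocketFactory interface;
The RMIServerSocketFactory instance creates a server socket that listens for RMI requests from clients.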
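Client-side flow:
Assemble the RMI server address from the server's host name and port;
Connect to the assembled address through a JMXConnector;
Once connected, obtain an MBeanServerConnection from the JMXConnector;
Use the MBeanServerConnection to query the objects registered on the server and read their attributes.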
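The four client steps can be sketched with JDK classes only. This is a minimal illustration, not the code used later in the article: the class name PlainJmxClient and its helper methods are hypothetical, the host and port (localhost, 9082) are assumed to match a server started with the JVM flags shown in section 2, and the "Count" attribute is assumed to be exposed by the queried objects.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.Set;

public class PlainJmxClient {
    // Step 1: assemble the RMI address from host and port (standard JNDI form).
    public static JMXServiceURL buildUrl(String host, int port) throws Exception {
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    // Step 4: list the registered objects that match a pattern such as "com.ice:*".
    public static Set<ObjectName> list(MBeanServerConnection connection, String pattern) throws Exception {
        return connection.queryNames(new ObjectName(pattern), null);
    }

    public static void main(String[] args) throws Exception {
        JMXServiceURL url = buildUrl("localhost", 9082); // assumed host and port
        // Steps 2 and 3: connect, then obtain the MBeanServerConnection.
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            for (ObjectName name : list(connection, "com.ice:*")) {
                // "Count" is assumed to be an attribute of the matched objects.
                System.out.println(name + " Count=" + connection.getAttribute(name, "Count"));
            }
        }
    }
}
```

Because MBeanServer extends MBeanServerConnection, the list helper can also be tried against the local platform MBeanServer without any network connection.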
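2. Server implementation: no open-source dependencies
Step 1: define the interface
For example, counting the number of calls to an API.
package com.ice.jmx;

public interface ApiStatsMBean {
    // Service name
    String getName();

    // Number of calls
    int getCount();
}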
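Step 2: implement the interface
// Package assumed: OriginalServer in step 3 uses this class without importing it
package com.ice.jmx.server.original;

import com.ice.jmx.ApiStatsMBean;

import java.util.concurrent.atomic.AtomicInteger;

public class OriginalApiStatsMBean implements ApiStatsMBean {
    private final String name;
    // AtomicInteger keeps the counter safe when the JMX thread reads it
    private final AtomicInteger count;

    public OriginalApiStatsMBean(String name) {
        this.name = name;
        this.count = new AtomicInteger();
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public int getCount() {
        return count.get();
    }

    public void increase() {
        count.incrementAndGet();
    }
}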
Step 3: register the object
package com.ice.jmx.server.original;

import com.ice.jmx.ApiStatsMBean;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import java.lang.management.ManagementFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class OriginalServer {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the MBeanServer, the helper used for registration
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // 2. Tracks the number of calls to the api_user_info_v1 API
        OriginalApiStatsMBean stats = new OriginalApiStatsMBean("api_user_info_v1");
        // 3. Define the qualified name under which the object is accessed
        ObjectName objectName = new ObjectName("com.ice:name=ApiStatsMBean");
        // 4. Register it as a StandardMBean
        server.registerMBean(new StandardMBean(stats, ApiStatsMBean.class), objectName);
        System.out.println("Registered: " + objectName);
        // The loop below simulates calls to the API
        System.out.println("Invoking: " + stats.getName());
        Random random = new Random();
        while (true) {
            stats.increase();
            System.out.printf("Invoked %s %s%n", stats.getName(), stats.getCount());
            TimeUnit.MILLISECONDS.sleep(random.nextInt(100));
        }
    }
}
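Step 3 wraps the object in a StandardMBean rather than registering it directly. One likely reason is the standard MBean naming rule: a class X registered directly must implement an interface named XMBean, and OriginalApiStatsMBean implements ApiStatsMBean, which does not match. The sketch below is an illustration with JDK classes only, not part of the original project: the class StandardMBeanCheck, its nested types, and the com.ice.sketch object names are hypothetical. It shows the direct registration being rejected and the wrapped one being readable in-process.

```java
import javax.management.MBeanServer;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class StandardMBeanCheck {
    public interface ApiStatsMBean {
        String getName();
        int getCount();
    }

    // Same shape as the article's class: the class name plus "MBean" is not the interface name.
    public static class OriginalApiStatsMBean implements ApiStatsMBean {
        private final String name;
        private final AtomicInteger count = new AtomicInteger();

        public OriginalApiStatsMBean(String name) { this.name = name; }
        @Override public String getName() { return name; }
        @Override public int getCount() { return count.get(); }
        public void increase() { count.incrementAndGet(); }
    }

    // Returns true if registering the bare object is rejected as non-compliant.
    public static boolean bareRegistrationRejected() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("com.ice.sketch:name=Bare");
        try {
            server.registerMBean(new OriginalApiStatsMBean("api_user_info_v1"), name);
            server.unregisterMBean(name);
            return false;
        } catch (NotCompliantMBeanException expected) {
            return true;
        }
    }

    // Registers through StandardMBean, simulates calls, reads "Count" back via the server.
    public static int wrappedCount(int calls) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("com.ice.sketch:name=Wrapped");
        OriginalApiStatsMBean stats = new OriginalApiStatsMBean("api_user_info_v1");
        server.registerMBean(new StandardMBean(stats, ApiStatsMBean.class), name);
        try {
            for (int i = 0; i < calls; i++) {
                stats.increase();
            }
            return (Integer) server.getAttribute(name, "Count");
        } finally {
            server.unregisterMBean(name);
        }
    }
}
```

The getters getName and getCount surface as the attributes "Name" and "Count", which is what jconsole displays for the registered object.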
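Step 4: open the listening port
If you run the program above as is, a local jconsole can attach directly to the process and show its JMX information. To allow access from outside, add the following JVM arguments: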
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=9082
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
[Screenshots: the JVM arguments; running the program; connecting with jconsole; viewing the API call statistics after connecting]
3. Server implementation: org.weakref.jmxutils
Step 1: add the dependency to pom.xml
<dependency>
    <groupId>org.weakref</groupId>
    <artifactId>jmxutils</artifactId>
    <version>1.21</version>
</dependency>
Step 2: no interface needed, a plain class is enough
Simply annotate the methods with @Managed.
package com.ice.jmx.server.weakref;

import org.weakref.jmx.Managed;

public class WeakrefApiStatsBean {
    // volatile so that values written by the application thread are visible to the JMX thread
    private volatile String name;
    private volatile int count;

    @Managed
    public String getName() {
        return name;
    }

    @Managed
    public void setName(String name) {
        this.name = name;
    }

    @Managed
    public int getCount() {
        return count;
    }

    @Managed
    public void setCount(int count) {
        this.count = count;
    }
}
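To give a feel for how an annotation can replace the MBean interface, here is a rough JDK-only sketch of the general idea: reflect over annotated getters and expose them through a DynamicMBean. It is not how jmxutils is implemented. The annotation Exposed is a hypothetical stand-in for @Managed, and the classes Stats and ReflectiveMBean and the object name com.ice.sketch:name=Stats are invented for illustration. The sketch is read-only and ignores setters and operations.

```java
import javax.management.*;
import java.lang.annotation.*;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

public class AnnotatedMBeanSketch {
    // Hypothetical stand-in for jmxutils' @Managed; this is NOT the library's annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Exposed {}

    // A plain class: no MBean interface, only an annotated getter.
    public static class Stats {
        private volatile int count;

        @Exposed
        public int getCount() { return count; }

        public void setCount(int count) { this.count = count; }
    }

    // Read-only DynamicMBean that discovers @Exposed getters by reflection.
    public static class ReflectiveMBean implements DynamicMBean {
        private final Object target;
        private final Map<String, Method> getters = new TreeMap<>();

        public ReflectiveMBean(Object target) {
            this.target = target;
            for (Method m : target.getClass().getMethods()) {
                String n = m.getName();
                if (m.isAnnotationPresent(Exposed.class) && n.startsWith("get")
                        && n.length() > 3 && m.getParameterCount() == 0) {
                    getters.put(n.substring(3), m); // getCount -> attribute "Count"
                }
            }
        }

        @Override
        public Object getAttribute(String name) throws AttributeNotFoundException, ReflectionException {
            Method getter = getters.get(name);
            if (getter == null) {
                throw new AttributeNotFoundException(name);
            }
            try {
                return getter.invoke(target);
            } catch (ReflectiveOperationException e) {
                throw new ReflectionException(e);
            }
        }

        @Override
        public AttributeList getAttributes(String[] names) {
            AttributeList list = new AttributeList();
            for (String name : names) {
                try {
                    list.add(new Attribute(name, getAttribute(name)));
                } catch (Exception skipped) {
                    // unknown or failing attributes are left out of the result
                }
            }
            return list;
        }

        @Override
        public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
            throw new AttributeNotFoundException("read-only: " + attribute.getName());
        }

        @Override
        public AttributeList setAttributes(AttributeList attributes) {
            return new AttributeList();
        }

        @Override
        public Object invoke(String action, Object[] params, String[] signature) throws ReflectionException {
            throw new ReflectionException(new NoSuchMethodException(action));
        }

        @Override
        public MBeanInfo getMBeanInfo() {
            MBeanAttributeInfo[] infos = getters.entrySet().stream()
                    .map(e -> new MBeanAttributeInfo(e.getKey(), e.getValue().getReturnType().getName(),
                            e.getKey(), true, false, false))
                    .toArray(MBeanAttributeInfo[]::new);
            return new MBeanInfo(target.getClass().getName(), "reflective sketch", infos, null, null, null);
        }
    }

    // Registers a Stats instance, reads "Count" back through the MBeanServer, then cleans up.
    public static Object exportAndRead(int value) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("com.ice.sketch:name=Stats");
        Stats stats = new Stats();
        server.registerMBean(new ReflectiveMBean(stats), name);
        try {
            stats.setCount(value);
            return server.getAttribute(name, "Count");
        } finally {
            server.unregisterMBean(name);
        }
    }
}
```

A real library would also handle setters, operations, descriptions, and naming, which is the work jmxutils takes off your hands.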
Step 3: define the Guice Module
package com.ice.jmx.server.weakref;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Singleton;
import org.weakref.jmx.guice.ExportBinder;

import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;

public class WeakrefModule implements Module {
    @Override
    public void configure(Binder binder) {
        binder.bind(MBeanServer.class).toInstance(ManagementFactory.getPlatformMBeanServer());
        binder.bind(WeakrefApiStatsBean.class).in(Singleton.class);
        // Export the bean to JMX this way
        ExportBinder.newExporter(binder).export(WeakrefApiStatsBean.class).withGeneratedName();
    }
}
Step 4: register and use
As before, add the port-binding JVM arguments; see step 4 of the native implementation.
package com.ice.jmx.server.weakref;

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.weakref.jmx.guice.MBeanModule;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class WeakrefServer {
    public static void main(String[] args) throws Exception {
        // The MBeanModule provided by weakref is required
        Injector injector = Guice.createInjector(new MBeanModule(), new WeakrefModule());
        WeakrefApiStatsBean apiStats = injector.getInstance(WeakrefApiStatsBean.class);
        apiStats.setName("api_user_info");
        int count = 0;
        Random random = new Random();
        while (true) {
            apiStats.setCount(++count);
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
        }
    }
}
4. Server implementation: Airlift-jmx
Step 1: add the dependency to pom.xml
<dependency>
    <groupId>io.airlift</groupId>
    <artifactId>jmx</artifactId>
    <version>207</version>
</dependency>
Step 2: configure jmx_server.properties
jmx.rmiregistry.port=9081
jmx.rmiserver.port=9082
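The two properties presumably correspond to the two ports that JMX over RMI uses: the RMI registry port, where a client looks up the connector stub, and the RMI server port, where the exported connector object actually listens. In the standard JMXServiceURL notation both can be written into one address. The sketch below only illustrates that notation with the JDK; the class TwoPortUrlSketch is hypothetical and this is not necessarily the URL that Airlift builds internally.

```java
import javax.management.remote.JMXServiceURL;

public class TwoPortUrlSketch {
    // serverPort goes into the outer rmi:// part, registryPort into the JNDI lookup part.
    public static JMXServiceURL url(String host, int registryPort, int serverPort) throws Exception {
        return new JMXServiceURL("service:jmx:rmi://" + host + ":" + serverPort
                + "/jndi/rmi://" + host + ":" + registryPort + "/jmxrmi");
    }

    public static void main(String[] args) throws Exception {
        // Ports taken from jmx_server.properties above; the host is an assumption.
        JMXServiceURL url = url("localhost", 9081, 9082);
        System.out.println(url);
        System.out.println("server port = " + url.getPort());
        System.out.println("lookup path = " + url.getURLPath());
    }
}
```

Fixing both ports matters mostly behind firewalls, because otherwise the server port is chosen at random and cannot be opened in advance.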
Step 3: usage
package com.ice.jmx.server.airlift;

import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.ice.jmx.server.weakref.WeakrefModule;
import com.ice.jmx.server.weakref.WeakrefApiStatsBean;
import io.airlift.configuration.ConfigurationFactory;
import io.airlift.configuration.ConfigurationLoader;
import io.airlift.configuration.ConfigurationModule;
import io.airlift.jmx.JmxAgent;
import io.airlift.jmx.JmxModule;
import org.weakref.jmx.guice.MBeanModule;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class AirliftServer {
    public static void main(String[] args) throws Exception {
        // 1. Set the path of the jmx_server.properties file
        System.setProperty("config", "jmx_server.properties");
        System.setProperty("com.sun.management.jmxremote.rmi.port", "9082");
        // 2. Load the configuration with ConfigurationFactory
        ConfigurationFactory configurationFactory = new ConfigurationFactory(ConfigurationLoader.loadProperties());
        // 3. Build the modules; JmxModule must be included
        MBeanModule mBeanModule = new MBeanModule();
        JmxModule jmxModule = new JmxModule();
        WeakrefModule weakrefModule = new WeakrefModule();
        configurationFactory.registerConfigurationClasses(ImmutableList.of(mBeanModule, jmxModule, weakrefModule));
        // 4. Build the injector
        Injector injector = Guice.createInjector(new ConfigurationModule(configurationFactory), mBeanModule, jmxModule, weakrefModule);
        // 5. Request the JmxAgent explicitly so that it gets instantiated
        injector.getInstance(JmxAgent.class);
        WeakrefApiStatsBean apiStats = injector.getInstance(WeakrefApiStatsBean.class);
        apiStats.setName("api_user_info");
        int count = 0;
        Random random = new Random();
        while (true) {
            apiStats.setCount(++count);
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
        }
    }
}
Summary: Airlift-jmx can be hard to use if you are not familiar with its underlying logic, so org.weakref.jmxutils is the recommended choice.
5. Server implementation: Airlift-jmx-http
If you find the RMI protocol cumbersome, jmx-http lets you access the objects over HTTP.
Step 1: add the dependency to pom.xml
<dependency>
    <groupId>io.airlift</groupId>
    <artifactId>jmx-http</artifactId>
    <version>207</version>
</dependency>
Step 2: define the NodeModule
package com.ice.jmx.http;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.node.NodeConfig;
import io.airlift.node.NodeInfo;

import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class JmxNodeModule implements Module {
    @Override
    public void configure(Binder binder) {
        binder.bind(NodeInfo.class).in(Scopes.SINGLETON);
        NodeConfig nodeConfig = new NodeConfig()
                .setEnvironment("prod")
                .setNodeInternalAddress("localhost")
                .setNodeBindIp("192.168.1.103");
        binder.bind(NodeConfig.class).toInstance(nodeConfig);
        newExporter(binder).export(NodeInfo.class).withGeneratedName();
    }
}
Step 3: define the HttpServer
package com.ice.jmx.http;

import com.google.common.collect.ImmutableSet;
import io.airlift.event.client.NullEventClient;
import io.airlift.http.server.*;
import io.airlift.http.server.HttpServerBinder.HttpResourceBinding;
import io.airlift.node.NodeInfo;
import io.airlift.tracetoken.TraceTokenManager;

import javax.inject.Inject;
import javax.servlet.Filter;
import javax.servlet.Servlet;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class JmxHttpServer extends HttpServer {
    private final HttpServerInfo httpServerInfo;

    public JmxHttpServer(
            HttpServerInfo httpServerInfo,
            NodeInfo nodeInfo,
            HttpServerConfig config,
            @TheServlet Servlet servlet,
            @TheServlet Map<String, String> initParameters)
            throws IOException {
        this(httpServerInfo,
                nodeInfo,
                config,
                servlet,
                initParameters,
                ImmutableSet.of(),
                ImmutableSet.of(),
                ClientCertificate.NONE);
    }

    @Inject
    public JmxHttpServer(
            HttpServerInfo httpServerInfo,
            NodeInfo nodeInfo,
            HttpServerConfig config,
            @TheServlet Servlet servlet,
            @TheServlet Map<String, String> initParameters,
            @TheServlet Set<Filter> filters,
            @TheServlet Set<HttpResourceBinding> resources,
            ClientCertificate clientCertificate)
            throws IOException {
        super(httpServerInfo,
                nodeInfo,
                config.setLogEnabled(false),
                servlet,
                initParameters,
                ImmutableSet.copyOf(filters),
                ImmutableSet.copyOf(resources),
                null,
                null,
                ImmutableSet.of(),
                clientCertificate,
                null,
                null,
                new TraceTokenManager(),
                new RequestStats(),
                new NullEventClient(),
                Optional.empty());
        this.httpServerInfo = httpServerInfo;
    }

    public URI getBaseUrl() {
        return httpServerInfo.getHttpUri();
    }

    public int getPort() {
        return httpServerInfo.getHttpUri().getPort();
    }

    public HttpServerInfo getHttpServerInfo() {
        return httpServerInfo;
    }
}
Step 4: define the HttpServerModule
package com.ice.jmx.http;

import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.discovery.client.AnnouncementHttpServerInfo;
import io.airlift.http.server.*;
import io.airlift.http.server.HttpServer.ClientCertificate;

import javax.servlet.Filter;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.http.server.HttpServerBinder.HttpResourceBinding;

public class JmxHttpServerModule implements Module {
    private final int httpPort;

    public JmxHttpServerModule(int httpPort) {
        this.httpPort = httpPort;
    }

    @Override
    public void configure(Binder binder) {
        binder.disableCircularProxies();
        configBinder(binder).bindConfig(HttpServerConfig.class);
        configBinder(binder).bindConfigDefaults(HttpServerConfig.class, config -> {
            config.setHttpPort(httpPort);
        });
        binder.bind(HttpServerInfo.class).in(Scopes.SINGLETON);
        binder.bind(JmxHttpServer.class).in(Scopes.SINGLETON);
        newOptionalBinder(binder, ClientCertificate.class).setDefault().toInstance(ClientCertificate.NONE);
        binder.bind(HttpServer.class).to(Key.get(JmxHttpServer.class));
        newSetBinder(binder, Filter.class, TheServlet.class);
        newSetBinder(binder, HttpResourceBinding.class, TheServlet.class);
        binder.bind(AnnouncementHttpServerInfo.class).to(LocalAnnouncementHttpServerInfo.class);
    }
}
Step 5: integration
package com.ice.jmx.http;

import io.airlift.bootstrap.Bootstrap;
import io.airlift.jaxrs.JaxrsModule;
import io.airlift.jmx.JmxHttpModule;
import io.airlift.json.JsonModule;

import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;

public class JmxHttpServerStart {
    public static void main(String[] args) throws Exception {
        Bootstrap app = new Bootstrap(
                new JmxNodeModule(),
                new JmxHttpServerModule(8080),
                new JsonModule(),
                new JaxrsModule(),
                new JmxHttpModule(),
                binder -> binder.bind(MBeanServer.class).toInstance(ManagementFactory.getPlatformMBeanServer()));
        app.quiet().strictConfig().initialize();
        // Block so that the HTTP server keeps running until input arrives on stdin
        System.in.read();
    }
}
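Run the program, then open http://192.168.1.103:8080/v1/jmx.
Endpoints:
GET /v1/jmx: renders the objects as HTML;
GET /v1/jmx/mbean: returns all objects;
GET /v1/jmx/mbean/{objectName}: returns the specified object;
GET /v1/jmx/mbean/{objectName}/{attributeName}: returns the specified attribute of the object.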
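As a rough illustration of calling the last endpoint from code, the sketch below builds the URL and issues a GET with the JDK's HttpURLConnection. The class JmxHttpClientSketch and its methods are hypothetical. The object java.lang:type=Memory with its HeapMemoryUsage attribute is a standard platform MBean chosen as an example, the Accept header is an assumption about the response format, and object names containing characters such as spaces or quotes would need URL encoding, which this sketch does not do.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

public class JmxHttpClientSketch {
    // Builds GET /v1/jmx/mbean/{objectName}/{attributeName} for a given base URL.
    public static URI attributeUri(String baseUrl, String objectName, String attributeName) {
        return URI.create(baseUrl + "/v1/jmx/mbean/" + objectName + "/" + attributeName);
    }

    // Sends the GET request and returns the response body as a string.
    public static String get(URI uri) throws Exception {
        HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
        connection.setRequestMethod("GET");
        connection.setRequestProperty("Accept", "application/json"); // assumed response format
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.joining("\n"));
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws Exception {
        // Base URL taken from the article; requires the server from step 5 to be running.
        URI uri = attributeUri("http://192.168.1.103:8080", "java.lang:type=Memory", "HeapMemoryUsage");
        System.out.println(get(uri));
    }
}
```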
6. Client implementation: Airlift-jmx-client
Step 1: configure jmx.properties
jmx.rmiregistry.port=9081
Step 2: integration
package com.ice.jmx.client;

import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.airlift.configuration.ConfigurationFactory;
import io.airlift.configuration.ConfigurationLoader;
import io.airlift.configuration.ConfigurationModule;
import io.airlift.jmx.JmxAgent;
import io.airlift.jmx.JmxModule;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class AirliftClient {
    public static void main(String[] args) throws Exception {
        // 1. Load jmx.properties
        System.setProperty("config", "jmx.properties");
        System.setProperty("com.sun.management.jmxremote.port", "9081");
        ConfigurationFactory configurationFactory = new ConfigurationFactory(ConfigurationLoader.loadProperties());
        // 2. A JmxModule and a ConfigurationModule are needed here
        JmxModule module = new JmxModule();
        configurationFactory.registerConfigurationClasses(ImmutableList.of(module));
        // 3. Build the Injector
        Injector injector = Guice.createInjector(new ConfigurationModule(configurationFactory), module);
        // 4. The RMI server address can be obtained from the JmxAgent
        JmxAgent jmxAgent = injector.getInstance(JmxAgent.class);
        // 5. Connect through JMXConnectorFactory; the returned connector is already connected
        JMXConnector connector = JMXConnectorFactory.connect(jmxAgent.getUrl());
        // 6. Fetch the objects through the MBeanServerConnection returned by the JMXConnector
        MBeanServerConnection connection = connector.getMBeanServerConnection();
        Set<ObjectInstance> instances = connection.queryMBeans(null, null);
        for (ObjectInstance instance : instances) {
            ObjectName name = instance.getObjectName();
            // Only print the objects registered under the com.ice domain
            if (!name.getDomain().startsWith("com.ice")) {
                continue;
            }
            System.out.println("Printing: " + name);
            print('v', 32);
            System.out.println();
            for (int i = 0; i < 10; i++) {
                print(connection, instance.getObjectName());
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            }
            System.out.println();
            print('^', 32);
            System.out.println("Printed: " + name);
        }
    }

    // Prints every attribute of the given object as name/value pairs
    private static void print(MBeanServerConnection connection, ObjectName objectName) throws Exception {
        MBeanInfo bean = connection.getMBeanInfo(objectName);
        MBeanAttributeInfo[] attributes = bean.getAttributes();
        String[] attributeNames = new String[attributes.length];
        for (int i = 0; i < attributes.length; i++) {
            attributeNames[i] = attributes[i].getName();
        }
        AttributeList attributeList = connection.getAttributes(objectName, attributeNames);
        attributeList.asList().forEach(r -> System.out.printf("name=%s, value=%s %n", r.getName(), r.getValue()));
    }

    // Prints a separator line made of `size` copies of the character `c`
    private static void print(char c, int size) {
        char[] chars = new char[size];
        Arrays.fill(chars, c);
        System.out.println(new String(chars));
    }
}
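During an actual run, the RMI traffic was captured with Wireshark:
In the capture, rows 1 to 3 are the TCP three-way handshake, row 4 is a TCP window size update, and row 5 is the RMI request initiated by the client. The RMI protocol header consists of:
Magic number: JRMI, 4 bytes, 0x4a 0x52 0x4d 0x49;
Version: 2 bytes, 0x00 0x02 means version 2;
Protocol: 1 byte, 0x4b=StreamProtocol, 0x4c=SingleOpProtocol, 0x4d=MultiplexProtocol.
For more detail, see: https://docs.oracle.com/javase/8/docs/platform/rmi/spec/rmi-protocol3.html
The last 4 rows are the TCP four-way teardown initiated by the client.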
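The header layout described above (4-byte magic, 2-byte version, 1-byte protocol) can be decoded in a few lines. The following is a small sketch for reading the first 7 bytes of a captured stream, not a full RMI parser; the class RmiHeaderSketch and the method describe are hypothetical names.

```java
import java.nio.ByteBuffer;

public class RmiHeaderSketch {
    private static final int MAGIC = 0x4a524d49; // the ASCII bytes "JRMI"

    // Decodes the 7-byte RMI header: magic (4 bytes), version (2 bytes), protocol (1 byte).
    public static String describe(byte[] bytes) {
        if (bytes.length < 7) {
            throw new IllegalArgumentException("an RMI header needs at least 7 bytes");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes); // big-endian by default
        if (buffer.getInt() != MAGIC) {
            throw new IllegalArgumentException("magic number is not JRMI");
        }
        int version = buffer.getShort();
        int protocol = buffer.get() & 0xff;
        String name;
        switch (protocol) {
            case 0x4b:
                name = "StreamProtocol";
                break;
            case 0x4c:
                name = "SingleOpProtocol";
                break;
            case 0x4d:
                name = "MultiplexProtocol";
                break;
            default:
                name = "unknown";
        }
        return "version=" + version + ", protocol=" + name;
    }

    public static void main(String[] args) {
        byte[] header = {0x4a, 0x52, 0x4d, 0x49, 0x00, 0x02, 0x4b};
        System.out.println(describe(header));
    }
}
```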
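7. Use in Presto
Every Presto node uses Airlift's jmx and jmx-http, so you can fetch a node's monitoring information in real time over RMI or the HTTP endpoints:
Query statistics: counts of queries in each state, such as succeeded, failed, canceled, and queued, plus query execution time and the resources used;
Cluster memory statistics;
Full SQL lifecycle monitoring: lexical analysis, parsing, syntax tree generation, dynamic code generation, logical plan generation, task creation and assignment, and so on;
Cache monitoring: cache capacity in use, cache hits, cache misses, cache quota increases, and so on;
Some of Presto's C++ code also reports statistics on native method calls.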
Copyright notice: this is an original article by the InfoQ author 冰心的小屋.
Original link: http://xie.infoq.cn/article/fe505a3d4bf761c8eb6f56ce5. Please contact the author before reposting.