目前底层越来越会去依赖 Netty 去做高性能开发,直接使用 Netty 会感觉 API 不好用,且有些功能不满足,所以这里可以去使用 Vert.x 去做一些二次开发。
什么是 Vert.x
Vert.x 是由 eclipse 公司基于 netty 开发的一套工具集,请注意,它是一个工具集,并不是一个 framework,它也可以当作是一个 Netty 的超集,它二次封装了 Netty 的功能,提供更简单的 API 来使用,也提供了 netty 不具备的功能,例如跟 kafka,redis,zk 等的集成。
由于 Vert.x 是基于 Netty 来实现的,所以都是采用的异步 IO 的方式来实现,用来更好的利用服务器的资源。
Vert.x 是一个工具集,它可以让你简单的创建一个工具,例如 http proxy,event bus。
Vert.x 也提供了集群方式,支持用 zk 来管理集群。
Vert.x 也提供了基于 reactive 模式的数据库,Kafka,Redis 等组件,你可以放在任何项目中使用,只需要按需引入即可。
总之,当你使用了 Vert.x,你会发现一切是那么的简单。
!官方链接
如何使用 Vert.x
你可以从https://start.vertx.io/ 这里开始创建你的第一个 Vert.x 的应用。
在开发环境下,你可以使用./mvnw clean compile exec:java
来启动项目。
Vert.x 的核心是Verticle
,Verticle 包含两种 type
Standard Verticles: 最常用的一种 type,它是在 event loop 里面执行的,所以不能有任何 block 代码,对应的就是 netty 的 event loop。Vert.x 默认创建的就是 Standard Verticles。
Worker Verticles: 它会从 worker pool 里面获取线程执行,它不会有线程上下文切换,对应的就是 netty 的 worker group。你可以在创建 Verticles 的时候设置它为 worker,这样创建出来的就是 Worker Verticles,它主要用来执行一些会 block 的操作。
DeploymentOptions options = new DeploymentOptions().setWorker(true);
vertx.deployVerticle("com.kevin.DemoVerticle", options);
复制代码
在多核的服务器上,你可以设置多个 Instance,instance 之间的通信都是通过 mulitcast 的通信方式。
DeploymentOptions options = new DeploymentOptions().setInstances(16);
vertx.deployVerticle("com.kevin.DemoVerticle", options);
复制代码
如何部署 Vert.x
你也可以通过./mvnw clean package
创建出一个可执行的 jar 包,同时你可以可以创建一个 Dockerfile,将它打包成一个 Docker image。
同样你也可以通过 vertx
命令来部署 Verticles。
Standlone && Cluster
如上所述,可以在 standlone 的代码里面设置 instance 个数,来利用服务器的多核。也可以利用 zookeeper 来设置一个 cluster 的集群。
ClusterManager mgr = new ZookeeperClusterManager();
VertxOptions options = new VertxOptions().setClusterManager(mgr);
复制代码
当然它还支持更多的 cluster 集群。
Cloud
Vert.x 同样提供了基于云的一些基础模块,同样也可以通过返回的 json 定制化监控模块,内置的如下:
Monitoring
Zikin
Open Tracing
Metrics
Micrometer
RPC
Vert.x 目前只支持 gRPC。
常见应用
Vert.x-Web
用来搭建一个 web 应用,也支持常见的一些模板技术,像 freemaker,也支持动态路由。
public void start(ClusterManager mgr, Vertx vertx, Promise<Void> startPromise) {
int port = 8887;
Router router = Router.router(vertx);
routeGroup(mgr, router);
healthcheckGroup(vertx, router);
HttpServer server = vertx.createHttpServer();
server.requestHandler(router);
server.listen(port, http -> {
if (http.succeeded()) {
startPromise.complete();
System.out.println("HTTP server started on port " + port);
} else {
startPromise.fail(http.cause());
}
});
send(mgr, vertx, port);
}
public void routeGroup(ClusterManager mgr, Router router) {
String nodeId = mgr.getNodeId();
router.route(HttpMethod.GET, "/demo").handler(request -> {
request.end("hello world from " + nodeId);
});
}
复制代码
Tcp server
简单的创建一个 TCP 服务
public void start(Promise<Void> startPromise) throws Exception {
NetServerOptions serverOptions = new NetServerOptions().setLogActivity(true)
.setActivityLogDataFormat(ByteBufFormat.SIMPLE);
NetServer server = vertx.createNetServer(serverOptions);
server.connectHandler(socket -> {
socket.handler(buffer -> {
String s = new String(buffer.getBytes());
System.out.println(s);
Buffer buf = Buffer.buffer("hello " + s);
socket.write(buf);
});
});
server.listen(1235, "127.0.0.1").onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("connected");
} else {
System.out.println(ar.cause().getMessage());
}
});
复制代码
EventBus & SharedData
这两个放在一起是因为可以都依赖于 cluster,这里还是以 zookeeper 为例子。
EventBus
如果是同一个 Verticles,会通过 tcp 的 broadcast 来传递消息,如果是集群环境,可以通过 zookeeper 来传递消息。
结合 httpserver 的一个例子:发送
Vertx vertx = res.result();
EventBus eb = vertx.eventBus();
vertx.createHttpServer().requestHandler(req -> {
String q = req.params().get("q");
eb.request("q", q).onComplete(ar -> {
if (ar.succeeded()) {
req.response()
.putHeader("content-type", "text/plain")
.end(ar.result().body().toString());
} else {
System.out.println(ar.cause().getMessage());
}
});
复制代码
接收
Vertx vertx = res.result();
mgr.getNodeId();
EventBus eb = vertx.eventBus();
MessageConsumer<Object> consumer = eb.consumer("q");
consumer.handler(msg -> {
msg.reply(msg.body() + "-" + mgr.getNodeId());
});
复制代码
SharedData
用法跟 eventbus 类似,也提供了 local sharedata。这里演示在用 zookeeper 的 cluster 环境下代码 Set value
SharedData sd = vertx.sharedData();
sd.<String, String>getAsyncMap("discovery").onComplete(res -> {
if (res.succeeded()) {
AsyncMap<String, String> map = res.result();
map.put("address", "com.kevin.cloud").onComplete(v -> {
});
}
});
复制代码
Get value
SharedData sd = vertx.sharedData();
sd.<String, String>getAsyncMap("discovery", ar -> {
if (ar.succeeded()) {
AsyncMap<String, String> map = ar.result();
map.get("address", adr -> {
if (adr.succeeded()) {
cb.handle(adr.result());
}
});
}
});
复制代码
Service Discovery
可以借助 ZK 来搭建一个服务注册发现的服务。
服务端代码
ServiceDiscovery discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions()
.setAnnounceAddress(address)
.setName(serviceName));
Record record = HttpEndpoint.createRecord(serviceName, host, port, "/" + serviceName);
discovery.publish(record, ar -> {
if (ar.succeeded() && ar.result() != null) {
req.response()
.putHeader("content-type", "application/json")
.end(ar.result().toString());
SharedData sharedData = vertx.sharedData();
sharedData.<String, List<String>>getLocalAsyncMap("services").onComplete(svcs -> {
if (svcs.succeeded()) {
svcs.result().get("svcs", arSvcList -> {
if (arSvcList.succeeded()) {
arSvcList.result().add(serviceName);
}
});
}
});
}
discovery.close();
});
复制代码
客户端代码
ServiceDiscovery discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions()
.setAnnounceAddress(address)
.setName("demo"));
discovery.getRecord(r -> r.getName().equalsIgnoreCase("demo")).onComplete(ar -> {
if (ar.succeeded()) {
if (ar.result() == null) {
System.out.println("record is null.");
return;
}
io.vertx.servicediscovery.Record record = ar.result();
ServiceReference reference = discovery.getReference(record);
HttpClient client = reference.getAs(HttpClient.class);
System.out.println(ar.result());
client.request(HttpMethod.GET, "/demo").compose(request -> request.send()
.compose(HttpClientResponse::body).onComplete(ar2 -> {
if (ar2.succeeded()) {
System.out.println(ar2.result());
}
ServiceDiscovery.releaseServiceObject(discovery, client);
}).onFailure(f -> {
f.printStackTrace();
}));
}
});
复制代码
代理
利用 Vert.x,你可以快速的创建一个代理。
public void start(Promise<Void> startPromise) throws Exception {
HttpServer proxyServer = vertx.createHttpServer();
Router proxyRouter = Router.router(vertx);
proxyServer.requestHandler(proxyRouter);
proxyServer.listen(9999, http -> {
System.out.println(http.succeeded());
});
HttpClient client = vertx.createHttpClient();
HttpProxy httpProxy = HttpProxy.reverseProxy(client);
httpProxy.origin(8888, "127.0.0.1");
proxyRouter
.route(HttpMethod.GET, "/*").handler(ProxyHandler.create(httpProxy));
}
复制代码
Conclustion
使用 Vert.x 你可以快速的构建出你自己的模块或者应用,可以获得更好的性能体验,Quarkus 就是基于 Vert.x 来搭建起来的一个框架。
完整的 Vert.x 的用法,请参考官方文档
评论